Let’s Talk Async Asynchronously - Native Coroutines

I always found the name “native coroutines” slightly puzzling, given how recently they were introduced. Is native the right word? Let’s find out what they are and you can form your own opinion.

Let’s resume the execution of this. In the previous post, I discussed the old style of coroutines. In this one we will introduce the new type: native coroutines, as introduced in PEP 492. Before we do, however, let’s discuss the event loop.

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.

So what is the event loop, and what happens while it runs? The event loop is a concept from event-driven programming. Imagine a loop that runs forever unless some exit condition is met, for example a while True. It keeps running and listens for incoming events. When an event occurs, the loop dispatches it to the function registered to handle it. For now, we will only consider one event loop running in our program. The Python documentation suggests using asyncio.run() to execute a coroutine and return its result: that way you avoid having to manually create the event loop and close it at the end. Still, let’s discuss the event loop a bit more.
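To make the “while True that matches events to functions” idea concrete, here is a toy sketch. It is not how asyncio is implemented; the event names, the handlers and the run_toy_loop helper are all made up for illustration.

```python
from collections import deque


# Hypothetical handlers, one per event type (names invented for this sketch).
def on_greet(payload):
    return f"hello {payload}"


def on_shout(payload):
    return payload.upper()


HANDLERS = {"greet": on_greet, "shout": on_shout}


def run_toy_loop(events):
    """Dispatch queued events to their handlers until a 'quit' event arrives."""
    queue = deque(events)
    results = []
    while True:
        if not queue:
            break  # nothing left to do; a real loop would wait for IO here
        kind, payload = queue.popleft()
        if kind == "quit":
            break  # the exit condition
        handler = HANDLERS[kind]  # match the event with its function
        results.append(handler(payload))
    return results


outputs = run_toy_loop(
    [("greet", "Vlad"), ("shout", "async"), ("quit", None), ("greet", "never seen")]
)
print(outputs)
```

The last event is never handled because the loop exits on "quit" first, which is the “unless some exit condition is specified” part.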

Before we can run coroutines, we need a loop instance. Let’s get our event loop by calling asyncio.get_event_loop(). Please note that if you want to interact with the event loop inside a coroutine (or a callback, but let’s not do that just yet), you should call asyncio.get_running_loop() instead.

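OK, let’s go back to get_event_loop(). If there is a loop running in the current thread, it will return that one. If, however, there is no current event loop in that thread (and we are in the main thread), asyncio will create a new one and set it as the current one. Note that newer Python versions deprecate this implicit creation, which is one more reason to prefer asyncio.run().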

import asyncio
loop = asyncio.get_event_loop()
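For comparison, here is a small sketch of asyncio.get_running_loop(), the call recommended inside coroutines. The two helper functions are made up for this example; the only asyncio behaviour relied on is that get_running_loop() raises a RuntimeError when no loop is running in the current thread.

```python
import asyncio


async def which_loop():
    # Inside a coroutine there is always a running loop to return.
    loop = asyncio.get_running_loop()
    return loop.is_running()


def outside_a_coroutine():
    # With no loop running in this thread, get_running_loop() raises.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return "no running event loop"
    return "a loop is running"


status_outside = outside_a_coroutine()
status_inside = asyncio.run(which_loop())
print(status_outside, status_inside)
```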

Now that we have the loop, let’s see what we can do with it. We come to native coroutines. What are those? Remember how in the previous post we discussed the old-style coroutines?

def logger():
    person = "Vlad"
    log = yield from time_logger()
    return f"{log} and was called by {person}"

You can still use those with async/await expressions BUT you have to decorate them with @types.coroutine. Still, there are some subtle differences so let’s look at the native ones (not sure I like what they’re called).
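As a rough sketch of that decorator in action: the time_logger below is a stand-in I made up (the real one lives in the previous post), decorated with @types.coroutine so that a native coroutine can await it.

```python
import asyncio
import types


@types.coroutine
def time_logger():
    # Generator-based coroutine (hypothetical stand-in): delegates with
    # `yield from`, which hands control to the event loop once.
    yield from asyncio.sleep(0)
    return "logged at some point"


async def logger():
    person = "Vlad"
    # `await` here plays the role `yield from` played in the old style.
    log = await time_logger()
    return f"{log} and was called by {person}"


message = asyncio.run(logger())
print(message)
```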

async def get_happiness(ig):
    data = await ig.fetch('SELECT COUNT(*) FROM likes...')
    print(data)

What’s happening in that obviously real function? First of all, we used async def to define it. That makes it a coroutine function. Please note that even if there were no await keyword in there, by using async def, Python would still see it as a coroutine function. Then, the variable data contains the result of awaiting ig.fetch(...). Whatever that call returns must, itself, be awaitable. What does that mean? What can you await? Typically, it’s another coroutine, but other types of awaitable objects include Tasks and Futures (we will look at both of these as well). (You can also await any object that has an __await__ dunder method, but we’ll come back to that in a future post.) If we pass any object that is not awaitable to the await keyword, we will get a TypeError. Like I mentioned earlier, think of the await keyword as the yield from if you are familiar with the old style of generator-based coroutines.
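Two of those claims are easy to check with a short sketch. The function names are invented for this example; inspect.iscoroutinefunction() is the standard library check used here.

```python
import asyncio
import inspect


async def no_await_inside():
    # No await in the body, yet async def still makes this a coroutine function.
    return "still a coroutine function"


async def await_a_plain_int():
    try:
        await 42  # an int is not awaitable
    except TypeError as exc:
        return type(exc).__name__


is_coro_fn = inspect.iscoroutinefunction(no_await_inside)
error_name = asyncio.run(await_a_plain_int())
print(is_coro_fn, error_name)
```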

So the get_happiness function above is a native coroutine function. When it is called, it returns a coroutine object, i.e. until we actually call the function, it’s just a function. Calling it does not run the body; that only happens once the coroutine object is awaited or otherwise handed to the event loop. How do we run it though? Well, we could await it in another coroutine function… to be honest that’s probably how you will see it most often, but I guess that doesn’t answer your question about how it’s actually run on the event loop. Another way is to pass the top-level entry point coroutine to the asyncio.run() function. What does that function do? Well, consider the code below.

import asyncio


async def say_my_name(name):
    await asyncio.sleep(1)
    print(name)


async def top_function():
    print("Calling asyncio.sleep() for 3 seconds")
    await asyncio.sleep(3)
    print("Calling saying my name function with argument Vlad")
    await say_my_name("Vlad")
    print("Calling asyncio.sleep() for 1 second")
    await asyncio.sleep(1)
    print("Calling saying my name function with argument `async say what?`")
    await say_my_name("async say what?")


asyncio.run(top_function())

Note that I use the asyncio.run() function in order to run the main entry-level coroutine of my program. Typically, you’ll only want to call that once. Why? Well, the function takes care of managing the asyncio event loop and all the details under the hood (it creates the loop at the beginning and closes it at the end). Does that mean you can always use it? No: if another asyncio event loop is already running in your thread (and I am aware I am using the concept of threads quite a lot, we will get to that), you won’t be able to call the function; it raises a RuntimeError.
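Here is a minimal sketch of that restriction, with made-up inner() and outer() coroutines: calling asyncio.run() from code that is already running on an event loop fails, while simply awaiting works.

```python
import asyncio


async def inner():
    return "inner result"


async def outer():
    coro = inner()
    try:
        asyncio.run(coro)  # a loop is already running in this thread
    except RuntimeError as exc:
        nested_error = str(exc)
    finally:
        coro.close()  # tidy up the coroutine object that never got to run
    # Inside a running loop, awaiting is the way to run another coroutine.
    result = await inner()
    return nested_error, result


nested_error, result = asyncio.run(outer())
print(nested_error)
print(result)
```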

In my main top_function(), I am also running the say_my_name(name) function by awaiting it. Calling say_my_name("Vlad") creates the coroutine object, and awaiting that object is what actually runs it. As I said earlier about the generator-based coroutines, you can/could (depending on when you are reading this) use those in await expressions, but you really really shouldn’t. In my current version of Python, 3.9, that still works, but the documentation says support for them is scheduled for removal in Python 3.10. So stick to the native types.
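The difference between calling and awaiting can be seen in a small sketch. The calls list is only there for bookkeeping in this example; it records when the body of say_my_name actually executes.

```python
import asyncio
import inspect

calls = []  # records each time the coroutine body really runs


async def say_my_name(name):
    calls.append(name)
    return name


async def top_function():
    coro = say_my_name("Vlad")  # calling: we get a coroutine object back
    created = inspect.iscoroutine(coro)
    ran_before_await = list(calls)  # still empty, the body has not run yet
    returned = await coro  # awaiting: now the body runs
    return created, ran_before_await, returned


created, ran_before_await, returned = asyncio.run(top_function())
print(created, ran_before_await, returned, calls)
```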

We have seen two ways of running a coroutine function: awaiting it in a different coroutine and using the asyncio.run() function. What next? Well, we can also run coroutine objects concurrently as asyncio Tasks. Ah come on. More terminology? Just a few more terms (and then a lot lot more). You wanted to learn about async no? So what is a Task?

A Future-like object that runs a Python coroutine. Not thread-safe.

Now I’m just being mean. That’s what the documentation says though. I have to admit the Python async documentation feels like it was written for a library developer rather than us the commoners. (That’s basically the whole reason I am writing this series of posts: I want to better understand it myself).

We have:

  • Coroutines (discussed)
  • Tasks
  • Futures

Let’s first see how we run a Task before explaining it.

import asyncio

async def easy_task():
    return "That was easy"

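async def top_function():
    # Wrap the coroutine object in a Task and schedule it on the running loop
    scheduled_task = asyncio.create_task(easy_task())

    # Wait for the Task to finish; awaiting it returns the coroutine's result
    await scheduled_task

asyncio.run(top_function())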

easy_task is the coroutine function. When we called it, easy_task(), it returned a coroutine object. We then used asyncio.create_task() to turn that coroutine object into a Task and schedule its execution. OK, but what is a Task? Well, a Task is a Future-like object. Seriously? I thought we were talking about Tasks? Well, quick aside then. While the Task is awaiting its execution, let’s discuss our Future. Think of the Future as being the Task’s superclass (in asyncio, Task really is a subclass of Future). That’s why we are looking at it first.

A Future represents an eventual result of an asynchronous operation. Not thread-safe.

What about something we (read: I) can understand? Imagine something happening on your event loop at some point in time: the value computed by that something will be available eventually. The Future is essentially that: when created, its status is “not yet completed” (not bright and not orange <- terrible joke btw). So what does a Future object have? Well, when we eventually do get the result/value (i.e. when the Future is done), we can have a callback function called by registering it with add_done_callback(). But how do we check that the Future is done? It has a done() method. We can also set and get its result: set_result() and result() respectively.
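Those four methods can be tried out in a short sketch. It creates a bare Future with loop.create_future() and sets the result by hand, which is something you would rarely do in application code; the seen list is just bookkeeping for this example.

```python
import asyncio


async def top_function():
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # a Future with no result yet

    seen = []
    # The callback receives the finished Future as its only argument.
    future.add_done_callback(lambda fut: seen.append(fut.result()))

    before = future.done()  # not yet completed
    future.set_result(42)
    after = future.done()  # completed now

    # Done callbacks are scheduled on the loop rather than called inline,
    # so yield to the loop once to let the callback run.
    await asyncio.sleep(0)
    return before, after, future.result(), seen


before, after, value, seen = asyncio.run(top_function())
print(before, after, value, seen)
```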

Now that you have learned a bit more about Futures (we will use them in a future post), let’s go Back from the Future, Back to the Task (that’s not the correct movie title right?). So what are Tasks, why should we use them, and how are they different from Futures? Tasks are used to run coroutines in an event loop. If there are many Tasks on the loop, the loop runs one at a time. That being said, if a certain Task is awaiting the completion of a Future (remember that eventual result that will be available at some point?), the event loop will run other Tasks, callbacks and/or perform IO operations. (Adding a diagram here would be nice wouldn’t it? Let’s brush up on those LaTeX flowdiagram skills.) What methods do Tasks have? The user could cancel() a Task (which requests cancellation; please note that the Task is not cancelled right away), check whether it is cancelled(), check whether it is done() (True is returned if that’s the case), get its result(), and use a few other methods we will explore. Let’s see some of these in practice.
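Before cancelling anything, here is a small sketch of the “loop runs other Tasks while one is waiting” behaviour, together with done() and result(). The worker coroutine and the delays are made up for this example.

```python
import asyncio


async def worker(name, delay, order):
    await asyncio.sleep(delay)  # while this Task waits, the loop runs others
    order.append(name)
    return f"{name} done"


async def top_function():
    order = []
    slow = asyncio.create_task(worker("slow", 0.2, order))
    fast = asyncio.create_task(worker("fast", 0.05, order))
    done_before = slow.done()  # just scheduled, so not done yet
    await slow  # the fast Task finishes while we wait here
    await fast
    return order, done_before, slow.done(), slow.result(), fast.result()


order, done_before, done_after, slow_result, fast_result = asyncio.run(top_function())
print(order, done_before, done_after, slow_result, fast_result)
```

Even though slow was created and awaited first, fast finishes first, because the loop switches to it while slow is sleeping.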

(Note to self, I should add some headings…) We first need a coroutine function that we will call to create a coroutine object. We will then create a Task in order to run the coroutine in the event loop. We will sleep for 2 seconds to give the event loop a chance to start the Task. Then, let’s cancel it and see what happens.

import asyncio

async def life_coroutine():
    meaning = await asyncio.sleep(12, result=42)
    print(meaning)

async def top_function():
    life_task = asyncio.create_task(life_coroutine())
    await asyncio.sleep(2)
    life_task.cancel()
    await life_task

asyncio.run(top_function())

What did it raise?

asyncio.exceptions.CancelledError

Did that run gracefully? Yes, as graceful as a cement truck gently driving over flowers. But why? Let’s think of what happens when we call cancel() on a Task. The event loop throws a CancelledError into the wrapped coroutine at the point where it is currently awaiting. If nothing catches it, that exception propagates out of the Task and into whoever awaits it. We could, however, intercept it and do any necessary cleaning up using a try/except/finally block. Please do not, however, suppress it unless you know what you’re doing. Let’s see:

import asyncio
from datetime import datetime


async def life_coroutine():
    print(f"coroutine started running at {datetime.now()}")
    print(f"coro found await and yielded execution at {datetime.now()}")
    try:
        meaning = await asyncio.sleep(12, result=42)
    except asyncio.CancelledError:
        print(f"coroutine received cancel signal at {datetime.now()}")
        raise  # still raising the exception, not suppressing it
    finally:
        print(f"coro cleaning up at {datetime.now()}")
    print(meaning)


async def top_function():
    life_task = asyncio.create_task(life_coroutine())
    await asyncio.sleep(2)
    life_task.cancel()
    print(f"Is the coro cancelled? {life_task.cancelled()}")
    await asyncio.sleep(2)
    try:
        await life_task
    except asyncio.CancelledError:  # handling the exception raised
        print(f"coro fully cancelled at {datetime.now()}")
    print(f"Is the coro cancelled? {life_task.cancelled()}")


asyncio.run(top_function())

What is the output?

# coroutine started running at 2021-04-16 07:41:04.385064
# coro found await and yielded execution at 2021-04-16 07:41:04.385102
# Is the coro cancelled? False
# coroutine received cancel signal at 2021-04-16 07:41:06.389708
# coro cleaning up at 2021-04-16 07:41:06.389760
# coro fully cancelled at 2021-04-16 07:41:08.393532
# Is the coro cancelled? True

Let’s think of what happened. We wrapped the coroutine object in a Task. Why do I then wait? To give the event loop a chance to start it before I cancel it. What would happen if I didn’t sleep before cancelling it?

# Is the coro cancelled? False
# coro fully cancelled at 2021-04-16 07:43:31.170305
# Is the coro cancelled? True

Strange. Is it? Not really. The event loop hasn’t even had the chance to start the Task as we’re already cancelling it. Consequently, the Task “starts” by receiving the cancelled message so it never runs. Then, when we try to await it, we are just awaiting its cancellation. We get the exception and print out the cancellation time.
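A stripped-down sketch of that last case, with a started list added purely as bookkeeping to show that the coroutine body never gets to run:

```python
import asyncio

started = []  # would receive an entry if the coroutine body ever ran


async def life_coroutine():
    started.append(True)
    return await asyncio.sleep(12, result=42)


async def top_function():
    life_task = asyncio.create_task(life_coroutine())
    life_task.cancel()  # no await in between: the loop never started the Task
    try:
        await life_task
    except asyncio.CancelledError:
        pass  # handled here only to keep the sketch quiet
    return life_task.cancelled()


was_cancelled = asyncio.run(top_function())
print(was_cancelled, started)
```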

I think we’ll stop here in this post. In the following post we will create a few tasks concurrently and dynamically add/create some named tasks. Then, I’d like to discuss our Future and the extra functionality it provides over Tasks. Then, we will discuss Streams: specifically, creating some simple client/server examples using asyncio streams and doing the same using the socket module from the standard library. See you soon!