Let's Talk Async (Python) - (Old) Coroutines

You’ve probably seen it. You’ve probably heard about it. You’ve probably even used it. So what is it? Is it related to rsync? Can you do man async and know how to use it? Sadly not.

This series of posts will go through async and hopefully through every single detail of it. Why now? I had a task recently for an application where I was building a trader/exchange service. I thought that async was a great fit, since a lot of the “processing” involved communication between the exchange server and many trading clients, but I spent more time than I should have on that part. So this series is more of a guide for myself for the future. If you find it useful, that’s just bonus.

Let’s start with coroutines and generators. Once upon a time, in a release version far far away, there were coroutines that were implemented via generators. Ok, what’s a generator? Calling a generator function returns a generator iterator. What? What’s an iterator?

Well, an iterator implements the __iter__ and __next__ dunder methods. The first dunder method (__iter__) returns an iterator object, the thing that walks through each element of the object (is it allowed to define iterator by saying iterates?). What does the second dunder method (__next__) do? It returns the next item in the sequence. Let’s have a look at a quick example before implementing our own.

Python lists, sets, dictionaries, and tuples are iterable. That means we can make an iterator out of them. For example:

d = {"key1":"value1", "key2":"value2", "key3":"value3"}
it = iter(d)
type(it) # dict_keyiterator
next(it) # 'key1'
next(it) # 'key2'
next(it) # 'key3'
next(it) # StopIteration exception

What’s the StopIteration exception? It is simply used to signal that the iterator is exhausted. (let it rest…) Now let’s write our own iterator by creating an object with those two dunder methods:

class MyAmazingIteratingObjectAkaIterator:
    def __iter__(self):
        # iter() calls this: (re)set the counter and return ourselves
        self.value = 0
        return self

    def __next__(self):
        # next() calls this: hand back the current value, then move along
        worth = self.value
        self.value += 1
        return worth

it = iter(MyAmazingIteratingObjectAkaIterator())
next(it) # 0
next(it) # 1
next(it) # 2
next(it) # 3
...
next(it) # 4,789,145,239,144
next(it) # 4,789,145,239,145
...

Why won’t it end? Because we never raise the StopIteration exception. So let’s implement that. It is essentially our termination condition.

class MyAmazingIteratingObjectAkaIterator:
    def __iter__(self):
        self.value = 0
        return self

    def __next__(self):
        # hand back the current value until we go past 10
        if self.value <= 10:
            worth = self.value
            self.value += 1
            return worth
        else:
            raise StopIteration

it = iter(MyAmazingIteratingObjectAkaIterator())

next(it) # 0
next(it) # 1
next(it) # 2
...
next(it) # 10
next(it) # StopIteration
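
And since our object follows the iterator protocol, a plain for loop works with it out of the box; for calls iter() and next() for us and swallows the final StopIteration:

for worth in MyAmazingIteratingObjectAkaIterator():
    print(worth)  # prints 0 through 10, then the loop ends cleanly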

Congratulations! You’ve just implemented an iterator. But why did we do this? Because we mentioned generator iterators and then went down the rabbit hole. Let’s go slightly higher up and quickly explain generator iterators and then coroutines. This is all important (ish), I (half) promise.

You know the iterator we coded above? We can make it a lot shorter.

def my_amazing_generator():
    value = 0
    while value <= 10:
        yield value
        value += 1

gen = my_amazing_generator()
next(gen) # 0
gen.__next__() # 1
next(gen) # 2
gen.__next__() # 3
...
next(gen) # 10
next(gen) # StopIteration

Does the same thing in only 5 lines. Why do I alternate between calling next(gen) and gen.__next__()? Just to show that the same thing happens: the built-in next function will “retrieve the next item from the iterator by calling its __next__() method”, as the Python documentation puts it. So that was a generator. Now let’s finally discuss coroutines, after a quick aside of course.

A quick way of creating generators is via generator expressions. What are those? They look like list comprehensions, but the result doesn’t get created in memory all at once. Instead, the elements are produced one at a time, as they are needed. Consider the following two lines:

# the following creates and stores all those numbers in memory
sum([x for x in range(50000000)])
# a generator expression doesn't do that, and only creates the value when needed
sum(x for x in range(50000000))

For large data sets, generator expressions are more memory efficient.
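
If you want to see the difference for yourself, sys.getsizeof gives a rough idea. A quick sketch (the exact numbers vary by Python version, and getsizeof only measures the container itself):

import sys

# the list comprehension materialises every element up front
big_list = [x for x in range(1000000)]
sys.getsizeof(big_list)  # several megabytes

# the generator expression is just a small generator object,
# no matter how large the range is
lazy = (x for x in range(1000000))
sys.getsizeof(lazy)  # around a hundred bytes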

Is it time for coroutines??? Yes it is. At least, the old style of coroutines introduced by PEP 342. I haven’t seen them much lately, actually, but that’s the whole point of this series: to understand where we started and to see where we are now. In a coroutine, we use the yield keyword as well. Let’s have a look:

def my_first_coroutine():
    print("Welcome to your first coroutine.")
    word = yield
    print(f"Thank you for sending {word} to me.")

cr = my_first_coroutine()
# we prime (activate) the coroutine
next(cr) # Welcome to your first coroutine.
cr.send("Hello") # Thank you for sending Hello to me.
# StopIteration

So we used the yield keyword. But not like before, yield value, but word = yield. Say what? When we activated/primed the coroutine, it ran until that yield statement, paused execution, and gave us control back. Then we sent in a value; the coroutine resumed and executed the following line. Since there was nothing after that (such as another yield), the coroutine exited and raised the StopIteration exception.
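
By the way, a yield can do both jobs at once, producing a value and receiving one, as in result = yield value. Here’s a quick sketch of a running averager (my own example, a classic use of this form):

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        # hand the current average out AND receive the next number in
        value = yield average
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)     # prime it; yields None
avg.send(10)  # 10.0
avg.send(30)  # 20.0
avg.send(5)   # 15.0

Now let’s run our first coroutine again, this time without priming it.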

cr = my_first_coroutine()
cr.send("Hello") # TypeError: can't send non-None value to a just-started generator

So our coroutine (remember that these coroutines are just generators) hasn’t started yet. So its state is just-started? Is there a more official name for that? To figure that out we will use the inspect module, specifically the getgeneratorstate function. You might notice there is a getcoroutinestate as well. So why not use that? That function is designed to be used with the new-style coroutines (the async def ones). We will get to those.

from inspect import getgeneratorstate

def my_first_coroutine():
    print("Welcome to your first coroutine.")
    word = yield
    print(f"Thank you for sending {word} to me.")
    second_word = yield
    print(f"Thank you for sending {second_word} to me.")



cr = my_first_coroutine()
getgeneratorstate(cr) # 'GEN_CREATED'
# so our coroutine is created but not yet running

# Let's prime/activate it. 
next(cr)
# Welcome to your first coroutine.
getgeneratorstate(cr) 
# 'GEN_SUSPENDED'

# so the generator ran until it got to the first yield keyword and then got suspended

# Let's send in a word
cr.send("democracy")
# Thank you for sending democracy to me.
getgeneratorstate(cr)
# 'GEN_SUSPENDED'

# generator resumed from the first yield keyword until the second one and then got suspended

cr.send("law and order")
# Thank you for sending law and order to me.
# StopIteration
getgeneratorstate(cr)
# 'GEN_CLOSED'
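
There is actually one more state, ‘GEN_RUNNING’, but that’s its status only while the generator’s code is actively executing, so we’d have to check from inside the generator itself to catch it. A quick sketch of that (my own illustration, using the same getgeneratorstate function):

from inspect import getgeneratorstate

def self_aware():
    # gen is the module-level reference to this very generator iterator,
    # so asking for its state here means asking while it is running
    print(getgeneratorstate(gen))
    yield

gen = self_aware()
next(gen)  # GEN_RUNNING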

Now you might be wondering why we are looking at all of these. Until Python 3.5, when async and await were introduced with PEP 492, generator-based coroutines were (more) commonly used. A useful thing to do was nesting generator-based coroutines.

from datetime import datetime

def time_logger():
    action = yield
    time = datetime.now().isoformat()
    return f"{action} occurred at {time}"

def logger():
    person = "Vlad"
    log = yield from time_logger()
    return f"{log} and was called by {person}"

cr = logger()
next(cr)
cr.send("Printing")

# StopIteration: Printing occurred at 2021-04-02T18:01:45.504512 and was called by Vlad

So what happened? We created the coroutine and primed it. Priming ran logger until the yield from time_logger() line, which started the time_logger coroutine and ran it until its own yield, where everything got suspended. Then we sent in the value of “Printing”, which went straight through to time_logger’s yield. time_logger built its message and returned, terminating itself, and its return value became the value of the yield from expression. Control came back to our logger coroutine, which ran till the end and closed. The yield from here is essentially the ancestor of the await keyword. We will look at that.

Now you know what iterables are, what iterators are, what generator iterators are and what coroutines are. They were (are?) commonly used in producer/consumer-style processing pipelines. There’s this presentation by David Beazley from 2009 that’s actually really good at explaining it. He then (a few years later) presented a really interesting talk at PyCon 2015 and then a few other interesting talks too.
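
As a tiny taste of that pattern, here’s a minimal producer/consumer sketch (my own, loosely in the spirit of Beazley’s examples): the consumer is a coroutine that waits for items, and the producer drives it by pushing items in with send.

def consumer():
    # sits suspended; each send() delivers one item to process
    while True:
        item = yield
        print(f"consumed {item}")

def produce(cons, items):
    # the producer pushes work into the primed consumer
    for item in items:
        cons.send(item)

cons = consumer()
next(cons)  # prime it, as always
produce(cons, ["a", "b", "c"])
# consumed a
# consumed b
# consumed c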

Gonna pause here for today and resume execution when the following post is ready.