Asyncio basics I

Tutorial on how to use Asyncio in Python

This story will show the basics of using Asyncio in Python.

Prerequisites

This tutorial uses Python 3.10. We will need to install a few packages via pip along the way, but that will be obvious once we get there.

Concepts

Concurrency, parallelism and multitasking

Concurrency is a term used for things that happen, or are in the process of happening, at the same time, but it says nothing about HOW they happen.
Parallelism is a form of concurrency where 2 or more tasks are literally executing at the same instant, for example on separate CPU cores.
Multitasking comes in two forms, preemptive and cooperative. Preemptive multitasking is when we let the OS decide which process to execute via time slicing. Cooperative multitasking is when we explicitly determine in the code when a task can be paused in order to work on another task.

Process, Thread, Multithreading and Multiprocessing

A Process has its own memory space and CPU "time". Multiple processes running on a CPU with multiple cores (or on multiple processors) is called multiprocessing.
A Thread shares memory with the process it is running in. It is often thought of as a lightweight process. One process can have multiple threads and always has at least one (the main thread). When multiple threads are spun up to do work concurrently, we are dealing with multithreading.
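
To make the shared-memory point concrete, here is a minimal sketch using the standard threading module. The names shared_results and record_square are made up for this illustration. Several threads write into one list that lives in the process, and the main thread is always there:

```python
import threading

# A list owned by the process; every thread in the process can see it.
shared_results: list[int] = []


def record_square(number: int) -> None:
    # Hypothetical worker: appends its result to the shared list.
    shared_results.append(number * number)


threads = [threading.Thread(target=record_square, args=(n,)) for n in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Every process has at least this one thread.
main_thread_name = threading.main_thread().name

print(sorted(shared_results))
print(main_thread_name)
```

Separate processes would each get their own copy of the list, which is why sharing data between processes takes extra machinery.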

GIL (Global Interpreter Lock)

The GIL prevents a Python process from executing more than one Python bytecode instruction at any given time.

Coroutines and Tasks with async def and await

Coroutines

Coroutines are just Python functions defined with "async def" instead of "def". The special sauce of a coroutine is that it can pause its execution when it encounters an operation that can take some time. When this operation is done, it can wake up and continue where it left off. While the coroutine is paused, other operations can run.
Calling a coroutine function does not run it and yield its result; instead it returns a coroutine object. This object must be put on an event loop to be executed.

import asyncio


async def my_coroutine():
    print('My coroutine')
    return 66


r = asyncio.run(my_coroutine())
print(r)

Which will produce:

My coroutine
66
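
To see the coroutine object itself, we can call the coroutine function without handing the result to the event loop right away. This is a small sketch; inspect.iscoroutine comes from the standard library:

```python
import asyncio
import inspect


async def my_coroutine() -> int:
    return 66


# Calling the coroutine function does not run its body yet;
# it only creates a coroutine object.
coro = my_coroutine()
is_coroutine = inspect.iscoroutine(coro)
print(type(coro).__name__, is_coroutine)

# Handing the coroutine object to the event loop actually runs it.
result = asyncio.run(coro)
print(result)
```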

To pause the execution of a coroutine (and let something else run in the meantime), we use the await keyword. The await keyword causes the coroutine that follows it to be run. It also pauses the coroutine that contains it until the awaited coroutine has finished and returned a result.

Even if we do something like this:

import asyncio


async def calculate(number: int) -> int:
    return number * 100


async def say_hello_eventually(seconds: int) -> str:
    await asyncio.sleep(seconds)
    return "Hello!"


async def main() -> None:
    message = await say_hello_eventually(5)
    calculated_number = await calculate(5)

    print(calculated_number)
    print(message)


asyncio.run(main()) 

we will not achieve concurrency. We would still wait the full 5 seconds before either result is printed. This is because the await keyword pauses the current coroutine, and nothing else in it runs until the awaited coroutine has finished. Thus, "say_hello_eventually()" is run and main() is paused until "say_hello_eventually()" finishes. Then main() resumes until "calculate()" is reached. main() is then paused again until "calculate()" has done its work, and finally execution returns to main(), which prints the results.
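
A rough way to check this is to time it. The sketch below uses a hypothetical helper wait_a_bit and short delays so it finishes quickly. The total time comes out as roughly the sum of the two delays:

```python
import asyncio
import time


async def wait_a_bit(seconds: float) -> float:
    # Hypothetical helper that simply sleeps for the given time.
    await asyncio.sleep(seconds)
    return seconds


async def main() -> float:
    start = time.perf_counter()
    # Each await finishes completely before the next line starts.
    await wait_a_bit(0.2)
    await wait_a_bit(0.3)
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(f"Sequential awaits took about {elapsed:.1f} seconds")
```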

In order to achieve real concurrency, we need to work with tasks.

Tasks

A task is a wrapper around a coroutine that schedules the coroutine to run on the event loop as soon as possible.

A task is created by using "asyncio.create_task(...)", which returns a task object. This task object can be put in an await expression that will extract the return value once it is complete.

import asyncio


async def calculate(number: int) -> None:
    print(number * 100) 


async def say_hello_eventually(seconds: int) -> None:
    await asyncio.sleep(seconds)
    print("Hello there")


async def main() -> None:
    message_task = asyncio.create_task(say_hello_eventually(3)) 
    calculate_task = asyncio.create_task(calculate(5))

    await message_task
    await calculate_task


asyncio.run(main()) 

This prints the result of "calculate_task" (500) immediately, even though that task is created after "message_task" in the main() function. "Hello there" follows about 3 seconds later.
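
Timing makes the difference visible. In this sketch (wait_a_bit is a hypothetical helper, and the delays are kept short) both tasks sleep at the same time, so the total is close to the longer delay rather than the sum of both:

```python
import asyncio
import time


async def wait_a_bit(seconds: float) -> float:
    # Hypothetical helper that simply sleeps for the given time.
    await asyncio.sleep(seconds)
    return seconds


async def main() -> float:
    start = time.perf_counter()
    # Both tasks are scheduled right away and sleep at the same time.
    first = asyncio.create_task(wait_a_bit(0.2))
    second = asyncio.create_task(wait_a_bit(0.3))
    await first
    await second
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(f"Two tasks took about {elapsed:.1f} seconds")
```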

Cancelling tasks and timeouts

If you need to cancel a task, call the task object's "cancel()" method. This requests cancellation: a CancelledError is raised inside the task the next time it runs, and awaiting the cancelled task raises CancelledError as well.
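
A minimal sketch of cancelling a task, assuming a made-up long_running coroutine that would otherwise sleep for 10 seconds:

```python
import asyncio


async def long_running() -> None:
    # Hypothetical slow job.
    await asyncio.sleep(10)


async def main() -> str:
    task = asyncio.create_task(long_running())
    # Give the task a chance to start before cancelling it.
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"


outcome = asyncio.run(main())
print(outcome)
```

Catching the CancelledError where the task is awaited gives us a place to clean up or log that the work was abandoned.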

wait_for() is an asyncio function that takes a coroutine or a task object and a timeout specified in seconds. If the coroutine or task takes longer than the given timeout to complete, it is cancelled and a TimeoutError is raised (asyncio.TimeoutError in Python 3.10).
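
A short sketch of a timeout, using a hypothetical slow_operation coroutine and a deliberately tiny timeout:

```python
import asyncio


async def slow_operation() -> str:
    # Hypothetical slow job that takes far longer than the timeout.
    await asyncio.sleep(10)
    return "done"


async def main() -> str:
    try:
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"


outcome = asyncio.run(main())
print(outcome)
```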

Futures

A future is a Python object that contains a single value that you expect to get in the future but may not have yet. When a Future is created it typically doesn't have a value and is considered "unresolved", "incomplete" or "not done".
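
Futures are rarely created by hand in application code, but doing so once shows the idea. In this sketch the helper set_later is made up for the example. The future starts out not done, another task sets its value a little later, and awaiting the future hands that value back:

```python
import asyncio


async def set_later(future: asyncio.Future, value: int) -> None:
    # Hypothetical helper: resolves the future after a short delay.
    await asyncio.sleep(0.1)
    future.set_result(value)


async def main() -> tuple[bool, bool, int]:
    future = asyncio.get_running_loop().create_future()
    done_before = future.done()  # no value yet
    setter = asyncio.create_task(set_later(future, 42))
    value = await future  # pauses until set_result() is called
    await setter
    return done_before, future.done(), value


done_before, done_after, value = asyncio.run(main())
print(done_before, done_after, value)
```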

The Event Loop

The function "asyncio.run()" conveniently creates an event loop and runs our tasks/coroutines. If we want to create an event loop manually, this can be done by:

import asyncio


async def main() -> None:
    print("Running main")
    await asyncio.sleep(2)

loop = asyncio.new_event_loop()

try:
    loop.run_until_complete(main())
finally:
    loop.close()
    print("Done")

If we want to access the event loop that is already running (maybe because we are in a Jupyter notebook), we can use the function "asyncio.get_running_loop()" instead.
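
A small sketch of how get_running_loop() behaves, assuming it is run as a plain script rather than inside a notebook. Inside a coroutine it returns the loop that is currently running. Outside of one there is no running loop, so it raises a RuntimeError:

```python
import asyncio


async def main() -> bool:
    # Inside a coroutine there is always a running loop.
    loop = asyncio.get_running_loop()
    return loop.is_running()


inside = asyncio.run(main())

try:
    # In a plain script, no loop is running at module level.
    asyncio.get_running_loop()
    outside = True
except RuntimeError:
    outside = False

print(inside, outside)
```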

Examples

Async web calls

Let's say we want to call multiple websites and get their results. In synchronous programming we would call them one at a time, which can be time-consuming. It would be better to fire off multiple calls at the same time and then just wait for the results to come in (web calls are I/O-bound, not CPU-bound). For this to work, we need an async-enabled HTTP library. Requests is not async-enabled, so we will use aiohttp instead.

import asyncio
import aiohttp


async def make_request(url: str) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status


async def main():
    task_1 = asyncio.create_task(make_request("https://www.google.com"))
    task_2 = asyncio.create_task(make_request("https://www.facebook.com"))
    task_3 = asyncio.create_task(make_request("https://www.twitter.com"))

    r1 = await task_1
    r2 = await task_2
    r3 = await task_3

    print(r1, r2, r3)

asyncio.run(main())
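
Creating and awaiting each task by hand gets repetitive as the number of URLs grows. One option is asyncio.gather, which runs several awaitables concurrently and returns their results as a list. The sketch below swaps the real HTTP call for a hypothetical fake_request that only sleeps, so it runs without aiohttp or a network connection. The delays are invented for the illustration:

```python
import asyncio


async def fake_request(url: str, delay: float) -> str:
    # Hypothetical stand-in for make_request(): sleeps instead of doing HTTP.
    await asyncio.sleep(delay)
    return url


async def main() -> list[str]:
    # gather() runs the coroutines concurrently and returns their results
    # in the order they were passed in, not the order they finished.
    return await asyncio.gather(
        fake_request("https://www.google.com", 0.3),
        fake_request("https://www.facebook.com", 0.1),
        fake_request("https://www.twitter.com", 0.2),
    )


results = asyncio.run(main())
print(results)
```

Even though the second call finishes first, the results come back in the order the calls were listed, which keeps them easy to match up with their URLs.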

References

Python Concurrency With Asyncio