Tutorial on how to use Asyncio in Python
This story shows the basics of using asyncio in Python.
Prerequisites
The examples use Python 3.10. We will need to install a few packages via pip along the way, but that will be obvious once we get there.
Concepts
Concurrency, parallelism and multitasking
Concurrency means that several things happen, or are in the process of happening, during the same period of time, but it says nothing about HOW they happen. Parallelism is a form of concurrency where two or more tasks literally execute at the same instant, for example on separate CPU cores. Multitasking comes in two flavours, preemptive and cooperative. Preemptive multitasking is when we let the OS decide which process or thread to execute via time slicing. Cooperative multitasking is when we explicitly determine in the code when a task can be paused in order to work on another task. Asyncio uses cooperative multitasking.
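To make cooperative multitasking a bit more concrete, here is a minimal sketch that uses asyncio itself (coroutines and tasks are explained later in this story). The names "worker" and "log" are made up for illustration. Each worker explicitly hands control back with "await asyncio.sleep(0)", so the two workers take turns:

```python
import asyncio


async def worker(name: str, log: list[str]) -> None:
    for step in range(3):
        log.append(f"{name}{step}")
        # Explicit pause point: give other tasks a chance to run.
        await asyncio.sleep(0)


async def main() -> list[str]:
    log: list[str] = []
    task_a = asyncio.create_task(worker("a", log))
    task_b = asyncio.create_task(worker("b", log))
    await task_a
    await task_b
    return log


order = asyncio.run(main())
print(order)  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Without the "await" inside the loop, the first worker would run all three steps before the second one got a turn, because nothing would ever pause it.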
Process, Thread, Multithreading and Multiprocessing
A process has its own memory space and gets its own share of CPU time. Running multiple processes on a CPU with multiple cores (or on multiple processors) is called multiprocessing.
A thread shares memory with the process it runs in and is often thought of as a lightweight process. One process can have multiple threads and always has at least one (the main thread). If multiple threads are spun up to do work concurrently, we are dealing with multithreading.
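As a small illustration of threads sharing memory with their process, the sketch below lets several threads append to the same list. The names "square" and "shared_results" are hypothetical, and the lock is only there to show the usual way of guarding shared state:

```python
import threading

shared_results: list[int] = []  # lives in the process, visible to every thread
lock = threading.Lock()


def square(number: int) -> None:
    # All threads write to the same list object.
    with lock:
        shared_results.append(number * number)


threads = [threading.Thread(target=square, args=(n,)) for n in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Sorted, because the order in which threads finish is not guaranteed.
print(sorted(shared_results))  # [0, 1, 4, 9]
```

Separate processes would each get their own copy of the list, so the same trick would not work with multiprocessing without some form of explicit communication.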
GIL (Global Interpreter Lock)
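In CPython, the GIL prevents a Python process from executing more than one Python bytecode instruction at any given time, so threads in one process cannot run Python code in parallel. The GIL is released while a thread waits on I/O, which is why concurrency still pays off for I/O-bound work.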
Coroutines and Tasks with async def and await
Coroutines
Coroutines are just Python functions defined with "async def" instead of "def". The special sauce of a coroutine is that it can pause its execution when it encounters an operation that can take some time. When this operation is done, the coroutine wakes up and continues where it left off. While the coroutine is paused, other code can run.
Calling a coroutine function does not run it or yield its result but rather returns a coroutine object. This object must be put on an event loop to be executed, for example with "asyncio.run()".
import asyncio

async def my_coroutine():
    print('My coroutine')
    return 66

r = asyncio.run(my_coroutine())
print(r)
Which will produce:
My coroutine
66
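To see that calling a coroutine function only creates a coroutine object, the following sketch separates the two steps. It reuses the coroutine from above (without the print) and uses "inspect.iscoroutine()" from the standard library purely as a check:

```python
import asyncio
import inspect


async def my_coroutine() -> int:
    return 66


coro = my_coroutine()  # nothing has been executed yet
print(inspect.iscoroutine(coro))  # True

result = asyncio.run(coro)  # now the coroutine actually runs
print(result)  # 66
```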
In order to pause the execution of a coroutine (and let something else run in the meantime), we have to use the await keyword. Putting await in front of a coroutine causes that coroutine to be run on the event loop. It also pauses the coroutine that contains the await expression until the awaited coroutine has finished and returned a result.
Even if we do something like this:
import asyncio

async def calculate(number: int) -> int:
    return number * 100

async def say_hello_eventually(seconds: int) -> str:
    await asyncio.sleep(seconds)
    return "Hello!"

async def main() -> None:
    message = await say_hello_eventually(5)
    calculated_number = await calculate(5)
    print(calculated_number)
    print(message)

asyncio.run(main())
we will not achieve concurrency. We still wait 5 seconds before both results are printed, because the two coroutines run one after the other. The await keyword pauses the current coroutine, and since nothing else has been scheduled on the event loop, nothing else runs until the awaited coroutine has finished. Thus, "say_hello_eventually()" is run and main() is paused while it finishes; main() then resumes until "calculate()" is reached, is paused again until "calculate()" has done its work, and finally execution returns to main(), which prints the results.
In order to achieve real concurrency, we need to work with tasks.
Tasks
A task is a wrapper around a coroutine that schedules the coroutine to run on the event loop as soon as possible.
A task is created with "asyncio.create_task(...)", which returns a task object. This task object can be put in an await expression, which yields the coroutine's return value once the task is complete.
import asyncio

async def calculate(number: int) -> None:
    print(number * 100)

async def say_hello_eventually(seconds: int) -> None:
    await asyncio.sleep(seconds)
    print("Hello there")

async def main() -> None:
    message_task = asyncio.create_task(say_hello_eventually(3))
    calculate_task = asyncio.create_task(calculate(5))
    await message_task
    await calculate_task

asyncio.run(main())
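This prints the result of "calculate_task" (500) immediately, even though this task is created after "message_task" in main(). "Hello there" follows after 3 seconds. While "say_hello_eventually()" is paused in "asyncio.sleep()", the event loop is free to run "calculate()".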
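The difference between awaiting coroutines directly and wrapping them in tasks can also be measured. The sketch below is only an illustration: "wait_and_return", "sequential" and "with_tasks" are made-up names, and the short sleeps stand in for any slow I/O operation.

```python
import asyncio
import time


async def wait_and_return(seconds: float) -> float:
    await asyncio.sleep(seconds)
    return seconds


async def sequential() -> float:
    start = time.perf_counter()
    # Each await finishes before the next coroutine even starts.
    await wait_and_return(0.2)
    await wait_and_return(0.2)
    return time.perf_counter() - start


async def with_tasks() -> float:
    start = time.perf_counter()
    # Both tasks are scheduled first, so their sleeps overlap.
    first = asyncio.create_task(wait_and_return(0.2))
    second = asyncio.create_task(wait_and_return(0.2))
    await first
    await second
    return time.perf_counter() - start


sequential_time = asyncio.run(sequential())
task_time = asyncio.run(with_tasks())
print(f"sequential: {sequential_time:.2f}s, tasks: {task_time:.2f}s")
```

The sequential version should take roughly the sum of the two sleeps, while the task version should take roughly as long as the longest one.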
Cancelling tasks and timeouts
If you need to cancel a task, call the task object's "cancel()" method. This requests cancellation: a CancelledError is raised inside the task at its next await point and, unless the task suppresses it, awaiting the task raises CancelledError as well.
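A minimal sketch of cancelling a task could look like this. The coroutine "long_running" and the returned strings are placeholders chosen for the example:

```python
import asyncio


async def long_running() -> str:
    await asyncio.sleep(10)
    return "finished"


async def main() -> str:
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0.1)  # give the task a moment to start
    task.cancel()  # request cancellation
    try:
        return await task
    except asyncio.CancelledError:
        # Awaiting the cancelled task raises CancelledError here.
        return "cancelled"


outcome = asyncio.run(main())
print(outcome)  # cancelled
```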
"asyncio.wait_for()" is an asyncio function that takes a coroutine or a task object and a timeout specified in seconds. If the coroutine or task takes longer than the given timeout to complete, it is cancelled and an "asyncio.TimeoutError" is raised.
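Here is a small sketch of a timeout using "asyncio.wait_for()". The coroutine "slow_operation" and the timeout value are assumptions made for the example:

```python
import asyncio


async def slow_operation() -> str:
    await asyncio.sleep(10)
    return "finished"


async def main() -> str:
    task = asyncio.create_task(slow_operation())
    try:
        return await asyncio.wait_for(task, timeout=0.1)
    except asyncio.TimeoutError:
        # On timeout, wait_for() has already cancelled the task for us.
        return f"timed out, task cancelled: {task.cancelled()}"


outcome = asyncio.run(main())
print(outcome)
```

If the task should keep running after the timeout, it can be wrapped in "asyncio.shield()" before it is passed to "wait_for()".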
Futures
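A future is a Python object that holds a single value that you expect to get at some point in the future but may not have yet. When a future is created, it typically doesn't have a value and is considered "unresolved", "incomplete" or "not done". Once a result is set on it, the future is "done" and awaiting it returns that value. Tasks are a subclass of futures.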
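In everyday asyncio code you rarely create futures by hand, but a small sketch shows the life cycle. The helper "resolve_later" and the value 42 are made up for illustration:

```python
import asyncio


async def resolve_later(future: asyncio.Future, value: int) -> None:
    await asyncio.sleep(0.1)
    future.set_result(value)  # the future is now "done"


async def main() -> int:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    print(future.done())  # False: no value yet

    task = asyncio.create_task(resolve_later(future, 42))
    result = await future  # pauses main() until set_result() is called
    print(future.done())  # True

    await task
    return result


value = asyncio.run(main())
print(value)  # 42
```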
The Event Loop
The function "asyncio.run()" conveniently creates an event loop, runs our coroutine on it and closes the loop afterwards. If we want to create an event loop manually, this can be done as follows:
import asyncio

async def main() -> None:
    print("Running main")
    await asyncio.sleep(2)

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
print("Done")
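If we want to access the event loop that is currently running (maybe because we are in a Jupyter notebook, where a loop is already running), we can use the function "asyncio.get_running_loop()" instead. It raises a RuntimeError if no loop is running.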
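The sketch below shows both sides of "asyncio.get_running_loop()", assuming it is run as a plain Python script (not in a notebook): outside of a coroutine there is no running loop, inside one there is.

```python
import asyncio


async def main() -> bool:
    # Inside a coroutine the loop that is executing us is available.
    loop = asyncio.get_running_loop()
    return loop.is_running()


try:
    asyncio.get_running_loop()
except RuntimeError:
    # In a plain script no loop is running at module level.
    print("No running event loop here")

running = asyncio.run(main())
print(running)  # True
```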
Examples
Async web calls
Let's say we want to call multiple websites and get their responses. In synchronous programming we would call them one at a time, which can be time consuming. It would be better to fire off multiple calls at the same time and then just wait for the results to come in (web calls are I/O-bound, not CPU-bound). For this to work, we need an async-enabled HTTP library. Requests is not async-enabled, so we will use aiohttp instead (installable with "pip install aiohttp").
import asyncio
import aiohttp

async def make_request(url: str) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status

async def main():
    task_1 = asyncio.create_task(make_request("https://www.google.com"))
    task_2 = asyncio.create_task(make_request("https://www.facebook.com"))
    task_3 = asyncio.create_task(make_request("https://www.twitter.com"))
    r1 = await task_1
    r2 = await task_2
    r3 = await task_3
    print(r1, r2, r3)

asyncio.run(main())
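When the number of URLs grows, creating and awaiting every task by hand gets tedious. One possible alternative, not used above, is "asyncio.gather()", which schedules several awaitables at once and returns their results in the order they were passed in. The sketch below swaps the real HTTP call for a hypothetical "fake_request" that just sleeps and returns a made-up status code, so it runs without aiohttp or network access:

```python
import asyncio


async def fake_request(url: str, delay: float) -> tuple[str, int]:
    # Stand-in for make_request(): pretend the call takes `delay` seconds.
    await asyncio.sleep(delay)
    return url, 200  # made-up status code, no real request is sent


async def main() -> list[tuple[str, int]]:
    urls = [
        "https://www.google.com",
        "https://www.facebook.com",
        "https://www.twitter.com",
    ]
    # gather() runs all coroutines concurrently and keeps the input order.
    return await asyncio.gather(*(fake_request(url, 0.1) for url in urls))


results = asyncio.run(main())
for url, status in results:
    print(url, status)
```

With the real "make_request()" in place of "fake_request()", the same structure should apply, although error handling for failed requests would need some thought.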