Python AsyncIO

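asyncio implements cooperative multitasking: a coroutine runs until it voluntarily suspends (at an await), and only then can the event loop switch to another task. Unlike preemptive multitasking, this is vulnerable to badly behaved code: a single coroutine that blocks (for example, by calling time.sleep() or doing heavy computation) or never awaits stalls every other task on the loop.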
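A small sketch of what "cooperative" means in practice. The worker names and the cooperative flag are made up for illustration. Two tasks interleave only when each one awaits; without an await, the first task runs to completion before the second even starts.

```python
import asyncio


async def worker(name, log, steps, cooperative):
    for i in range(steps):
        log.append(f"{name}{i}")
        if cooperative:
            # Suspend and hand control back to the event loop
            await asyncio.sleep(0)
        # With cooperative=False there is no await, so the loop
        # cannot switch to another task until this one finishes


async def run(cooperative):
    log = []
    await asyncio.gather(
        worker("a", log, 3, cooperative),
        worker("b", log, 3, cooperative),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(run(cooperative=True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
    print(asyncio.run(run(cooperative=False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
```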

Timeline

May 2001: PEP 255 created (Simple Generators)
October 2002: Twisted released (an event-driven networking framework built around a reactor event loop and Deferreds)
May 2005: PEP 342 created (Coroutines via Enhanced Generators: generator functions can be used as coroutines)
September 2012: Python 3.3 released with the yield from expression (PEP 380)
December 2012: asyncio (code name tulip) proposed in PEP 3156 to add asynchronous I/O support to Python
October 2013: asyncio 0.1.1 released
October 2013: aiohttp 0.1 released
March 2014: Python 3.4 released with asyncio in the standard library
September 2015: Python 3.5 released with async/await syntax (PEP 492)

Presentation on concurrency in Python.

A Brief History of Async talk on ThaiPy.

New syntax for generators/coroutines

There is a great talk by David Beazley at PyCon 2015 that shows how to use generators to simplify asynchronous code.

My notes on generators and coroutines.

yield from

PEP 380 adds the yield from syntax, which lets a generator delegate part of its operations to another generator (a subgenerator).

def subgenerator():

    for i in (0, 1, 2, 3):
        yield i


def generator():

    # For Python < 3.3:
    # for i in subgenerator():
    #     yield i

    yield from subgenerator()


if __name__ == '__main__':
    for i in generator():
        print(i)

# Python 2.7:
# SyntaxError: invalid syntax
# Python 3.3:
# 0
# 1
# 2
# 3

For coroutines, yield from also forwards values passed to send() on to the subcoroutine:

def subcoroutine():

    while True:
        i = (yield)
        print(i)


def coroutine():

    sc = subcoroutine()

    # For Python < 3.3 (simplified emulation):
    # sc.send(None)
    # while True:
    #     i = (yield)
    #     try:
    #         sc.send(i)
    #     except StopIteration:
    #         break

    yield from sc


if __name__ == '__main__':
    c = coroutine()
    c.send(None)
    for i in (0, 1, 2, 3):
        c.send(i)

A subgenerator may also return a value, which becomes the value of the yield from expression in the delegating generator.

return statement:

def subgenerator():

    a = (0, 1, 2, 3)
    for i in a:
        yield i
    # raises
    # SyntaxError: 'return' with argument inside generator
    # in Python < 3.3 (must use raise StopIteration() instead)
    return len(a)


def generator():
    # subgenerator return value is available
    value = yield from subgenerator()
    print(value)


if __name__ == '__main__':
    for i in subgenerator():
        print(i)
    # 0
    # 1
    # 2
    # 3

    for i in generator():
        print(i)
    # 0
    # 1
    # 2
    # 3
    # 4  <- printed by print(value) inside generator()

async/await

In Python 3.4, asyncio required every generator meant to be used as a coroutine to be decorated with asyncio.coroutine (the decorator was deprecated in Python 3.8 and removed in 3.11):

import asyncio


@asyncio.coroutine  # only available on Python < 3.11
def py34_coroutine():
    yield from awaitable()  # awaitable() stands for any coroutine or future

Since Python 3.5, native coroutines are a distinct type; before that they were just enhanced generators. The async def syntax makes coroutines a native language feature and clearly separates them from generators.

async def py35_coroutine():
    await awaitable()
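To see that native coroutines really are a separate type, the inspect module can tell the two apart. The function names here are illustrative only.

```python
import asyncio
import inspect


def numbers():
    yield 1  # a plain generator function


async def answer():
    await asyncio.sleep(0)
    return 42


if __name__ == "__main__":
    gen = numbers()
    coro = answer()
    print(inspect.isgenerator(gen), inspect.iscoroutine(gen))    # True False
    print(inspect.isgenerator(coro), inspect.iscoroutine(coro))  # False True
    print(asyncio.run(coro))                                      # 42
```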

Features:

async for, async with

With generator-based coroutines, asynchronous calls are only possible where yield is syntactically allowed, which limits the usefulness of statements such as with and for (PEP 492).

The new async with statement lets Python programs perform asynchronous calls when entering and exiting a runtime context, and the new async for statement makes it possible to perform asynchronous calls in iterators. An asynchronous context manager is a context manager that can suspend execution in its __aenter__ and __aexit__ methods:

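import asyncio

async def log(msg):  # hypothetical async logger, stands in for real I/O
    await asyncio.sleep(0)
    print(msg)

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')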
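The same kind of context manager can also be written as an async generator with contextlib.asynccontextmanager (Python 3.7+), shown here as a sketch rather than a replacement for the class above. logged_context and the events list are illustrative names: code before yield runs on entry, code after yield runs on exit, even when the body raises.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def logged_context(name, events):
    # Code before `yield` plays the role of __aenter__
    await asyncio.sleep(0)
    events.append(f"enter {name}")
    try:
        yield name  # value bound by `async with ... as`
    finally:
        # Code after `yield` plays the role of __aexit__; it also runs on errors
        await asyncio.sleep(0)
        events.append(f"exit {name}")


async def main():
    events = []
    async with logged_context("db", events) as value:
        events.append(f"body {value}")
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['enter db', 'body db', 'exit db']
```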

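An asynchronous iterable implements __aiter__, which returns an asynchronous iterator (since Python 3.5.2 it should be a regular method that returns the iterator directly, not a coroutine). The iterator's __anext__ is a coroutine, so it can call asynchronous code at each step; it signals exhaustion by raising StopAsyncIteration.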

class AsyncIterable:
    def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        ...  # hypothetical: return the next chunk of data, or a falsy value when done
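A concrete, runnable variant of the pattern above, assuming a hypothetical Countdown source whose __anext__ simulates waiting on I/O with asyncio.sleep(0). It is consumed with an async comprehension (Python 3.6+), which uses async for under the hood.

```python
import asyncio


class Countdown:
    """Async iterator yielding start, start - 1, ..., 1 (hypothetical example source)."""

    def __init__(self, start):
        self.current = start

    def __aiter__(self):
        # Return the async iterator itself (a plain method, not a coroutine)
        return self

    async def __anext__(self):
        await asyncio.sleep(0)  # simulate waiting on I/O for the next item
        if self.current <= 0:
            raise StopAsyncIteration
        value = self.current
        self.current -= 1
        return value


async def collect(start):
    return [n async for n in Countdown(start)]


if __name__ == "__main__":
    print(asyncio.run(collect(3)))  # [3, 2, 1]
```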

Tasks and coroutines

See Python Tasks and coroutines documentation
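A minimal sketch of the relationship between the two: a coroutine does nothing until it is awaited or wrapped in a Task, and asyncio.create_task (Python 3.7+) schedules it to run concurrently. The square() helper and its delays are illustrative.

```python
import asyncio


async def square(x, delay):
    await asyncio.sleep(delay)  # simulated I/O latency
    return x * x


async def main():
    # create_task schedules each coroutine on the running loop immediately
    slow = asyncio.create_task(square(2, 0.02))
    fast = asyncio.create_task(square(3, 0.01))
    # gather returns results in argument order, not in completion order
    return await asyncio.gather(slow, fast)


if __name__ == "__main__":
    print(asyncio.run(main()))  # [4, 9]
```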

Debug

loop.set_debug(True)

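In debug mode asyncio enables many additional checks; for example, callbacks that run longer than loop.slow_callback_duration (100 ms by default) are logged. Debug mode can also be turned on with the PYTHONASYNCIODEBUG=1 environment variable.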
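A sketch of the slow-callback check in action. asyncio.run(..., debug=True) (Python 3.7+) is equivalent to calling loop.set_debug(True); ListHandler is a hypothetical helper that captures records from the "asyncio" logger so they can be inspected. The blocking time.sleep() is the kind of mistake debug mode is meant to surface.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log records in memory (hypothetical helper)."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


async def blocking_step():
    # time.sleep blocks the whole event loop; asyncio.sleep would not
    time.sleep(0.2)


def run_with_debug():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    old_level = logger.level
    logger.addHandler(handler)
    logger.setLevel(logging.WARNING)
    try:
        asyncio.run(blocking_step(), debug=True)
    finally:
        logger.removeHandler(handler)
        logger.setLevel(old_level)
    return [record.getMessage() for record in handler.records]


if __name__ == "__main__":
    for message in run_with_debug():
        print(message)  # e.g. "Executing <Task ...> took 0.200 seconds"
```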

import gc
# print information about objects the collector found but could not free
gc.set_debug(gc.DEBUG_UNCOLLECTABLE)

When a native coroutine object is garbage collected without ever having been awaited, Python emits a RuntimeWarning ("coroutine 'name' was never awaited").
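The warning can be observed directly with warnings.catch_warnings. forgotten() and create_and_drop() are illustrative names; the gc.collect() call is only a safety net for Python implementations that do not free objects as soon as their last reference disappears.

```python
import gc
import warnings


async def forgotten():
    return 1


def create_and_drop():
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = forgotten()  # creates a coroutine object but never awaits it
        del coro            # CPython finalizes it once the last reference is gone
        gc.collect()        # helps on implementations without reference counting
    return [(w.category, str(w.message)) for w in caught]


if __name__ == "__main__":
    for category, message in create_and_drop():
        print(category.__name__, message)
```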

Libs

aiohttp
aiohttp.web
aiohttp session
aiohttp debugtoolbar
aiopg
aioredis / asyncio_redis
aioes
aiozmq
aio-s3
...

Datadog - aiomeasures.

aiohttp.web

Server example

from aiohttp import web


async def index(request):
    return web.Response(text="Welcome home!")


my_web_app = web.Application()
my_web_app.router.add_route('GET', '/', index)

if __name__ == '__main__':
    web.run_app(my_web_app)  # listens on port 8080 by default

Serving static files during development (in production, let a web server such as nginx serve them):

app.router.add_static('/static', '/path/to/static', name='static')

URL reverse (look up the route by its name):

app.router['static'].url_for(filename="...")

Deployment

http://aiohttp.readthedocs.io/en/v0.22.3/gunicorn.html

pip install gunicorn

gunicorn -b 0.0.0.0:8000 -k aiohttp.worker.GunicornWebWorker -w 9 -t 60 project.app:app

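Flags: -b is the bind address, -k the worker class, -w the number of worker processes, -t the worker timeout in seconds, and --env sets an environment variable for the workers. An example launch script: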

#!/bin/bash

exec .env/bin/gunicorn -b 0.0.0.0:9001 -k aiohttp.worker.GunicornWebWorker -w 2 -t 60 app:app --env=APP_EMAIL=... --env SMTP_PORT=...
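The same settings can also live in a Gunicorn configuration file, which is plain Python and is passed with -c (for example gunicorn -c gunicorn.conf.py app:app). This is a sketch mirroring the launch script above; the environment values stay elided as in the original.

```python
# gunicorn.conf.py: same settings as the launch script
bind = "0.0.0.0:9001"                              # -b
worker_class = "aiohttp.worker.GunicornWebWorker"  # -k
workers = 2                                        # -w
timeout = 60                                       # -t (seconds)
raw_env = ["APP_EMAIL=...", "SMTP_PORT=..."]       # --env (values elided)
```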

Links:

How to Deploy Python WSGI Apps Using Gunicorn HTTP Server Behind Nginx

Graceful shutdown

import aiopg.sa


async def init_pg(app):
    conf = app['config']['postgres']
    # no loop= argument: the running event loop is used by default
    engine = await aiopg.sa.create_engine(
        database=conf['database'],
        user=conf['user'],
        password=conf['password'],
        host=conf['host'],
        port=conf['port'],
        minsize=conf['minsize'],
        maxsize=conf['maxsize'])
    app['db'] = engine

...

app.on_startup.append(init_pg)

...

async def close_pg(app):
    app['db'].close()
    await app['db'].wait_closed()

...

app.on_cleanup.append(close_pg)

https://docs.aiohttp.org/en/stable/tutorial.html#graceful-shutdown

asyncio.Semaphore

A semaphore is a synchronization primitive that can be used to limit how many coroutines execute a given section of code at the same time.

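sem = asyncio.Semaphore(5)


async def fetch(url):
    # At most 5 coroutines can be inside this block at once.
    # The old "with (yield from sem):" form was removed in Python 3.9.
    async with sem:
        # get() stands for an async HTTP call (old aiohttp API)
        return await get(url, compress=True)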
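A self-contained check of the limit, assuming a hypothetical limited_worker that records how many coroutines hold the semaphore at once while simulating I/O with asyncio.sleep. Twenty tasks start together, but the peak concurrency never exceeds the semaphore's initial value.

```python
import asyncio


async def limited_worker(sem, state):
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated I/O while holding the semaphore
        state["active"] -= 1


async def run(n_tasks=20, limit=5):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_worker(sem, state) for _ in range(n_tasks)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(run()))  # 5
```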

Executors

ThreadPoolExecutor:

Use for blocking I/O, e.g. networking when no asyncio-compatible client is available.

ProcessPoolExecutor:

Use for CPU-heavy jobs, which would otherwise block the event loop (and, in threads, contend for the GIL).
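Both executors are used through loop.run_in_executor, which wraps the blocking call in an awaitable future. A sketch with a thread pool, assuming a hypothetical blocking_io() that stands in for a synchronous client; passing a ProcessPoolExecutor instead works the same way for CPU-bound work, as long as the function is defined at module level so it can be pickled. On Python 3.9+, asyncio.to_thread is a shorthand for running a function in the default thread pool.

```python
import asyncio
import concurrent.futures
import time


def blocking_io(x):
    # hypothetical stand-in for a blocking call, e.g. a synchronous HTTP client
    time.sleep(0.05)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # The four calls run in parallel threads; the event loop stays free
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_io, i) for i in range(4))
        )
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4, 6]
```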

UVLoop

uvloop is a fast, drop-in replacement of the built-in asyncio event loop. uvloop is implemented in Cython and uses libuv under the hood.

There is a gunicorn worker for it: aiohttp.worker.GunicornUVLoopWebWorker.

aiopg

SQLAlchemy usage

SQLAlchemy Object Relational Tutorial

aioamqp

Retries:

import logging
import time

import asyncio
import aioamqp


RETRY_DELAY = (1, 2, 4, 8, 30, 60)

logger = logging.getLogger(__name__)
channel = None


async def consumer(channel, body, envelope, properties):
    print(body)
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)


async def connect():
    transport, protocol = await aioamqp.connect(
        host='localhost',
        port=5672,
        login='guest',
        password='guest',
        virtualhost='/'
    )
    channel = await protocol.channel()
    await channel.queue_declare(
        queue_name='test',
        durable=True
    )
    await channel.basic_consume(
        consumer,
        queue_name='test'
    )

    return protocol, channel


async def main():
    global channel
    retry = 0
    while True:
        retry += 1
        start = time.time()
        try:
            protocol, channel = await connect()
            # https://github.com/Polyconseil/aioamqp/issues/65#issuecomment-301737344
            await protocol.wait_closed()
            channel = None
            logger.warning("Channel was closed unexpectedly.")
        except OSError:
            logger.warning("Connection attempt failed.")
        if time.time() - start > RETRY_DELAY[-1] * 2:
            # if we got connection and lost it
            retry = 1
        try:
            delay = RETRY_DELAY[retry - 1]
        except IndexError:
            delay = RETRY_DELAY[-1]
        logger.warning("Retry in {} seconds ...".format(delay))
        await asyncio.sleep(delay)


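if __name__ == '__main__':
    # asyncio.get_event_loop() outside a running loop is deprecated in recent Python versions
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        if channel:
            loop.run_until_complete(channel.close())
            logger.warning("Connection closed in a clean way.")
        loop.close()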
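The capped back-off schedule can be factored into a reusable helper. This is a sketch, not part of aioamqp: delay_for and with_retries are hypothetical names, only OSError triggers a retry (as in the loop above), and the "reset the counter after a long-lived connection" rule is left out. The sleep function is injectable so the schedule can be tested without waiting.

```python
import asyncio

RETRY_DELAY = (1, 2, 4, 8, 30, 60)


def delay_for(attempt, delays=RETRY_DELAY):
    """Delay before retrying after failed attempt number `attempt` (1-based), capped."""
    return delays[min(attempt, len(delays)) - 1]


async def with_retries(make_attempt, max_attempts, delays=RETRY_DELAY, sleep=asyncio.sleep):
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_attempt()
        except OSError:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            await sleep(delay_for(attempt, delays))
```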

socket.io

import asyncio
import json
import re

import aiohttp


async def ping(ws, interval):
    while True:
        await asyncio.sleep(interval)
        await ws.send_str('2')


async def listen():
    """
    https://www.cryptocompare.com/api/#-api-web-socket-
    https://github.com/socketio/engine.io-protocol/blob/master/README.md
    """
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('https://streamer.cryptocompare.com/socket.io/?EIO=3&transport=websocket') as ws:
            response = await ws.receive()
            # 0{"sid":"BBl06TntyFLKVKIzALkv","upgrades":[],"pingInterval":25000,"pingTimeout":60000}
            data = json.loads(re.search(r'(\{.+\})', response.data).groups()[0])
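            # '42' = Engine.IO "message" packet (4) carrying a Socket.IO EVENT (2)
            await ws.send_str('42["SubAdd",{"subs":["5~CCCAGG~BTC~USD","5~CCCAGG~ETH~USD"]}]')
            # keep a reference so the ping task can be cancelled when the socket closes
            ping_task = asyncio.ensure_future(ping(ws=ws, interval=data['pingInterval'] / 1000))
            try:
                async for msg in ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        # Engine.IO packet '2' is a ping; answer with '3' (pong)
                        if msg.data.startswith('2'):
                            await ws.send_str('3')
                    print(msg.type, msg.data)
            finally:
                ping_task.cancel()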


if __name__ == '__main__':
    asyncio.run(listen())
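Instead of pulling the JSON out with a regular expression, the frames can be decoded by packet type: in Engine.IO (protocol v3, as used above) the first character of each text frame is the packet type. A stdlib-only sketch; parse_packet and ping_interval_seconds are hypothetical helper names.

```python
import json

# Engine.IO packet types: first character of each text frame
PACKET_TYPES = {"0": "open", "1": "close", "2": "ping", "3": "pong",
                "4": "message", "5": "upgrade", "6": "noop"}


def parse_packet(frame):
    """Split an Engine.IO text frame into (packet type name, payload)."""
    return PACKET_TYPES[frame[0]], frame[1:]


def ping_interval_seconds(open_frame):
    """Read pingInterval (milliseconds) from the 'open' packet, in seconds."""
    kind, payload = parse_packet(open_frame)
    if kind != "open":
        raise ValueError(f"expected an open packet, got {kind!r}")
    return json.loads(payload)["pingInterval"] / 1000
```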

To investigate

Native vs green threading.

"Think in coroutines" talk at PyCon Ukraine 2017 by Łukasz Langa
«Asyncio stack for web developers» Igor Davydenko LvivPy#4 at YouTube, slides
«Advanced async/await in Python 3.5» Igor Davydenko LvivPy#5 at YouTube
Introduction to aiohttp. Andrew Svetlov at YouTube
Yury Selivanov - async/await in Python 3.5 and why it is awesome at YouTube
Łukasz Langa - Thinking In Coroutines - PyCon 2016 at YouTube
How the heck does async/await work in Python 3.5? by Brett Cannon
Building Apps with Asyncio at PyCon Ukraine 2017 by Nikolay Novik

PEP 342 - Coroutines via Enhanced Generators
PEP 380 - Syntax for Delegating to a Subgenerator
PEP 492 - Coroutines with async and await syntax
PEP 3156 - Asynchronous IO Support Rebooted: the "asyncio" Module

Licensed under CC BY-SA 3.0