Understanding Python's ASGI Spec

8/20/2020
A child playing on the beach

Over the past few months at work, we’ve been writing small Python-based web services that are deployed onto Kubernetes clusters. In light of this, we’ve started to see some advantages to writing services with leaner profiles where we can dedicate fewer resources (CPU and memory) and still end up with a low-latency result. While some might read this and suggest instead that we should switch to other languages (such as Go), we still have a lot of people on our team comfortable with Python, and there are advantages to continuing to deploy services that the whole team is comfortable owning. To this end, we’ve started using a newer generation of Python web frameworks based on the ASGI spec and we’ve been pleased with the results.

For a previous generation of Python web frameworks (including Django, Flask, and friends), the WSGI spec, the “Python Web Server Gateway Interface” (see PEP 333), guided how web traffic should be handled. The benefits of having a spec meant that entirely different libraries for building web servers could all reply to web requests in a generic way. This, in turn, allows something like the gunicorn project to generically serve a request to a Django app or a Flask app, for instance, without knowing any of the internal details of those libraries.

With Python’s recent adoption of async/await syntax, possibilities have opened up for building web services that exploit “cooperative multitasking”, a form of concurrency popularized by the NodeJS ecosystem. Out of this and building on their experience with the WSGI spec, the Python community has produced a new document called the ASGI spec, which guides the flow of web requests in an async world.

The ASGI spec is a readable document and by following it and we can build a basic web service. This is a good exercise for exploring and understanding the spec, although it’s probably obvious that achieving the feature set available in a framework like Starlette would require significantly more effort.

Our First Attempt: Handling scope, receive, send

The spec starts out with a description of the basic form of an ASGI service along with an example,

In its simplest form, an application can be written as an asynchronous function, like this:

async def application(scope, receive, send):
    event = await receive()
    ...
    await send({"type": "websocket.send", ...})

Although the spec covers both websockets and HTTP, we’re going to restrict our efforts to HTTP here in order to start with the smallest possible featureset. We’re also going to build on top of libraries like uvicorn and hypercorn, so that we can focus purely on implementing a service that satisfies the ASGI spec. These other libraries will translate our raw HTTP requests into something our ASGI app can respond to.

With that said, let’s try this code out. We’ll start with a function that takes these arguments and sends back some values that conform to the spec, again with a focus on HTTP requests:

async def app(scope, receive, send):
    message = await receive()
    if message["type"] == "http.request":
        body = message.get("body", b"")
        if body:
            print("Body ", body)
        # here's our response:
        await send({"type": "http.response.start", "status": 200})
        await send({"type": "http.response.body", "body": b"OK"})
  
    elif message["type"] == "http.disconnect":
        print("Disconnected! ")

For our first attempt here we are ignoring the scope argument, a dictionary that would normally give us more detail about a request. Instead, we elect to pull a single message from the receive coroutine (if we have received a message of type "http.request") and then reply with an HTTP response.

For HTTP the spec dictates that we should send two messages back to complete an HTTP request, the “start” message and the response “body”, so that’s what we’ll do:

await send({"type": "http.response.start", "status": 200})
await send({"type": "http.response.body", "body": b"OK"})

Notice that the spec mandates that both the send and receive coroutines handle Python dictionaries and that the scope argument is also a dictionary. This keeps things generic. Lastly, our response body above must be a bytes object.

Finally, in addition to our app function, we also import uvicorn and ask it to run our ASGI app:

import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8000)

Let’s give it a try!

$ curl -XPOST http://localhost:8000/
OK

It works! Here are the log messages that were emitted by uvicorn on this request:

$ python demo.py
INFO:     Started server process [87440]
INFO:     Waiting for application startup.
INFO:     ASGI 'lifespan' protocol appears unsupported.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO:     127.0.0.1:51053 - "GET / HTTP/1.1" 200 OK

Let’s see what happens when we send in a request body:

$ curl -XPOST http://localhost:8000/some-random-url -d '{"hey": "now"}'
OK

Because we’re printing the body received, we should see it in our logs:

INFO:     Started server process [87689]
INFO:     Waiting for application startup.
INFO:     ASGI 'lifespan' protocol appears unsupported.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
Body  b'{"hey": "now"}'
INFO:     127.0.0.1:51103 - "POST /some-random-url HTTP/1.1" 200 OK

Notice also that we’re not doing anything with URL-routing. Instead, we’re replying to every request that comes in with the same response. This means we can request any arbitrary endpoint from our app and it will always respond in the exact same way.

Switching to Hypercorn

Let’s try our ultra-basic ASGI app with hypercorn and an alternative async library trio, just to see what happens.

First, we’ll modify our script to look like this:

from hypercorn.config import Config
from hypercorn.trio import serve
import trio

if __name__ == "__main__":
    config = Config()
    config.bind = ["localhost:8000"]
    trio.run(partial(serve, app, config))

After that, we’ll fire it up and see what happens when we issue a request:

$ curl http://localhost:8000
curl: (7) Failed to connect to localhost port 8000: Connection refused

Hmm. That seems problematic. What do our application logs report…

Unfortunately, there is no output in our logs until we hit CTRL+C to terminate the app, at which point we see the following:

$ python demo.py

^CTraceback (most recent call last):
  File "demo.py", line 195, in <module>
    trio.run(partial(serve, app, config))
  File ".lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File ".lib/python3.8/site-packages/hypercorn/trio/__init__.py", line 42, in serve
    await worker_serve(app, config, shutdown_trigger=shutdown_trigger, task_status=task_status)
  File ".lib/python3.8/site-packages/hypercorn/trio/run.py", line 40, in worker_serve
    await lifespan.wait_for_startup()
  File ".lib/python3.8/site-packages/hypercorn/trio/lifespan.py", line 49, in wait_for_startup
    await self.startup.wait()
  File ".lib/python3.8/site-packages/trio/_sync.py", line 66, in wait
    await self._lot.park()
  File ".lib/python3.8/site-packages/trio/_core/_parking_lot.py", line 136, in park
    await _core.wait_task_rescheduled(abort_fn)
  File ".lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File ".lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error
  File ".lib/python3.8/site-packages/trio/_core/_run.py", line 1107, in raise_cancel
    raise KeyboardInterrupt
KeyboardInterrupt

It’s maybe not so obvious what’s going on here, but if we switch our application instance to reply to any invocation with an HTTP response, the problem will likely become clearer (in addition to including a longer traceback).

First, we’ll temporarily replace our app function with the following code:

async def app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200})
    await send({"type": "http.response.body", "body": b"OK"})

This will treat any invocation as an HTTP request and send back an HTTP response.

After that, when we run it and we send a SIGTERM, we’ll see the following:

ASGI Framework Lifespan error, continuing without Lifespan support
Traceback (most recent call last):
  File "./lib/python3.8/site-packages/hypercorn/trio/lifespan.py", line 29, in handle_lifespan
    await invoke_asgi(self.app, scope, self.asgi_receive, self.asgi_send)
  File "./lib/python3.8/site-packages/hypercorn/utils.py", line 219, in invoke_asgi
    await app(scope, receive, send)
  File "demo.py", line 156, in app_v1
    await send({"type": "http.response.start", "status": 200})
  File "./lib/python3.8/site-packages/hypercorn/trio/lifespan.py", line 77, in asgi_send
    raise UnexpectedMessage(message["type"])
hypercorn.trio.lifespan.UnexpectedMessage: http.response.start
Traceback (most recent call last):
  File "./lib/python3.8/site-packages/trio/_timeouts.py", line 105, in fail_at
    yield scope
  File "./lib/python3.8/site-packages/hypercorn/trio/lifespan.py", line 49, in wait_for_startup
    await self.startup.wait()
  File "./lib/python3.8/site-packages/trio/_sync.py", line 66, in wait
    await self._lot.park()
  File "./lib/python3.8/site-packages/trio/_core/_parking_lot.py", line 136, in park
    await _core.wait_task_rescheduled(abort_fn)
  File "./lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File "./lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error
  File "./lib/python3.8/site-packages/trio/_core/_run.py", line 1096, in raise_cancel
    raise Cancelled._create()
trio.Cancelled: Cancelled

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./lib/python3.8/site-packages/hypercorn/trio/lifespan.py", line 49, in wait_for_startup
    await self.startup.wait()
  File "./lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "./lib/python3.8/site-packages/trio/_timeouts.py", line 107, in fail_at
    raise TooSlowError
trio.TooSlowError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "demo.py", line 177, in <module>
    trio.run(partial(serve, app, config))
  File "./lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "./lib/python3.8/site-packages/hypercorn/trio/__init__.py", line 42, in serve
    await worker_serve(app, config, shutdown_trigger=shutdown_trigger, task_status=task_status)
  File "./lib/python3.8/site-packages/hypercorn/trio/run.py", line 40, in worker_serve
    await lifespan.wait_for_startup()
  File "./lib/python3.8/site-packages/hypercorn/trio/lifespan.py", line 51, in wait_for_startup
    raise LifespanTimeout("startup") from error
hypercorn.utils.LifespanTimeout: Timeout whilst awaiting startup. Your application may not support the ASGI Lifespan protocol correctly, alternatively the startup_timeout configuration is incorrect.

This last error should make things a bit clearer: our application doesn’t handle lifespan events. Indeed, looking back at our logs while running with uvicorn, we’ll see an innocuous-looking message that corroborates this:

INFO:     Waiting for application startup.
INFO:     ASGI 'lifespan' protocol appears unsupported.
INFO:     Application startup complete.

It turns out that hypercorn is less forgiving about a failure to handle the lifespan protocol than uvicorn! So what is the lifespan protocol?

Lifespan Events

From the hints given by this massive traceback, we can see that our failure to run our basic service with hypercorn has to do with a failure to adequately handle lifespan events. Returning to the ASGI spec’s section on lifespan evnts, we find the following text:

The lifespan messages allow for an application to initialise and shutdown in the context of a running event loop. An example of this would be creating a connection pool and subsequently closing the connection pool to release the connections.

A possible implementation of this protocol is given below:

async def app(scope, receive, send):
    if scope['type'] == 'lifespan':
        while True:
            message = await receive()
            if message['type'] == 'lifespan.startup':
                ... # Do some startup here!
                await send({'type': 'lifespan.startup.complete'})
            elif message['type'] == 'lifespan.shutdown':
                ... # Do some shutdown here!
                await send({'type': 'lifespan.shutdown.complete'})
                return
    else:
        pass # Handle other types

With code snippets like these provided by the spec, we can quickly get started handling these events. Let’s use the example given above and join it to our existing application.

To simplify it somewhat, we’ll create a new, primary application function and then send any actual requests that are not lifespan events into our previous app function:

async def app(scope, receive, send):
    message = await receive()
    if message["type"] == "http.request":
        body = message.get("body", b"")
        if body:
            print("Body ", body)
        # here's our response:
        await send({"type": "http.response.start", "status": 200})
        await send({"type": "http.response.body", "body": b"OK"})
  
    elif message["type"] == "http.disconnect":
        print("Disconnected! ")


async def app_lifespan(scope, receive, send):
    if scope["type"] == "lifespan":
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                ...  # Do some startup here!
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                ...  # Do some shutdown here!
                await send({"type": "lifespan.shutdown.complete"})
                return
    if scope["type"] == "http":
        return await app(scope, receive, send)

Let’s try this latest version with our trio/hypercorn version again:

$ python demo.py
Running on 127.0.0.1:8000 over http (CTRL + C to quit)

Now this looks a lot more promising: we no longer have error messages about unhandled “lifespan” events. Let’s issue a request and see what happens:

$ curl -XPOST http://localhost:8000/some-random-url -d '{"hey": "now"}'
OK

So it looks to be working again. After checking the logs for our service, we should see this request body appear, just as before while running with uvicorn:

Body  b'{"hey": "now"}'

Using Scope

Now, the application we’re writing would not be a very interesting web server except perhaps as some kind of odd health check. However, to perform more interesting work, we’ll need to start utlizing the scope argument passed into our app. In an HTTP request, scope is where all of the useful auxiliary information about the request (aside from the request body) would be present.

Let’s start by printing out the scope and seeing what elements we can use. First, we’ll modify our app to print the scope on each invocation:

async def app_lifespan(scope, receive, send):
    print(scope)
    if scope["type"] == "lifespan":
        while True:
          ...

With this, we can make a few requests and see what our scope looks like.

After restarting our server and rerunning the previous requests from above, we should see something like the following:

{'type': 'lifespan', 'asgi': {'spec_version': '2.0', 'version': '3.0'}}
Running on 127.0.0.1:8000 over http (CTRL + C to quit)
{'type': 'http', 'http_version': '1.1', 'asgi': {'spec_version': '2.1', 'version': '3.0'}, 'method': 'POST', 'scheme': 'http', 'path': '/some-random-url', 'raw_path': b'/some-random-url', 'query_string': b'', 'root_path': '', 'headers': [(b'host', b'localhost:8000'), (b'user-agent', b'curl/7.64.1'), (b'accept', b'*/*'), (b'content-length', b'14'), (b'content-type', b'application/x-www-form-urlencoded')], 'client': ('127.0.0.1', 50027), 'server': ('127.0.0.1', 8000)}

There are three lines of text here, two lines from printing the scope sandwiched around a message from hypercorn which tells us that our app is running.

The first printed scope is of type lifespan, while the second is of type http. This makes sense from what we’ve learned so far.

Further, from the scope dictionary present on our HTTP request, we can see various keys and values that would also be useful for constructing a more fully featured web framework:

'method': 'POST',
'scheme': 'http',
'path': '/some-random-url',
'raw_path': b'/some-random-url',
'query_string': b'',
'root_path': '',
'headers': [list-of-tuples-of-bytes]

Request Routing

Let’s assume we’d like to modify our simple ASGI app to handle some simple request routing. Here, we’ll offer only two routes, and these will represent different types of Content-Type responses. We’ll also create a way to reply to requests for unknown paths (404s).

Because we have only two types of requests, we’ll use a dictionary to map the request path to its appropriate handler and we’ll default to our 404 handler if we can’t match the path. (And it probably goes without saying that request routing can get more complicated than this, but it should suffice for demonstration purposes.)

First, we’ll create a handler for plaintext and one for JSON, like this:

async def plaintext_handler(scope, receive, send):
    await send({"type": "http.response.start", "status": 200})
    await send({"type": "http.response.body", "body": b"OK"})


async def json_handler(scope, receive, send):
    if (b"content-type", b"application/json") not in scope["headers"]:
        await unknown_handler(scope, receive, send)
        return None

    message = await receive()
    body = message.get("body", b"")
    data = json.loads(body)
    payload = {"received": data, "emperor": "of ice cream"}
    await send(
        {"type": "http.response.start", "status": 200, "headers": [(b"content-type", b"application/json")]}
    )
    await send(
        {"type": "http.response.body", "body": json.dumps(payload).encode("utf-8")}
    )

We’ll also create a fallback to handle requests for unknown paths and you may notice that we’ve already included a call to this handler above for when JSON-matched requests comes in without the proper Content-Type header:

async def unknown_handler(scope, receive, send):
    await send(
        {
            "type": "http.response.start",
            "status": 404,
            "headers": [(b"Content-Type", b"text/html; charset=UTF-8")],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": b"<html><body><h1>404 Not Found!</h1></body></html>",
        }
    )

In this case, we’ve elected to make a fancy HTML page for our 404s (granted, it’s not that fancy).

Lastly, here’s the dictionary we’ll use to match routes to handlers when a new request comes in, defaulting to the unknown_handler when a requested path doesn’t match a known endpoint:

ROUTE_HANDLERS = {"/json": json_handler, "/plaintext": plaintext_handler}


async def app(scope, receive, send):
    handler = ROUTE_HANDLERS.get(scope["path"], unknown_handler)
    await handler(scope, receive, send)

Note: Here, I have also removed the first message = await receive() call as well as the check for message["type"] because I want my handlers to be able to pull messages out of the receive coroutine.

Finally, after making these changes and running the new version of our app in another terminal, I’ll make some requests and see what we get in response:

$ curl -XPOST http://localhost:8000/plaintext
OK
$ curl -XPOST -H "Content-Type: application/json" http://localhost:8000/json -d '{"Hey": "JSON"}'
{"received": {"Hey": "JSON"}, "emperor": "of ice cream"}
$ curl -XPOST http://localhost:8000/UNKNOWN
<html><body><h1>404 Not Found!</h1></body></html>

Notice that I am submitting the header "Content-Type: application/json", but matching the header (b"content-type", b"application/json"): this is because headers are automatically lower-cased owing to the fact that HTTP headers are meant to be case-insensitive.

As we can see, it seems to be working. We should also see that we get the same unknown_handler response if we leave off the Content-Type header when requesting our JSON endpoint:

$ curl -XPOST http://localhost:8000/json -d '{"Hey": "JSON"}'
<html><body><h1>404 Not Found!</h1></body></html>

Building Out: Background and Middleware

As I’ve stated at various points above, an extremely simple application like the above, while certainly useful for demonstrating how to write an application that meets the ASGI spec, is still pretty far from a full-fledged web server. There are hints, though, as to how to progress further: we could use the scope argument to pull out query paramaters, headers, and the requested path, and we could also use the receive coroutine to retrieve and process the request body (even in cases of streaming).

There are also other interesting aspects of the ASGI spec that we haven’t covered here, including “middleware” and “Extra Coroutines”. The latter, in fact, is one of the most appealing things to my team because for our Python applications we have typically had to launch Celery processes when we have computations that don’t want clients to have to wait for, such as for routine data management or simple tasks such as sending email. This concept of “Extra Coroutines” has been able to free us from shipping as many Celery deployments, but we do still often find ourselves reaching for it when we have CPU-intensive or lengthier processes.

On the whole the ASGI spec is a readable and informative document, and I encourage reading through it if only to understand in more detail what the async web frameworks in the Python world are all about.

Tags: python,http