Title: Writing a chat application in Django 4.2 using async StreamingHttpResponse, Server-Sent Events and PostgreSQL LISTEN/NOTIFY
Date: 2023-05-17
Status: published
Tags: django, sse, postgresql
Slug: django-sse-postgresql-listen-notify
Authors: Víðir Valberg Guðmundsson
Summary: A write-up of how I implemented server-sent events using Django 4.2 and PostgreSQL LISTEN/NOTIFY

---

With the release of Django 4.2 we got the following [^0]:

> [`StreamingHttpResponse`](https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse "django.http.StreamingHttpResponse") now supports async iterators when Django is served via ASGI.

And the documentation has been expanded with the following [^1]:

> When serving under ASGI, however, a [`StreamingHttpResponse`](https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse "django.http.StreamingHttpResponse") need not stop other requests from being served whilst waiting for I/O. This opens up the possibility of long-lived requests for streaming content and implementing patterns such as long-polling, and server-sent events.
Being a sucker for simplicity I got quite intrigued by the possibility of serving server-sent events (also known as SSE) from Django in an asynchronous manner.

So I set out to write a small, drumroll please, chat application!

This blog post documents my process of writing this application and how the bits and pieces fit together.

The code for the chat application can be found at [github.com/valberg/django-sse](https://github.com/valberg/django-sse).

**Table of contents**

[TOC]
### What are server-sent events and why do we want to use them?

Server-sent events are "old tech", in that they have been supported in major browsers since around 2010-2011 [^2]. The idea is that the client "subscribes" to an HTTP endpoint, and the server can then push data to the client for as long as the connection is open. This is a great performance boost compared to other techniques such as polling the server.

_But wait, aren't websockets "shinier"?_

It depends. In many situations when it comes to developing web applications, we just want a way to push data to the client, and here a bidirectional connection like websockets feels like overkill. I would also argue that using POST/PUT requests from the client and SSE to the client might be "just enough" compared to websockets.

SSE also has the added benefit of a built-in reconnection mechanism, which is something we would have to implement ourselves with websockets.

All in all SSE is a much simpler solution than websockets, and in many (most?) cases that is all we need.
### A simple implementation of an SSE endpoint

So let's get to some code!

First we need our model for storing the chat messages:

    :::python
    from django.db import models


    class ChatMessage(models.Model):
        user = models.CharField(max_length=255)
        text = models.CharField(max_length=255)

With the model defined we can write our view to stream the messages.

The following is something along the lines of my initial attempt. First we have to define the view, which will remain almost unchanged for the remainder of this blog post. The juicy bits are in the `stream_messages()` function. Note that the view is an async view, denoted by the `async` keyword.

    :::python
    from django.http import HttpRequest, StreamingHttpResponse


    async def stream_messages_view(request: HttpRequest) -> StreamingHttpResponse:
        return StreamingHttpResponse(
            streaming_content=stream_messages(),
            content_type="text/event-stream",
        )

We tell the `StreamingHttpResponse` class to get its streaming content from the `stream_messages` function. The following is my first initial implementation of `stream_messages`:
    :::python
    import asyncio
    from collections.abc import AsyncGenerator


    async def stream_messages() -> AsyncGenerator[str, None]:
        latest_message = None

        while True:
            current_message = await ChatMessage.objects.order_by("-id").afirst()

            # If we have a new message, yield that
            if latest_message != current_message:
                yield f"data: {current_message.text}\n\n"
                latest_message = current_message

            await asyncio.sleep(5)

So we've gotten rid of the HTTP overhead of polling by not having to do a request from the client every 5 seconds. But we are still doing a query to the database every 5 seconds, and that for each client. This is not ideal, and it is probably something we could have done with a synchronous view.

Let's see if we can do better.
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY

This is where we could reach for more infrastructure to give the database a break. This could be listening for data in Redis (this is what django-channels does), or even having a queue in RabbitMQ. No matter what, it means more infrastructure.

But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".

PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN` to a "channel" and other clients can `NOTIFY` on that same channel, which will be broadcast to all listeners.

This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not even sure if `asgiref`'s `sync_to_async`[^3] would help us here.
#### Enter psycopg 3

I had put the whole thing on ice until I realized that another big thing (maybe a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for psycopg 3 - and psycopg 3 is very much async!

So I went for a stroll in the psycopg 3 documentation and struck gold[^4]:

    :::python
    import psycopg

    conn = psycopg.connect("", autocommit=True)
    conn.execute("LISTEN mychan")
    gen = conn.notifies()
    for notify in gen:
        print(notify)
        if notify.payload == "stop":
            gen.close()
            print("there, I stopped")

This does almost what we want! It just isn't async and isn't getting connection info from Django.

So by combining the snippet from the psycopg 3 documentation and my previous `stream_messages` I came up with this:
    :::python
    from collections.abc import AsyncGenerator

    import psycopg
    from django.db import connection


    async def stream_messages() -> AsyncGenerator[str, None]:
        # Get the connection params from Django
        connection_params = connection.get_connection_params()

        # Somehow Django 4.2.1 sets the cursor_factory to
        # django.db.backends.postgresql.base.Cursor
        # which causes problems. Read more about it in the
        # "Difference between 4.2 and 4.2.1" section in the Appendix.
        # Removing it from the connection parameters works around this.
        connection_params.pop('cursor_factory')

        aconnection = await psycopg.AsyncConnection.connect(
            **connection_params,
            autocommit=True,
        )

        channel_name = "lobby"

        async with aconnection.cursor() as acursor:
            await acursor.execute(f"LISTEN {channel_name}")
            gen = aconnection.notifies()
            async for notify in gen:
                yield f"data: {notify.payload}\n\n"

Apart from problems with the `cursor_factory` (which I'll get back to in the [appendix](#difference-between-42-and-421)), this code is pretty straightforward and, most importantly, works!

Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function will yield the message to the listener.
### Test the endpoint with curl

So now we've got the `LISTEN` part in place.

If we connect to the endpoint using curl (`-N` disables buffering and is a way to consume streaming content with curl):

    :::console
    $ curl -N http://localhost:8000/messages/

And connect to our database and run:

    :::sql
    NOTIFY lobby, 'Hello, world!';

We, excitingly, get the following result:

    :::text
    data: Hello, world!

Amazing!
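On the wire that result is just lines of text separated by blank lines. A tiny parser sketch (my own helper, not part of the app, and ignoring comments, `retry:` fields and incremental chunking that real clients handle) shows how a client turns the stream back into events:

```python
def parse_sse_stream(raw: str) -> list[dict[str, str]]:
    """Parse a raw SSE stream (as text) into a list of events.

    Simplified: splits on blank lines, then reads "field: value" pairs.
    Repeated "data:" lines are concatenated, per the SSE spec.
    """
    events = []
    for block in raw.split("\n\n"):
        event: dict[str, str] = {}
        for line in block.splitlines():
            if ":" not in line:
                continue
            field, _, value = line.partition(":")
            value = value.lstrip(" ")
            if field == "data":
                event["data"] = event.get("data", "") + value
            else:
                event[field] = value
        if event:
            events.append(event)
    return events


# parse_sse_stream("data: Hello, world!\n\n") → [{"data": "Hello, world!"}]
```

The browser's `EventSource` does exactly this (and more) for us, which is what we'll lean on later in the frontend section.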
### Issuing the NOTIFY command from Django

But we want the `NOTIFY` command to be issued when a new chat message is submitted.

For this we'll have a small utility function which does the heavy lifting. Note that this is just a very simple synchronous function, since everything happens within a single request-response cycle.

    :::python
    import json

    from django.db import connection


    def notify(*, channel: str, event: str, content: str) -> None:
        payload = json.dumps({
            "event": event,
            "data": content,
        })
        with connection.cursor() as cursor:
            # Note: the payload is interpolated directly into the SQL
            # string. That is fine for this demo, but a payload containing
            # a single quote would break it - proper quoting is needed
            # in production.
            cursor.execute(
                f"NOTIFY {channel}, '{payload}'",
            )

And then we can use this in our view (I'm using `@csrf_exempt` here since this is just a quick proof of concept):

    :::python
    @csrf_exempt
    @require_POST
    def post_message_view(request: HttpRequest) -> HttpResponse:
        message = request.POST.get("message")
        user = request.POST.get("user")
        message = ChatMessage.objects.create(user=user, text=message)
        notify(
            channel="lobby",
            event="message_created",
            content=json.dumps({
                "text": message.text,
                "user": message.user,
            }),
        )
        return HttpResponse("OK")

The keen observer will notice that we are storing the payload content as a JSON string within a JSON string.

This is because we have two recipients of the payload. The first is the `stream_messages` function, which is going to send the payload to the client with an `event`, and the second is the browser, which is going to parse the payload and use the `event` to determine what to do with it.
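To make the double encoding concrete, here is a plain-Python sketch of the round trip (the variable names are mine; the keys are the ones `stream_messages` expects):

```python
import json

# What the view passes to notify(): the inner payload, already JSON.
inner = json.dumps({"text": "Hello!", "user": "vidir"})

# What notify() sends over NOTIFY: the outer envelope wrapping it.
envelope = json.dumps({"event": "message_created", "data": inner})

# stream_messages() decodes one level and forwards `data` untouched...
payload = json.loads(envelope)
sse_frame = f"event: {payload['event']}\ndata: {payload['data']}\n\n"

# ...so the browser receives valid JSON in evt.data and decodes the rest.
message = json.loads(payload["data"])
# message → {"text": "Hello!", "user": "vidir"}
```

Only the outer layer is unwrapped on the server; the inner JSON travels through `stream_messages` as an opaque string.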
For this we'll have to update our `stream_messages` function as follows:

    :::python
    async def stream_messages() -> AsyncGenerator[str, None]:
        connection_params = connection.get_connection_params()

        # Remove the cursor_factory parameter since I can't get
        # the default from Django 4.2.1 to work.
        # Django 4.2 didn't have the parameter and that worked.
        connection_params.pop('cursor_factory')

        aconnection = await psycopg.AsyncConnection.connect(
            **connection_params,
            autocommit=True,
        )
        channel_name = "lobby"
        async with aconnection.cursor() as acursor:
            await acursor.execute(f"LISTEN {channel_name}")
            gen = aconnection.notifies()
            async for notify in gen:
                payload = json.loads(notify.payload)
                event = payload.pop("event")
                data = payload.pop("data")
                yield f"event: {event}\ndata: {data}\n\n"

Everything is the same except that we now parse the payload from the `NOTIFY` command and construct the SSE payload with an `event` and a `data` field. This will come in handy when dealing with the frontend.

Another way to do this would be to use Django's [signals](https://docs.djangoproject.com/en/4.2/topics/signals/) or even writing a PostgreSQL [trigger](https://www.postgresql.org/docs/15/plpgsql-trigger.html) which issues the `NOTIFY` command.
### Hooking up the frontend

Now that we've got the backend in place, we can get something up and running on the frontend.

We could use HTMX's [SSE extension](https://htmx.org/extensions/server-sent-events/), but for this example we'll just use the [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) API directly.

    :::html
    <template id="message">
        <div style="border: 1px solid black; margin: 5px; padding: 5px;">
            <strong class="user"></strong>: <span class="message"></span>
        </div>
    </template>

    <div id="messages"></div>

    <script>
        const source = new EventSource("/messages/");

        // Note that the event we gave our notify utility function is called
        // "message_created", so that's what we listen for here.
        source.addEventListener("message_created", function(evt) {
            // Parse the payload
            let payload = JSON.parse(evt.data);

            // Get and clone our template
            let template = document.getElementById('message');
            let clone = template.content.cloneNode(true);

            // Update our cloned template
            clone.querySelector('.user').innerText = payload.user;
            clone.querySelector('.message').innerText = payload.text;

            // Append the cloned template to our list of messages
            document.getElementById('messages').appendChild(clone);
        });
    </script>

And that's it! We can now open two browser windows and see the messages appear in real time.

Check out the repo for the full code, where I've also added a simple form for submitting new messages.
### Dealing with reconnections

One of the nice things about SSE is that it will automatically reconnect if the connection is lost. It even has a mechanism for dealing with the fact that the client might have missed some events while it was disconnected.

This is done by sending a `Last-Event-ID` header with the request. The value of this header is the `id` of the last event that the client received. The server can then use this to determine which events to send to the client.

To deal with this we can expand our `stream_messages` function and view as follows:

    :::python
    async def stream_messages(last_id: int | None = None) -> AsyncGenerator[str, None]:
        connection_params = connection.get_connection_params()
        connection_params.pop('cursor_factory')
        aconnection = await psycopg.AsyncConnection.connect(
            **connection_params,
            autocommit=True,
        )
        channel_name = "lobby"

        if last_id:
            messages = ChatMessage.objects.filter(id__gt=last_id)
            async for message in messages:
                yield f"id: {message.id}\nevent: message_created\ndata: {message.as_json()}\n\n"

        async with aconnection.cursor() as acursor:
            await acursor.execute(f"LISTEN {channel_name}")
            gen = aconnection.notifies()
            async for notify in gen:
                payload = json.loads(notify.payload)
                event = payload.get("event")
                event_id = payload.get("event_id")
                data = payload.get("data")
                yield f"id: {event_id}\nevent: {event}\ndata: {data}\n\n"


    async def stream_messages_view(
        request: HttpRequest,
    ) -> StreamingHttpResponse:
        # The header value is a string, so convert it before
        # handing it to stream_messages.
        last_id = request.headers.get("Last-Event-ID")
        return StreamingHttpResponse(
            streaming_content=stream_messages(
                last_id=int(last_id) if last_id else None,
            ),
            content_type="text/event-stream",
        )

We now send the `id` of each message, and whenever a (re)connection is made we check if the client sent a `Last-Event-ID`, and if so we send all messages with an `id` greater than that.

This change also requires some changes to our utility functions and model. Those are to be found in the git repo.
### Conclusion

Django is boring, which is a good thing, to the degree where it is always the safe option. But with the advances in async support it is becoming a viable, and shiny, option for doing real time stuff. Mix in some other solid and boring tech like PostgreSQL and SSE, and you end up with a very solid foundation for building real time applications.
### Appendix

#### How to run ASGI applications in development

One thing that took me some time to realise is that the Django runserver is not capable of running async views returning `StreamingHttpResponse`.

Running the view with the builtin runserver results in the following error:

    :::text
    .../django/http/response.py:514: Warning: StreamingHttpResponse must
    consume asynchronous iterators in order to serve them synchronously.
    Use a synchronous iterator instead.

Fortunately Daphne, the ASGI server which was developed to power Django Channels, has an async runserver which we can use.

To set this up we'll have to install the `daphne` package, add `daphne` to the top of our installed apps, and set the `ASGI_APPLICATION` setting to point to our ASGI application:

    :::python
    INSTALLED_APPS = [
        "daphne",
        ...
        "chat",  # Our app
    ]

    ASGI_APPLICATION = "project.asgi.application"

Now we can just run `./manage.py runserver` as before and we are async ready!
#### Difference between 4.2 and 4.2.1

The code worked initially in 4.2, but 4.2.1 fixed a regression regarding setting a custom cursor in the database configuration.

In 4.2 we get this from `connection.get_connection_params()`:

    :::python
    {
        'dbname': 'postgres',
        'user': 'postgres',
        'password': 'postgres',
        'host': 'localhost',
        'port': 5432,
        'context': <psycopg.adapt.AdaptersMap object at 0x7f019cda7a60>,
        'prepare_threshold': None
    }

In 4.2.1 we get this:

    :::python
    {
        'dbname': 'postgres',
        'client_encoding': 'UTF8',
        'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
        'user': 'postgres',
        'password': 'postgres',
        'host': 'localhost',
        'port': '5432',
        'context': <psycopg.adapt.AdaptersMap object at 0x7f56464bcdd0>,
        'prepare_threshold': None
    }

`django.db.backends.postgresql.base.Cursor` is not async iterable.
So we can probably try to set our own `cursor_factory` in settings:

    :::python
    from psycopg import AsyncCursor

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql',
            'NAME': 'postgres',
            'USER': 'postgres',
            'PASSWORD': 'postgres',
            'HOST': 'localhost',
            'PORT': '5432',
            'OPTIONS': {
                "cursor_factory": AsyncCursor,
            },
        },
    }

But alas. For some reason this does not work. I guess that Django does some wrapping of the cursor - or maybe I've just encountered a bug. The cursor is at least not treated as an async cursor, and thus we get the following error:

    :::pytb
    .../django-sse/venv/lib/python3.11/site-packages/django/db/backends/utils.py:41:
    RuntimeWarning: coroutine 'AsyncCursor.close' was never awaited
      self.close()
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    .../django-sse/venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py:1560:
    RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
      cursor.execute(sql, params)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback

So instead I opted for removing the `cursor_factory` in the streaming function, which now looks like so:
    :::python
    async def stream_messages() -> AsyncGenerator[str, None]:
        connection_params = connection.get_connection_params()
        connection_params.pop('cursor_factory')
        aconnection = await psycopg.AsyncConnection.connect(
            **connection_params,
            autocommit=True,
        )
        channel_name = "lobby"
        async with aconnection.cursor() as acursor:
            await acursor.execute(f"LISTEN {channel_name}")
            gen = aconnection.notifies()
            async for notify in gen:
                yield f"data: {notify.payload}\n\n"
[^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses>
[^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse>
[^2]: <https://caniuse.com/eventsource>
[^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async>
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>