Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Django's StreamingHttpResponse
Date: 2023-05-16
Tags: django, sse, postgresql
Slug: django-sse-postgresql-listen-notify
Authors: Víðir Valberg Guðmundsson
Summary: A write-up of how I implemented server-sent events using Django 4.2 and PostgreSQL LISTEN/NOTIFY
---

With the release of Django 4.2 we got the following [0]:

> [`StreamingHttpResponse`](https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse "django.http.StreamingHttpResponse") now supports async iterators when Django is served via ASGI.

And the documentation has been expanded with the following [1]:

> When serving under ASGI, however, a [`StreamingHttpResponse`](https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse "django.http.StreamingHttpResponse") need not stop other requests from being served whilst waiting for I/O. This opens up the possibility of long-lived requests for streaming content and implementing patterns such as long-polling, and server-sent events.

Being a sucker for simplicity, I got quite intrigued by the possibility of serving
server-sent events (also known as SSE) directly from Django, with no need for
additional infrastructure like Redis.

## What are server-sent events and why do we want to use them?

Server-sent events are "old tech", in that they have been supported in major
browsers since around 2010-2011 [2]. The idea is that the client "subscribes" to
an HTTP endpoint, and the server can then push data to the client for as long as
the connection is open. This is a great performance boost compared to other
techniques such as polling the server, since the client no longer has to make a
new request just to find out whether anything has changed.

_But wait, aren't websockets "shinier"?_

It depends. In many situations when developing web applications, we
just want a way to push data to the client, and here a bi-directional
connection like websockets feels like overkill. I would also argue that using
POST/PUT requests from the client and SSE to the client might be "just enough"
compared to websockets.

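Back to the "server issues data" part for a moment: on the wire, an event stream is just UTF-8 text in which each event consists of one or more `data:` lines (optionally preceded by an `event:` line) and is terminated by a blank line. A minimal sketch of a formatter follows; `format_sse` is a made-up helper name, not something Django provides.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Format one server-sent event for a text/event-stream response."""
    lines = []
    if event is not None:
        # Optional event name; without it the browser fires a plain "message" event
        lines.append(f"event: {event}")
    # A payload containing newlines has to be sent as one "data:" line per line
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # The trailing blank line is what terminates the event
    return "\n".join(lines) + "\n\n"
```

Splitting the payload matters because a raw newline inside a single `data:` line would end the event early.
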
## A simple implementation

So let's get to some code! The following is something along the lines of my
initial attempt. First we have to define the view, which in fact will not
change for the remainder of this blog post. The juicy bits are in the next
part.

    :::python
    from django.http import HttpRequest, StreamingHttpResponse


    async def stream_foo_view(request: HttpRequest) -> StreamingHttpResponse:
        return StreamingHttpResponse(
            streaming_content=stream_foos(),
            content_type="text/event-stream",
        )

We tell the `StreamingHttpResponse` class to get its streaming content from the
`stream_foos` function. Initially I implemented it as follows:

    :::python
    import asyncio
    from collections.abc import AsyncGenerator

    from .models import Foo  # assumes the Foo model lives in this app's models.py


    async def stream_foos() -> AsyncGenerator[str, None]:
        latest_foo = None

        while True:
            current_foo = await Foo.objects.order_by("-id").afirst()

            # If we have a new foo, yield it as a server-sent event
            if current_foo is not None and latest_foo != current_foo:
                yield f"data: {current_foo.text}\n\n"
                latest_foo = current_foo

            await asyncio.sleep(5)

So we've gotten rid of the HTTP overhead of polling by not having to do a
request from the client every 5 seconds. But we are still querying the
database every 5 seconds, and that for each connected client.

### Aside: Use an ASGI server for development

One thing that took me some time to realise is that Django's `runserver` is not
capable of streaming from async views returning `StreamingHttpResponse`, since
it serves the project over WSGI by default.

Running the above view with `runserver` results in the following warning:

    :::text
    .../django/http/response.py:514: Warning: StreamingHttpResponse must
    consume asynchronous iterators in order to serve them synchronously.
    Use a synchronous iterator instead.

So I had to resort to installing uvicorn and running my project like so:

    :::bash
    $ uvicorn --log-level debug --reload project.asgi:application

The `--reload` part is particularly important when doing development, since it
restarts the server on code changes, like `runserver` does.

## More old tech to the rescue: PostgreSQL LISTEN/NOTIFY

This is where we could reach for more infrastructure to help us give
the database a break. This could be listening for data in Redis (this is what
django-channels does), or even having a queue in RabbitMQ. No matter what, it
is more infrastructure.

But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".

PostgreSQL has a mechanism called "LISTEN/NOTIFY", where one client can
LISTEN on a channel and any other client can then NOTIFY on that same channel.

This seems like something we can use - but psycopg2 isn't async, so I'm not
even sure if `sync_to_async` would help us here.

## Enter psycopg 3

I had put the whole thing on ice until I realized that another big thing (maybe
a bit bigger than `StreamingHttpResponse`) in Django 4.2 is the support for
psycopg 3 - and psycopg 3 is very much async!

So I went for a stroll in the psycopg 3 documentation and found this gold [3]:

    :::python
    import psycopg
    conn = psycopg.connect("", autocommit=True)
    conn.execute("LISTEN mychan")
    gen = conn.notifies()
    for notify in gen:
        print(notify)
        if notify.payload == "stop":
            gen.close()
    print("there, I stopped")

This does almost what we want! It just isn't async and isn't getting its connection info from Django.

So by combining the snippet from the psycopg 3 documentation and my previous
`stream_foos`, I came up with this:

    :::python
    from collections.abc import AsyncGenerator

    import psycopg
    from django.db import connection


    async def stream_foos() -> AsyncGenerator[str, None]:
        # Reuse Django's database settings for a separate, async connection
        connection_params = connection.get_connection_params()
        # Django's cursor_factory is a sync cursor; the key is absent on Django 4.2
        connection_params.pop("cursor_factory", None)
        aconnection = await psycopg.AsyncConnection.connect(
            **connection_params,
            autocommit=True,
        )
        channel_name = "new_foo"

        async with aconnection.cursor() as acursor:
            await acursor.execute(f"LISTEN {channel_name}")
            gen = aconnection.notifies()
            async for notify in gen:
                yield f"data: {notify.payload}\n\n"

I was almost about to give up again, since this approach didn't work initially.
All because I had, for some reason, removed the `autocommit=True` in my attempts
to async-ify the snippet from the psycopg 3 documentation. As far as I can
tell, without autocommit the `LISTEN` sits in a transaction that is never
committed, and PostgreSQL only registers a `LISTEN` when its transaction
commits.

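One thing the streaming function does not do is close the connection it opens. Below is a rough sketch of how that could be handled with `try`/`finally`. It is written against a generic `connect` callable so that it can be tried without a database. The names `stream_notifications` and `connect` are made up for illustration, and whether the generator actually gets closed when a client disconnects depends on the Django version and ASGI server, which this sketch does not settle.

```python
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Any


async def stream_notifications(
    connect: Callable[[], Awaitable[Any]],
    channel: str,
) -> AsyncGenerator[str, None]:
    """Yield one server-sent event per notification and always close the connection."""
    # With psycopg 3 this could be, for example:
    #   lambda: psycopg.AsyncConnection.connect(**params, autocommit=True)
    aconnection = await connect()
    try:
        # `channel` is interpolated as-is, so it must be a trusted constant
        await aconnection.execute(f"LISTEN {channel}")
        async for notify in aconnection.notifies():
            yield f"data: {notify.payload}\n\n"
    finally:
        # Runs when the notification stream ends, on errors,
        # and when the generator itself is closed
        await aconnection.close()
```

The `finally` block ensures that each finished stream gives its PostgreSQL connection back instead of leaving it open.
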
### Issuing the NOTIFY

- Using a signal handler
- Setting up triggers manually (django-pgtrigger is psycopg2 only)

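For the signal handler route, something along these lines could work. This is only a sketch: `make_notify_handler` and `get_cursor` are made-up names, and it assumes a `Foo` model with a `text` field as in the earlier examples. `pg_notify()` is PostgreSQL's function form of `NOTIFY`, which allows the channel and payload to be passed as query parameters.

```python
def make_notify_handler(get_cursor, channel):
    """Build a post_save-style receiver that NOTIFYs `channel` for new rows."""

    def handler(sender, instance, created, **kwargs):
        if not created:
            # Only announce newly created rows, not updates
            return
        with get_cursor() as cursor:
            # pg_notify(channel, payload) is the function form of NOTIFY
            cursor.execute("SELECT pg_notify(%s, %s)", [channel, str(instance.text)])

    return handler


# Possible wiring in a Django app (assumption: a Foo model with a `text` field):
#
#   from django.db import connection
#   from django.db.models.signals import post_save
#
#   # Keep a module-level reference; signals hold receivers weakly by default
#   notify_new_foo = make_notify_handler(connection.cursor, "new_foo")
#   post_save.connect(notify_new_foo, sender=Foo)
```

Keep in mind that `post_save` only fires for saves going through the ORM, not for `bulk_create`, `update()` or raw SQL, which is where database triggers would come in.
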
### Frontend stuff

- Simple `EventSource` example
- Use HTMX

### Difference between 4.2 and 4.2.1

The code worked initially in 4.2, but 4.2.1 fixed a regression regarding
setting a custom cursor in the database configuration.

In 4.2 we get this from `connection.get_connection_params()`:

    :::javascript
    {
        'dbname': 'postgres',
        'user': 'postgres',
        'password': 'postgres',
        'host': 'localhost',
        'port': 5432,
        'context': <psycopg.adapt.AdaptersMap object at 0x7f019cda7a60>,
        'prepare_threshold': None
    }

In 4.2.1 we get this:

    :::javascript
    {
        'dbname': 'postgres',
        'client_encoding': 'UTF8',
        'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
        'user': 'postgres',
        'password': 'postgres',
        'host': 'localhost',
        'port': '5432',
        'context': <psycopg.adapt.AdaptersMap object at 0x7f56464bcdd0>,
        'prepare_threshold': None
    }

`django.db.backends.postgresql.base.Cursor` is a synchronous cursor and is not
async iterable, so it does not work with the async connection.

So we can probably try to set our own `cursor_factory` in settings:

    :::python
    from psycopg import AsyncCursor

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql',
            'NAME': 'postgres',
            'USER': 'postgres',
            'PASSWORD': 'postgres',
            'HOST': 'localhost',
            'PORT': '5432',
            'OPTIONS': {
                "cursor_factory": AsyncCursor
            }
        }
    }

But alas. For some reason this does not work. I guess that Django does some
wrapping of the cursor - or maybe I've just encountered a bug. At least the
cursor is not treated as an async cursor, and thus we get the following
warnings:

    :::pytb
    .../django-sse/venv/lib/python3.11/site-packages/django/db/backends/utils.py:41:
    RuntimeWarning: coroutine 'AsyncCursor.close' was never awaited
      self.close()
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    .../django-sse/venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py:1560:
    RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
      cursor.execute(sql, params)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback

So instead I opted for removing the `cursor_factory` in the streaming function,
which now looks like this:

    :::python
    async def stream_messages() -> AsyncGenerator[str, None]:
        connection_params = connection.get_connection_params()
        # Drop Django's sync cursor class; the default avoids a KeyError on 4.2
        connection_params.pop("cursor_factory", None)
        aconnection = await psycopg.AsyncConnection.connect(
            **connection_params,
            autocommit=True,
        )
        channel_name = "new_message"
        async with aconnection.cursor() as acursor:
            await acursor.execute(f"LISTEN {channel_name}")
            gen = aconnection.notifies()
            async for notify in gen:
                yield f"data: {notify.payload}\n\n"

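Going by the two dicts above, `cursor_factory` is only present from 4.2.1 onwards, so it matters whether the `pop` is given a default: `pop('cursor_factory')` raises `KeyError` on 4.2, while `pop('cursor_factory', None)` does not. Here is a small sketch of a version-tolerant variant, using trimmed-down stand-ins for the two dicts (`async_connection_params` is a made-up helper name):

```python
def async_connection_params(params: dict) -> dict:
    """Return a copy of Django's connection params without the sync-only cursor_factory."""
    cleaned = dict(params)  # copy, so the caller's dict is left untouched
    # pop() with a default is a no-op when the key is missing (Django 4.2)
    cleaned.pop("cursor_factory", None)
    return cleaned


# Trimmed-down stand-ins for the two dicts shown above
params_4_2 = {"dbname": "postgres", "port": 5432}
params_4_2_1 = {"dbname": "postgres", "port": "5432", "cursor_factory": object}

print(async_connection_params(params_4_2))
print(async_connection_params(params_4_2_1))
```
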
[0]: https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses
[1]: https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse
[2]: https://caniuse.com/eventsource
[3]: https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4