Compare commits

...

2 commits

Author SHA1 Message Date
Víðir Valberg Guðmundsson 1e07ed4b8d Upd. 2023-05-17 17:42:34 +02:00
Víðir Valberg Guðmundsson d5c7587e04 Upd. 2023-05-17 17:42:20 +02:00

View file

@ -1,6 +1,6 @@
Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest
Date: 2023-05-17
Status: hidden
Status: published
Tags: django, sse, postgresql
Slug: django-sse-postgresql-listen-notify
Authors: Víðir Valberg Guðmundsson
@ -79,7 +79,8 @@ the view is an async view, denoted by the `async` keyword.
)
We tell the `StreamingHttpResponse` class to get its streaming content from the
`stream_messages` function. I implemented this as follows initially:
`stream_messages` function. The following is my first initial implementation of
`stream_messages`:
::python
async def stream_messages() -> AsyncGenerator[str, None]:
@ -88,7 +89,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the
while True:
current_message = await ChatMessage.objects.order_by("-id").afirst()
# If we have a new foo yield that
# If we have a new message yield that
if latest_message != current_message:
yield "data: {current_message.text}\n\n"
latest_message = current_message
@ -100,23 +101,23 @@ request from the client every 5 seconds. But we are still doing a query to the
database every 5 seconds, and that for each client. This is not ideal and is
probably something we could have done with a synchronous view.
Let's see if we can do better. But first we'll have to talk about how to run
this code.
Let's see if we can do better.
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
This is where we could reach for more infrastructure which could help us giving
the database a break. This could be listening for data in Redis (this is what
django-channels does), or even having a queue in RabbitMQ. No matter what, it
is more infrastructure.
is means more infrastructure.
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can
LISTEN to a channel and then anyone can NOTIFY on that same channel.
PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN`
to a "channel" and then other clients can `NOTIFY` on that same channel which
will be broadcasted to all listeners .
This seems like something we can use - but psycopg2 isn't async, so I'm not
even sure if `sync_to_async` would help us here.
This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not
even sure if `asgiref`'s `sync_to_async`[^3] would help us here.
#### Enter psycopg 3
@ -124,7 +125,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
psycopg 3 - and psycopg 3 is very much async!
So I went for a stroll in the psycopg 3 documentation and found this gold[^3]:
So I went for a stroll in the psycopg 3 documentation and struck gold[^4]:
::python
import psycopg
@ -141,14 +142,14 @@ This does almost what we want! It just isn't async and isn't getting connection
info from Django.
So by combining the snippet from the psycopg 3 documentation and my previous
`stream_foos` I came up with this:
`stream_messages` I came up with this:
:::python
from collections.abc import AsyncGenerator
import psycopg
from django.db import connection
async def stream_foos() -> AsyncGenerator[str, None]:
async def stream_messages() -> AsyncGenerator[str, None]:
# Get the connection params from Django
connection_params = connection.get_connection_params()
@ -164,7 +165,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
**connection_params,
autocommit=True,
)
channel_name = "new_foo"
channel_name = "lobby"
async with aconnection.cursor() as acursor:
await acursor.execute(f"LISTEN {channel_name}")
@ -175,6 +177,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
Appart from problems with the `cursor_factory` (which I'll get back to in the
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function
will yield the message to the listener.
### Test the endpoint with curl
@ -188,7 +192,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t
And connect to our database and run:
:::sql
NOTIFY new_message, 'Hello, world!';
NOTIFY lobby, 'Hello, world!';
We, excitingly, get the following result :
@ -449,7 +453,7 @@ So that now looks like so:
**connection_params,
autocommit=True,
)
channel_name = "new_message"
channel_name = "lobby"
async with aconnection.cursor() as acursor:
print(type(acursor))
await acursor.execute(f"LISTEN {channel_name}")
@ -458,7 +462,8 @@ So that now looks like so:
yield f"data: {notify.payload}\n\n"
[^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]()
[^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]()
[^2]: [https://caniuse.com/eventsource]()
[^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]()
[^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses>
[^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse>
[^2]: <https://caniuse.com/eventsource>
[^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async>
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>