Compare commits

..

No commits in common. "1e07ed4b8d1a0a8384762acde7148f5651143279" and "56537fe004f33068bdf0e3c42954aa42ed397713" have entirely different histories.

View file

@ -1,6 +1,6 @@
Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest
Date: 2023-05-17
Status: published
Status: hidden
Tags: django, sse, postgresql
Slug: django-sse-postgresql-listen-notify
Authors: Víðir Valberg Guðmundsson
@ -79,8 +79,7 @@ the view is an async view, denoted by the `async` keyword.
)
We tell the `StreamingHttpResponse` class to get its streaming content from the
`stream_messages` function. The following is my first initial implementation of
`stream_messages`:
`stream_messages` function. I implemented this as follows initially:
::python
async def stream_messages() -> AsyncGenerator[str, None]:
@ -89,7 +88,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the
while True:
current_message = await ChatMessage.objects.order_by("-id").afirst()
# If we have a new message yield that
# If we have a new foo yield that
if latest_message != current_message:
yield "data: {current_message.text}\n\n"
latest_message = current_message
@ -101,23 +100,23 @@ request from the client every 5 seconds. But we are still doing a query to the
database every 5 seconds, and that for each client. This is not ideal and is
probably something we could have done with a synchronous view.
Let's see if we can do better.
Let's see if we can do better. But first we'll have to talk about how to run
this code.
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
This is where we could reach for more infrastructure which could help us giving
the database a break. This could be listening for data in Redis (this is what
django-channels does), or even having a queue in RabbitMQ. No matter what, it
is means more infrastructure.
is more infrastructure.
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN`
to a "channel" and then other clients can `NOTIFY` on that same channel which
will be broadcasted to all listeners .
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can
LISTEN to a channel and then anyone can NOTIFY on that same channel.
This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not
even sure if `asgiref`'s `sync_to_async`[^3] would help us here.
This seems like something we can use - but psycopg2 isn't async, so I'm not
even sure if `sync_to_async` would help us here.
#### Enter psycopg 3
@ -125,7 +124,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
psycopg 3 - and psycopg 3 is very much async!
So I went for a stroll in the psycopg 3 documentation and struck gold[^4]:
So I went for a stroll in the psycopg 3 documentation and found this gold[^3]:
::python
import psycopg
@ -142,14 +141,14 @@ This does almost what we want! It just isn't async and isn't getting connection
info from Django.
So by combining the snippet from the psycopg 3 documentation and my previous
`stream_messages` I came up with this:
`stream_foos` I came up with this:
:::python
from collections.abc import AsyncGenerator
import psycopg
from django.db import connection
async def stream_messages() -> AsyncGenerator[str, None]:
async def stream_foos() -> AsyncGenerator[str, None]:
# Get the connection params from Django
connection_params = connection.get_connection_params()
@ -165,8 +164,7 @@ So by combining the snippet from the psycopg 3 documentation and my previous
**connection_params,
autocommit=True,
)
channel_name = "lobby"
channel_name = "new_foo"
async with aconnection.cursor() as acursor:
await acursor.execute(f"LISTEN {channel_name}")
@ -177,8 +175,6 @@ So by combining the snippet from the psycopg 3 documentation and my previous
Appart from problems with the `cursor_factory` (which I'll get back to in the
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function
will yield the message to the listener.
### Test the endpoint with curl
@ -192,7 +188,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t
And connect to our database and run:
:::sql
NOTIFY lobby, 'Hello, world!';
NOTIFY new_message, 'Hello, world!';
We, excitingly, get the following result :
@ -453,7 +449,7 @@ So that now looks like so:
**connection_params,
autocommit=True,
)
channel_name = "lobby"
channel_name = "new_message"
async with aconnection.cursor() as acursor:
print(type(acursor))
await acursor.execute(f"LISTEN {channel_name}")
@ -462,8 +458,7 @@ So that now looks like so:
yield f"data: {notify.payload}\n\n"
[^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses>
[^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse>
[^2]: <https://caniuse.com/eventsource>
[^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async>
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>
[^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]()
[^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]()
[^2]: [https://caniuse.com/eventsource]()
[^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]()