Compare commits

..

No commits in common. "1e07ed4b8d1a0a8384762acde7148f5651143279" and "56537fe004f33068bdf0e3c42954aa42ed397713" have entirely different histories.

View file

@ -1,6 +1,6 @@
Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest
Date: 2023-05-17 Date: 2023-05-17
Status: published Status: hidden
Tags: django, sse, postgresql Tags: django, sse, postgresql
Slug: django-sse-postgresql-listen-notify Slug: django-sse-postgresql-listen-notify
Authors: Víðir Valberg Guðmundsson Authors: Víðir Valberg Guðmundsson
@ -79,8 +79,7 @@ the view is an async view, denoted by the `async` keyword.
) )
We tell the `StreamingHttpResponse` class to get its streaming content from the We tell the `StreamingHttpResponse` class to get its streaming content from the
`stream_messages` function. The following is my first initial implementation of `stream_messages` function. I implemented this as follows initially:
`stream_messages`:
::python ::python
async def stream_messages() -> AsyncGenerator[str, None]: async def stream_messages() -> AsyncGenerator[str, None]:
@ -89,7 +88,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the
while True: while True:
current_message = await ChatMessage.objects.order_by("-id").afirst() current_message = await ChatMessage.objects.order_by("-id").afirst()
# If we have a new message yield that # If we have a new foo yield that
if latest_message != current_message: if latest_message != current_message:
yield "data: {current_message.text}\n\n" yield "data: {current_message.text}\n\n"
latest_message = current_message latest_message = current_message
@ -101,23 +100,23 @@ request from the client every 5 seconds. But we are still doing a query to the
database every 5 seconds, and that for each client. This is not ideal and is database every 5 seconds, and that for each client. This is not ideal and is
probably something we could have done with a synchronous view. probably something we could have done with a synchronous view.
Let's see if we can do better. Let's see if we can do better. But first we'll have to talk about how to run
this code.
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY ### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
This is where we could reach for more infrastructure which could help us giving This is where we could reach for more infrastructure which could help us giving
the database a break. This could be listening for data in Redis (this is what the database a break. This could be listening for data in Redis (this is what
django-channels does), or even having a queue in RabbitMQ. No matter what, it django-channels does), or even having a queue in RabbitMQ. No matter what, it
is means more infrastructure. is more infrastructure.
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included". But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN` PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can
to a "channel" and then other clients can `NOTIFY` on that same channel which LISTEN to a channel and then anyone can NOTIFY on that same channel.
will be broadcasted to all listeners .
This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not This seems like something we can use - but psycopg2 isn't async, so I'm not
even sure if `asgiref`'s `sync_to_async`[^3] would help us here. even sure if `sync_to_async` would help us here.
#### Enter psycopg 3 #### Enter psycopg 3
@ -125,7 +124,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
psycopg 3 - and psycopg 3 is very much async! psycopg 3 - and psycopg 3 is very much async!
So I went for a stroll in the psycopg 3 documentation and struck gold[^4]: So I went for a stroll in the psycopg 3 documentation and found this gold[^3]:
::python ::python
import psycopg import psycopg
@ -142,14 +141,14 @@ This does almost what we want! It just isn't async and isn't getting connection
info from Django. info from Django.
So by combining the snippet from the psycopg 3 documentation and my previous So by combining the snippet from the psycopg 3 documentation and my previous
`stream_messages` I came up with this: `stream_foos` I came up with this:
:::python :::python
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
import psycopg import psycopg
from django.db import connection from django.db import connection
async def stream_messages() -> AsyncGenerator[str, None]: async def stream_foos() -> AsyncGenerator[str, None]:
# Get the connection params from Django # Get the connection params from Django
connection_params = connection.get_connection_params() connection_params = connection.get_connection_params()
@ -165,8 +164,7 @@ So by combining the snippet from the psycopg 3 documentation and my previous
**connection_params, **connection_params,
autocommit=True, autocommit=True,
) )
channel_name = "new_foo"
channel_name = "lobby"
async with aconnection.cursor() as acursor: async with aconnection.cursor() as acursor:
await acursor.execute(f"LISTEN {channel_name}") await acursor.execute(f"LISTEN {channel_name}")
@ -177,8 +175,6 @@ So by combining the snippet from the psycopg 3 documentation and my previous
Appart from problems with the `cursor_factory` (which I'll get back to in the Appart from problems with the `cursor_factory` (which I'll get back to in the
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works! [appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function
will yield the message to the listener.
### Test the endpoint with curl ### Test the endpoint with curl
@ -192,7 +188,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t
And connect to our database and run: And connect to our database and run:
:::sql :::sql
NOTIFY lobby, 'Hello, world!'; NOTIFY new_message, 'Hello, world!';
We, excitingly, get the following result : We, excitingly, get the following result :
@ -453,7 +449,7 @@ So that now looks like so:
**connection_params, **connection_params,
autocommit=True, autocommit=True,
) )
channel_name = "lobby" channel_name = "new_message"
async with aconnection.cursor() as acursor: async with aconnection.cursor() as acursor:
print(type(acursor)) print(type(acursor))
await acursor.execute(f"LISTEN {channel_name}") await acursor.execute(f"LISTEN {channel_name}")
@ -462,8 +458,7 @@ So that now looks like so:
yield f"data: {notify.payload}\n\n" yield f"data: {notify.payload}\n\n"
[^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses> [^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]()
[^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse> [^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]()
[^2]: <https://caniuse.com/eventsource> [^2]: [https://caniuse.com/eventsource]()
[^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async> [^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]()
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>