Compare commits
2 commits
56537fe004
...
1e07ed4b8d
Author | SHA1 | Date | |
---|---|---|---|
Víðir Valberg Guðmundsson | 1e07ed4b8d | ||
Víðir Valberg Guðmundsson | d5c7587e04 |
|
@ -1,6 +1,6 @@
|
|||
Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest
|
||||
Date: 2023-05-17
|
||||
Status: hidden
|
||||
Status: published
|
||||
Tags: django, sse, postgresql
|
||||
Slug: django-sse-postgresql-listen-notify
|
||||
Authors: Víðir Valberg Guðmundsson
|
||||
|
@ -79,7 +79,8 @@ the view is an async view, denoted by the `async` keyword.
|
|||
)
|
||||
|
||||
We tell the `StreamingHttpResponse` class to get its streaming content from the
|
||||
`stream_messages` function. I implemented this as follows initially:
|
||||
`stream_messages` function. The following is my first initial implementation of
|
||||
`stream_messages`:
|
||||
|
||||
::python
|
||||
async def stream_messages() -> AsyncGenerator[str, None]:
|
||||
|
@ -88,7 +89,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the
|
|||
while True:
|
||||
current_message = await ChatMessage.objects.order_by("-id").afirst()
|
||||
|
||||
# If we have a new foo yield that
|
||||
# If we have a new message yield that
|
||||
if latest_message != current_message:
|
||||
yield "data: {current_message.text}\n\n"
|
||||
latest_message = current_message
|
||||
|
@ -100,23 +101,23 @@ request from the client every 5 seconds. But we are still doing a query to the
|
|||
database every 5 seconds, and that for each client. This is not ideal and is
|
||||
probably something we could have done with a synchronous view.
|
||||
|
||||
Let's see if we can do better. But first we'll have to talk about how to run
|
||||
this code.
|
||||
Let's see if we can do better.
|
||||
|
||||
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
|
||||
|
||||
This is where we could reach for more infrastructure which could help us giving
|
||||
the database a break. This could be listening for data in Redis (this is what
|
||||
django-channels does), or even having a queue in RabbitMQ. No matter what, it
|
||||
is more infrastructure.
|
||||
is means more infrastructure.
|
||||
|
||||
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
|
||||
|
||||
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can
|
||||
LISTEN to a channel and then anyone can NOTIFY on that same channel.
|
||||
PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN`
|
||||
to a "channel" and then other clients can `NOTIFY` on that same channel which
|
||||
will be broadcasted to all listeners .
|
||||
|
||||
This seems like something we can use - but psycopg2 isn't async, so I'm not
|
||||
even sure if `sync_to_async` would help us here.
|
||||
This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not
|
||||
even sure if `asgiref`'s `sync_to_async`[^3] would help us here.
|
||||
|
||||
#### Enter psycopg 3
|
||||
|
||||
|
@ -124,7 +125,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe
|
|||
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
|
||||
psycopg 3 - and psycopg 3 is very much async!
|
||||
|
||||
So I went for a stroll in the psycopg 3 documentation and found this gold[^3]:
|
||||
So I went for a stroll in the psycopg 3 documentation and struck gold[^4]:
|
||||
|
||||
::python
|
||||
import psycopg
|
||||
|
@ -141,14 +142,14 @@ This does almost what we want! It just isn't async and isn't getting connection
|
|||
info from Django.
|
||||
|
||||
So by combining the snippet from the psycopg 3 documentation and my previous
|
||||
`stream_foos` I came up with this:
|
||||
`stream_messages` I came up with this:
|
||||
|
||||
:::python
|
||||
from collections.abc import AsyncGenerator
|
||||
import psycopg
|
||||
from django.db import connection
|
||||
|
||||
async def stream_foos() -> AsyncGenerator[str, None]:
|
||||
async def stream_messages() -> AsyncGenerator[str, None]:
|
||||
|
||||
# Get the connection params from Django
|
||||
connection_params = connection.get_connection_params()
|
||||
|
@ -164,7 +165,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
|
|||
**connection_params,
|
||||
autocommit=True,
|
||||
)
|
||||
channel_name = "new_foo"
|
||||
|
||||
channel_name = "lobby"
|
||||
|
||||
async with aconnection.cursor() as acursor:
|
||||
await acursor.execute(f"LISTEN {channel_name}")
|
||||
|
@ -175,6 +177,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
|
|||
Appart from problems with the `cursor_factory` (which I'll get back to in the
|
||||
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
|
||||
|
||||
Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function
|
||||
will yield the message to the listener.
|
||||
|
||||
### Test the endpoint with curl
|
||||
|
||||
|
@ -188,7 +192,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t
|
|||
And connect to our database and run:
|
||||
|
||||
:::sql
|
||||
NOTIFY new_message, 'Hello, world!';
|
||||
NOTIFY lobby, 'Hello, world!';
|
||||
|
||||
|
||||
We, excitingly, get the following result :
|
||||
|
@ -449,7 +453,7 @@ So that now looks like so:
|
|||
**connection_params,
|
||||
autocommit=True,
|
||||
)
|
||||
channel_name = "new_message"
|
||||
channel_name = "lobby"
|
||||
async with aconnection.cursor() as acursor:
|
||||
print(type(acursor))
|
||||
await acursor.execute(f"LISTEN {channel_name}")
|
||||
|
@ -458,7 +462,8 @@ So that now looks like so:
|
|||
yield f"data: {notify.payload}\n\n"
|
||||
|
||||
|
||||
[^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]()
|
||||
[^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]()
|
||||
[^2]: [https://caniuse.com/eventsource]()
|
||||
[^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]()
|
||||
[^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses>
|
||||
[^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse>
|
||||
[^2]: <https://caniuse.com/eventsource>
|
||||
[^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async>
|
||||
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>
|
||||
|
|
Loading…
Reference in a new issue