Compare commits
2 commits
56537fe004
...
1e07ed4b8d
Author | SHA1 | Date | |
---|---|---|---|
Víðir Valberg Guðmundsson | 1e07ed4b8d | ||
Víðir Valberg Guðmundsson | d5c7587e04 |
|
@ -1,6 +1,6 @@
|
||||||
Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest
|
Title: Server-Sent Events and PostgreSQL LISTEN/NOTIFY using Djangos StreamingHttpRequest
|
||||||
Date: 2023-05-17
|
Date: 2023-05-17
|
||||||
Status: hidden
|
Status: published
|
||||||
Tags: django, sse, postgresql
|
Tags: django, sse, postgresql
|
||||||
Slug: django-sse-postgresql-listen-notify
|
Slug: django-sse-postgresql-listen-notify
|
||||||
Authors: Víðir Valberg Guðmundsson
|
Authors: Víðir Valberg Guðmundsson
|
||||||
|
@ -79,7 +79,8 @@ the view is an async view, denoted by the `async` keyword.
|
||||||
)
|
)
|
||||||
|
|
||||||
We tell the `StreamingHttpResponse` class to get its streaming content from the
|
We tell the `StreamingHttpResponse` class to get its streaming content from the
|
||||||
`stream_messages` function. I implemented this as follows initially:
|
`stream_messages` function. The following is my first initial implementation of
|
||||||
|
`stream_messages`:
|
||||||
|
|
||||||
::python
|
::python
|
||||||
async def stream_messages() -> AsyncGenerator[str, None]:
|
async def stream_messages() -> AsyncGenerator[str, None]:
|
||||||
|
@ -88,7 +89,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the
|
||||||
while True:
|
while True:
|
||||||
current_message = await ChatMessage.objects.order_by("-id").afirst()
|
current_message = await ChatMessage.objects.order_by("-id").afirst()
|
||||||
|
|
||||||
# If we have a new foo yield that
|
# If we have a new message yield that
|
||||||
if latest_message != current_message:
|
if latest_message != current_message:
|
||||||
yield "data: {current_message.text}\n\n"
|
yield "data: {current_message.text}\n\n"
|
||||||
latest_message = current_message
|
latest_message = current_message
|
||||||
|
@ -100,23 +101,23 @@ request from the client every 5 seconds. But we are still doing a query to the
|
||||||
database every 5 seconds, and that for each client. This is not ideal and is
|
database every 5 seconds, and that for each client. This is not ideal and is
|
||||||
probably something we could have done with a synchronous view.
|
probably something we could have done with a synchronous view.
|
||||||
|
|
||||||
Let's see if we can do better. But first we'll have to talk about how to run
|
Let's see if we can do better.
|
||||||
this code.
|
|
||||||
|
|
||||||
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
|
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
|
||||||
|
|
||||||
This is where we could reach for more infrastructure which could help us giving
|
This is where we could reach for more infrastructure which could help us giving
|
||||||
the database a break. This could be listening for data in Redis (this is what
|
the database a break. This could be listening for data in Redis (this is what
|
||||||
django-channels does), or even having a queue in RabbitMQ. No matter what, it
|
django-channels does), or even having a queue in RabbitMQ. No matter what, it
|
||||||
is more infrastructure.
|
is means more infrastructure.
|
||||||
|
|
||||||
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
|
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
|
||||||
|
|
||||||
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can
|
PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN`
|
||||||
LISTEN to a channel and then anyone can NOTIFY on that same channel.
|
to a "channel" and then other clients can `NOTIFY` on that same channel which
|
||||||
|
will be broadcasted to all listeners .
|
||||||
|
|
||||||
This seems like something we can use - but psycopg2 isn't async, so I'm not
|
This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not
|
||||||
even sure if `sync_to_async` would help us here.
|
even sure if `asgiref`'s `sync_to_async`[^3] would help us here.
|
||||||
|
|
||||||
#### Enter psycopg 3
|
#### Enter psycopg 3
|
||||||
|
|
||||||
|
@ -124,7 +125,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe
|
||||||
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
|
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
|
||||||
psycopg 3 - and psycopg 3 is very much async!
|
psycopg 3 - and psycopg 3 is very much async!
|
||||||
|
|
||||||
So I went for a stroll in the psycopg 3 documentation and found this gold[^3]:
|
So I went for a stroll in the psycopg 3 documentation and struck gold[^4]:
|
||||||
|
|
||||||
::python
|
::python
|
||||||
import psycopg
|
import psycopg
|
||||||
|
@ -141,14 +142,14 @@ This does almost what we want! It just isn't async and isn't getting connection
|
||||||
info from Django.
|
info from Django.
|
||||||
|
|
||||||
So by combining the snippet from the psycopg 3 documentation and my previous
|
So by combining the snippet from the psycopg 3 documentation and my previous
|
||||||
`stream_foos` I came up with this:
|
`stream_messages` I came up with this:
|
||||||
|
|
||||||
:::python
|
:::python
|
||||||
from collections.abc import AsyncGenerator
|
from collections.abc import AsyncGenerator
|
||||||
import psycopg
|
import psycopg
|
||||||
from django.db import connection
|
from django.db import connection
|
||||||
|
|
||||||
async def stream_foos() -> AsyncGenerator[str, None]:
|
async def stream_messages() -> AsyncGenerator[str, None]:
|
||||||
|
|
||||||
# Get the connection params from Django
|
# Get the connection params from Django
|
||||||
connection_params = connection.get_connection_params()
|
connection_params = connection.get_connection_params()
|
||||||
|
@ -164,7 +165,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
|
||||||
**connection_params,
|
**connection_params,
|
||||||
autocommit=True,
|
autocommit=True,
|
||||||
)
|
)
|
||||||
channel_name = "new_foo"
|
|
||||||
|
channel_name = "lobby"
|
||||||
|
|
||||||
async with aconnection.cursor() as acursor:
|
async with aconnection.cursor() as acursor:
|
||||||
await acursor.execute(f"LISTEN {channel_name}")
|
await acursor.execute(f"LISTEN {channel_name}")
|
||||||
|
@ -175,6 +177,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
|
||||||
Appart from problems with the `cursor_factory` (which I'll get back to in the
|
Appart from problems with the `cursor_factory` (which I'll get back to in the
|
||||||
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
|
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
|
||||||
|
|
||||||
|
Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function
|
||||||
|
will yield the message to the listener.
|
||||||
|
|
||||||
### Test the endpoint with curl
|
### Test the endpoint with curl
|
||||||
|
|
||||||
|
@ -188,7 +192,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t
|
||||||
And connect to our database and run:
|
And connect to our database and run:
|
||||||
|
|
||||||
:::sql
|
:::sql
|
||||||
NOTIFY new_message, 'Hello, world!';
|
NOTIFY lobby, 'Hello, world!';
|
||||||
|
|
||||||
|
|
||||||
We, excitingly, get the following result :
|
We, excitingly, get the following result :
|
||||||
|
@ -449,7 +453,7 @@ So that now looks like so:
|
||||||
**connection_params,
|
**connection_params,
|
||||||
autocommit=True,
|
autocommit=True,
|
||||||
)
|
)
|
||||||
channel_name = "new_message"
|
channel_name = "lobby"
|
||||||
async with aconnection.cursor() as acursor:
|
async with aconnection.cursor() as acursor:
|
||||||
print(type(acursor))
|
print(type(acursor))
|
||||||
await acursor.execute(f"LISTEN {channel_name}")
|
await acursor.execute(f"LISTEN {channel_name}")
|
||||||
|
@ -458,7 +462,8 @@ So that now looks like so:
|
||||||
yield f"data: {notify.payload}\n\n"
|
yield f"data: {notify.payload}\n\n"
|
||||||
|
|
||||||
|
|
||||||
[^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]()
|
[^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses>
|
||||||
[^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]()
|
[^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse>
|
||||||
[^2]: [https://caniuse.com/eventsource]()
|
[^2]: <https://caniuse.com/eventsource>
|
||||||
[^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]()
|
[^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async>
|
||||||
|
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>
|
||||||
|
|
Loading…
Reference in a new issue