This commit is contained in:
Víðir Valberg Guðmundsson 2023-05-17 17:42:20 +02:00
parent 56537fe004
commit d5c7587e04
1 changed files with 24 additions and 19 deletions

View File

@ -79,7 +79,8 @@ the view is an async view, denoted by the `async` keyword.
) )
We tell the `StreamingHttpResponse` class to get its streaming content from the We tell the `StreamingHttpResponse` class to get its streaming content from the
`stream_messages` function. I implemented this as follows initially: `stream_messages` function. The following is my first initial implementation of
`stream_messages`:
::python ::python
async def stream_messages() -> AsyncGenerator[str, None]: async def stream_messages() -> AsyncGenerator[str, None]:
@ -88,7 +89,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the
while True: while True:
current_message = await ChatMessage.objects.order_by("-id").afirst() current_message = await ChatMessage.objects.order_by("-id").afirst()
# If we have a new foo yield that # If we have a new message yield that
if latest_message != current_message: if latest_message != current_message:
yield "data: {current_message.text}\n\n" yield "data: {current_message.text}\n\n"
latest_message = current_message latest_message = current_message
@ -100,23 +101,23 @@ request from the client every 5 seconds. But we are still doing a query to the
database every 5 seconds, and that for each client. This is not ideal and is database every 5 seconds, and that for each client. This is not ideal and is
probably something we could have done with a synchronous view. probably something we could have done with a synchronous view.
Let's see if we can do better. But first we'll have to talk about how to run Let's see if we can do better.
this code.
### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY ### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
This is where we could reach for more infrastructure which could help us giving This is where we could reach for more infrastructure which could help us giving
the database a break. This could be listening for data in Redis (this is what the database a break. This could be listening for data in Redis (this is what
django-channels does), or even having a queue in RabbitMQ. No matter what, it django-channels does), or even having a queue in RabbitMQ. No matter what, it
is more infrastructure. is means more infrastructure.
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included". But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN`
LISTEN to a channel and then anyone can NOTIFY on that same channel. to a "channel" and then other clients can `NOTIFY` on that same channel which
will be broadcasted to all listeners .
This seems like something we can use - but psycopg2 isn't async, so I'm not This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not
even sure if `sync_to_async` would help us here. even sure if `asgiref`'s `sync_to_async`[^3] would help us here.
#### Enter psycopg 3 #### Enter psycopg 3
@ -124,7 +125,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
psycopg 3 - and psycopg 3 is very much async! psycopg 3 - and psycopg 3 is very much async!
So I went for a stroll in the psycopg 3 documentation and found this gold[^3]: So I went for a stroll in the psycopg 3 documentation and struck gold[^4]:
::python ::python
import psycopg import psycopg
@ -141,14 +142,14 @@ This does almost what we want! It just isn't async and isn't getting connection
info from Django. info from Django.
So by combining the snippet from the psycopg 3 documentation and my previous So by combining the snippet from the psycopg 3 documentation and my previous
`stream_foos` I came up with this: `stream_messages` I came up with this:
:::python :::python
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
import psycopg import psycopg
from django.db import connection from django.db import connection
async def stream_foos() -> AsyncGenerator[str, None]: async def stream_messages() -> AsyncGenerator[str, None]:
# Get the connection params from Django # Get the connection params from Django
connection_params = connection.get_connection_params() connection_params = connection.get_connection_params()
@ -164,7 +165,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
**connection_params, **connection_params,
autocommit=True, autocommit=True,
) )
channel_name = "new_foo"
channel_name = "lobby"
async with aconnection.cursor() as acursor: async with aconnection.cursor() as acursor:
await acursor.execute(f"LISTEN {channel_name}") await acursor.execute(f"LISTEN {channel_name}")
@ -175,6 +177,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous
Appart from problems with the `cursor_factory` (which I'll get back to in the Appart from problems with the `cursor_factory` (which I'll get back to in the
[appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works! [appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works!
Whenever a `NOTIFY lobby, '<message>'` is issued, the `stream_messages` function
will yield the message to the listener.
### Test the endpoint with curl ### Test the endpoint with curl
@ -188,7 +192,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t
And connect to our database and run: And connect to our database and run:
:::sql :::sql
NOTIFY new_message, 'Hello, world!'; NOTIFY lobby, 'Hello, world!';
We, excitingly, get the following result : We, excitingly, get the following result :
@ -449,7 +453,7 @@ So that now looks like so:
**connection_params, **connection_params,
autocommit=True, autocommit=True,
) )
channel_name = "new_message" channel_name = "lobby"
async with aconnection.cursor() as acursor: async with aconnection.cursor() as acursor:
print(type(acursor)) print(type(acursor))
await acursor.execute(f"LISTEN {channel_name}") await acursor.execute(f"LISTEN {channel_name}")
@ -458,7 +462,8 @@ So that now looks like so:
yield f"data: {notify.payload}\n\n" yield f"data: {notify.payload}\n\n"
[^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]() [^0]: <https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses>
[^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]() [^1]: <https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse>
[^2]: [https://caniuse.com/eventsource]() [^2]: <https://caniuse.com/eventsource>
[^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]() [^3]: <https://docs.djangoproject.com/en/4.2/topics/async/#sync-to-async>
[^4]: <https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4>