From d5c7587e042a98714c585879075803d43e56ce9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=AD=C3=B0ir=20Valberg=20Gu=C3=B0mundsson?= Date: Wed, 17 May 2023 17:42:20 +0200 Subject: [PATCH] Upd. --- content/django-server-sent-events.md | 43 ++++++++++++++++------------ 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/content/django-server-sent-events.md b/content/django-server-sent-events.md index a6dd846..f0096da 100644 --- a/content/django-server-sent-events.md +++ b/content/django-server-sent-events.md @@ -79,7 +79,8 @@ the view is an async view, denoted by the `async` keyword. ) We tell the `StreamingHttpResponse` class to get its streaming content from the -`stream_messages` function. I implemented this as follows initially: +`stream_messages` function. The following is my first initial implementation of +`stream_messages`: ::python async def stream_messages() -> AsyncGenerator[str, None]: @@ -88,7 +89,7 @@ We tell the `StreamingHttpResponse` class to get its streaming content from the while True: current_message = await ChatMessage.objects.order_by("-id").afirst() - # If we have a new foo yield that + # If we have a new message yield that if latest_message != current_message: yield "data: {current_message.text}\n\n" latest_message = current_message @@ -100,23 +101,23 @@ request from the client every 5 seconds. But we are still doing a query to the database every 5 seconds, and that for each client. This is not ideal and is probably something we could have done with a synchronous view. -Let's see if we can do better. But first we'll have to talk about how to run -this code. +Let's see if we can do better. ### More old tech to the rescue: PostgreSQL LISTEN/NOTIFY This is where we could reach for more infrastructure which could help us giving the database a break. This could be listening for data in Redis (this is what django-channels does), or even having a queue in RabbitMQ. No matter what, it -is more infrastructure. +is means more infrastructure. But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included". -PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can -LISTEN to a channel and then anyone can NOTIFY on that same channel. +PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can `LISTEN` +to a "channel" and then other clients can `NOTIFY` on that same channel which +will be broadcasted to all listeners . -This seems like something we can use - but psycopg2 isn't async, so I'm not -even sure if `sync_to_async` would help us here. +This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not +even sure if `asgiref`'s `sync_to_async`[^3] would help us here. #### Enter psycopg 3 @@ -124,7 +125,7 @@ I had put the whole thing on ice until I realized that another big thing (maybe a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for psycopg 3 - and psycopg 3 is very much async! -So I went for a stroll in the psycopg 3 documentation and found this gold[^3]: +So I went for a stroll in the psycopg 3 documentation and struck gold[^4]: ::python import psycopg @@ -141,14 +142,14 @@ This does almost what we want! It just isn't async and isn't getting connection info from Django. So by combining the snippet from the psycopg 3 documentation and my previous -`stream_foos` I came up with this: +`stream_messages` I came up with this: :::python from collections.abc import AsyncGenerator import psycopg from django.db import connection - async def stream_foos() -> AsyncGenerator[str, None]: + async def stream_messages() -> AsyncGenerator[str, None]: # Get the connection params from Django connection_params = connection.get_connection_params() @@ -164,7 +165,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous **connection_params, autocommit=True, ) - channel_name = "new_foo" + + channel_name = "lobby" async with aconnection.cursor() as acursor: await acursor.execute(f"LISTEN {channel_name}") @@ -175,6 +177,8 @@ So by combining the snippet from the psycopg 3 documentation and my previous Appart from problems with the `cursor_factory` (which I'll get back to in the [appendix](#difference-between-42-and-421)), this code is pretty straight forward and, most importantly, works! +Whenever a `NOTIFY lobby, ''` is issued, the `stream_messages` function +will yield the message to the listener. ### Test the endpoint with curl @@ -188,7 +192,7 @@ If we connect to the endpoint using curl (`-N` disables buffering and is a way t And connect to our database and run: :::sql - NOTIFY new_message, 'Hello, world!'; + NOTIFY lobby, 'Hello, world!'; We, excitingly, get the following result : @@ -449,7 +453,7 @@ So that now looks like so: **connection_params, autocommit=True, ) - channel_name = "new_message" + channel_name = "lobby" async with aconnection.cursor() as acursor: print(type(acursor)) await acursor.execute(f"LISTEN {channel_name}") @@ -458,7 +462,8 @@ So that now looks like so: yield f"data: {notify.payload}\n\n" -[^0]: [https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses]() -[^1]: [https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse]() -[^2]: [https://caniuse.com/eventsource]() -[^3]: [https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4]() +[^0]: +[^1]: +[^2]: +[^3]: +[^4]: