This commit is contained in:
Víðir Valberg Guðmundsson 2023-05-16 21:30:09 +02:00
parent 9791c5530b
commit 9b38f6f403
4 changed files with 243 additions and 188 deletions

View File

@ -3,23 +3,10 @@ Date: 2023-05-16
Tags: django, sse, postgresql
Slug: django-sse-postgresql-listen-notify
Authors: Víðir Valberg Guðmundsson
Summary: How to do fancy stuff with old tech, and some new.
Summary: A write-up of how I implemented server-sent events using Django 4.2 and PostgreSQL LISTEN/NOTIFY
---
- introduction of async StreamingHttpResponse in django 4.2
- we can now stream models, but we have continously poll the database for new data
- Old tech to the rescue!
- what are server sent events
- what is LISTEN/NOTIFY
- implementation
- the view
- the streaming function
- a small aside about me not getting it to work until putting autocommit=True
- the signal handler
- the frontend
## Server Sent Events with Django and PostgreSQL LISTEN/NOTIFY
With the release of Django 4.2 we got the following [0]:
@ -29,111 +16,142 @@ And the documentation has been expanded with the following [1]:
> When serving under ASGI, however, a [`StreamingHttpResponse`](https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse "django.http.StreamingHttpResponse") need not stop other requests from being served whilst waiting for I/O. This opens up the possibility of long-lived requests for streaming content and implementing patterns such as long-polling, and server-sent events.
Being a sucker for simplicity I got quite intrigued by the possiblilty to serve server-sent events (also known as SSE) directly from Django, with no need for additional infrastructure like Redis.
Being a sucker for simplicity I got quite intrigued by the possibility to serve
server-sent events (also known as SSE) directly from Django, with no need for
additional infrastructure like Redis.
## What are server-sent events and why do we want to use them?
Server-sent events is "old tech", as in that is has been supported in major browser since around 2010-2011 [2]. The idea is that the client "subscribes" to a HTTP endpoint, and the server can then issue data to the client as long as the connection is open. This is a great performance boost compared to other techniques as for instance polling the server.
Server-sent events is "old tech", as in that is has been supported in major
browser since around 2010-2011 [2]. The idea is that the client "subscribes" to
a HTTP endpoint, and the server can then issue data to the client as long as
the connection is open. This is a great performance boost compared to other
techniques as for instance polling the server.
_But wait, isn't websockets "shinier"?_
It depends. In many situations when it comes to developing web applications, we just want a way to push data to the client, and here a bi-directional connection like websockets feel like an overkill. Also I would argue that using POST/PUT requests from the client and SSE to the client might be "just enough" compared to websockets.
It depends. In many situations when it comes to developing web applications, we
just want a way to push data to the client, and here a bi-directional
connection like websockets feel like an overkill. Also I would argue that using
POST/PUT requests from the client and SSE to the client might be "just enough"
compared to websockets.
## A simple implementation
So lets get to some code! The following is something along the lines of my initial attempt. First we have to define the view, which in fact will not change for the remainder of this blog post. The juicy bits are in the next part.
So lets get to some code! The following is something along the lines of my
initial attempt. First we have to define the view, which in fact will not
change for the remainder of this blog post. The juicy bits are in the next
part.
:::python
async def stream_foo_view(request: HttpRequest) -> StreamingHttpResponse:
return StreamingHttpResponse(
streaming_content=stream_foos(),
content_type="text/event-stream",
)
:::python
async def stream_foo_view(request: HttpRequest) -> StreamingHttpResponse:
return StreamingHttpResponse(
streaming_content=stream_foos(),
content_type="text/event-stream",
)
We tell the `StreamingHttpResponse` class to get its streaming content from the `stream_foos` function. I implemented this as follows initially:
We tell the `StreamingHttpResponse` class to get its streaming content from the
`stream_foos` function. I implemented this as follows initially:
::python
async def stream_foos() -> AsyncGenerator[str, None]:
latest_foo = None
::python
async def stream_foos() -> AsyncGenerator[str, None]:
latest_foo = None
while True:
current_foo = await Foo.objects.order_by("-id").afirst()
while True:
current_foo = await Foo.objects.order_by("-id").afirst()
# If we have a new foo yield that
if latest_foo != current_foo:
yield "data: {current_foo.text}\n\n"
latest_foo = current_foo
# If we have a new foo yield that
if latest_foo != current_foo:
yield "data: {current_foo.text}\n\n"
latest_foo = current_foo
await asyncio.sleep(5)
await asyncio.sleep(5)
So we've gotten rid of the HTTP overhead of polling by not having to do a request from the client every 5 seconds. But we are still doing a query to the database every 5 seconds, and that for each client.
So we've gotten rid of the HTTP overhead of polling by not having to do a
request from the client every 5 seconds. But we are still doing a query to the
database every 5 seconds, and that for each client.
### Aside: Use an ASGI server for development
One thing that took me some time to realise is that the Django runserver is not capable of running async views returning `StreamingHttpResponse`.
One thing that took me some time to realise is that the Django runserver is not
capable of running async views returning `StreamingHttpResponse`.
Running the above view with the runserver results in the following error:
```
.../django/http/response.py:514: Warning: StreamingHttpResponse must consume asynchronous iterators in order to serve them synchronously. Use a synchronous iterator instead.
```
:::text
.../django/http/response.py:514: Warning: StreamingHttpResponse must
consume asynchronous iterators in order to serve them synchronously.
Use a synchronous iterator instead.
So I had to result to installing uvicorn and run my project as so:
`$ uvicorn --log-level debug --reload project.asgi:application`
:::bash
$ uvicorn --log-level debug --reload project.asgi:application`
The `--reload` part is particulary important when doing development.
## More old tech to the rescue: PostgreSQL LISTEN/NOTIFY
This is where we could reach for more infrastructure which could help us giving the database a break. This could be listening for data in Redis (this is what django-channels does), or even having a queue in RabbitMQ. No matter what, it is more infrastructure.
This is where we could reach for more infrastructure which could help us giving
the database a break. This could be listening for data in Redis (this is what
django-channels does), or even having a queue in RabbitMQ. No matter what, it
is more infrastructure.
But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can LISTEN to a channel and then anyone can NOTIFY on that same channel.
PostgreSQL has this mechanism called "LISTEN/NOTIFY" where one client can
LISTEN to a channel and then anyone can NOTIFY on that same channel.
This seems like something we can use - but psycopg2 isn't async, so I'm not even sure if `sync_to_async` would help us here.
This seems like something we can use - but psycopg2 isn't async, so I'm not
even sure if `sync_to_async` would help us here.
## Enter psycopg 3
I had put the whole thing on ice until I realized that another big thing (maybe a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for psycopg 3 - and psycopg 3 is very much async!
I had put the whole thing on ice until I realized that another big thing (maybe
a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for
psycopg 3 - and psycopg 3 is very much async!
So I went for a stroll in the psycopg 3 documentation and found this gold[3]:
::python
import psycopg
conn = psycopg.connect("", autocommit=True)
conn.execute("LISTEN mychan")
gen = conn.notifies()
for notify in gen:
print(notify)
if notify.payload == "stop":
gen.close()
print("there, I stopped")
::python
import psycopg
conn = psycopg.connect("", autocommit=True)
conn.execute("LISTEN mychan")
gen = conn.notifies()
for notify in gen:
print(notify)
if notify.payload == "stop":
gen.close()
print("there, I stopped")
This does almost what we want! It just isn't async and isn't getting connection info from Django.
So by combining the snippet from the psycopg 3 documentation and my previous `stream_foos` I came up with this:
So by combining the snippet from the psycopg 3 documentation and my previous
`stream_foos` I came up with this:
:::python
from collections.abc import AsyncGenerator
import psycopg
from django.db import connection
:::python
from collections.abc import AsyncGenerator
import psycopg
from django.db import connection
async def stream_foos() -> AsyncGenerator[str, None]:
aconnection = await psycopg.AsyncConnection.connect(
**connection.get_connection_params(),
autocommit=True,
)
channel_name = "new_foo"
async with aconnection.cursor() as acursor:
await acursor.execute(f"LISTEN {channel_name}")
gen = aconnection.notifies()
async for notify in gen:
yield f"data: {notify.payload}\n\n"
async def stream_foos() -> AsyncGenerator[str, None]:
connection_params = connection.get_connection_params()
connection_params.pop('cursor_factory')
aconnection = await psycopg.AsyncConnection.connect(
**connection_params,
autocommit=True,
)
channel_name = "new_foo"
> I was almost about to give up again, since this approach didn't work initially. All because I for some reason had removed the `autocommit=True` in my attempts to async-ify the snippet from the psycopg 3 documentation.
async with aconnection.cursor() as acursor:
await acursor.execute(f"LISTEN {channel_name}")
gen = aconnection.notifies()
async for notify in gen:
yield f"data: {notify.payload}\n\n"
I was almost about to give up again, since this approach didn't work initially.
All because I for some reason had removed the `autocommit=True` in my attempts
to async-ify the snippet from the psycopg 3 documentation.
### Issuing the NOTIFY
@ -149,74 +167,99 @@ So by combining the snippet from the psycopg 3 documentation and my previous `st
### Difference between 4.2 and 4.2.1
the code worked initially in 4.2, but 4.2.1 fixed a regression regarding setting a custom cursor in the database configuration.
the code worked initially in 4.2, but 4.2.1 fixed a regression regarding
setting a custom cursor in the database configuration.
In 4.2 we get this from `connection.get_connection_params()`:
:::json
{'dbname': 'postgres', 'user': 'postgres', 'password': 'postgres', 'host': 'localhost', 'port': 5432, 'context': <psycopg.adapt.AdaptersMap object at 0x7f019cda7a60>, 'prepare_threshold': None}
:::javascript
{
'dbname': 'postgres',
'user': 'postgres',
'password': 'postgres',
'host': 'localhost',
'port': 5432,
'context': <psycopg.adapt.AdaptersMap object at 0x7f019cda7a60>,
'prepare_threshold': None
}
in 4.2.1 we get this:
:::json
{'dbname': 'postgres', 'client_encoding': 'UTF8', 'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>, 'user': 'postgres', 'password': 'postgres', 'host': 'localhost', 'port': '5432', 'context': <psycopg.adapt.AdaptersMap object at 0x7f56464bcdd0>, 'prepare_threshold': None}
:::javascript
{
'dbname': 'postgres',
'client_encoding': 'UTF8',
'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
'user': 'postgres',
'password': 'postgres',
'host': 'localhost',
'port': '5432',
'context': <psycopg.adapt.AdaptersMap object at 0x7f56464bcdd0>,
'prepare_threshold': None
}
the `django.db.backends.postgresql.base.Cursor` is not async iterable.
`django.db.backends.postgresql.base.Cursor` is not async iterable.
So we have to set our own cursor_factory in settings:
So we can probably try to set our own `cursor_factory` in settings:
:::python
from psycopg import AsyncCursor
:::python
from psycopg import AsyncCursor
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'postgres',
'USER': 'postgres',
'PASSWORD': 'postgres',
'HOST': 'localhost',
'PORT': '5432',
'OPTIONS': {
"cursor_factory": AsyncCursor
}
}
}
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'postgres',
'USER': 'postgres',
'PASSWORD': 'postgres',
'HOST': 'localhost',
'PORT': '5432',
'OPTIONS': {
"cursor_factory": AsyncCursor
}
}
}
This might have some implications since we are not using a Django wrapped cursor. Time will tell.
But alas. For some reason this does not work. I guess that Django does some
wrapping of the cursor - or maybe I've just encountered a bug. The cursor is at
least not treated as an async cursor and thus we get the following error:
And so it did. I kept getting the following error:
:::pytb
```
/home/valberg/code/django-sse/venv/lib/python3.11/site-packages/django/db/backends/utils.py:41: RuntimeWarning: coroutine 'AsyncCursor.close' was never awaited
self.close()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/valberg/code/django-sse/venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py:1560: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
cursor.execute(sql, params)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
.../django-sse/venv/lib/python3.11/site-packages/django/db/backends/utils.py:41:
RuntimeWarning: coroutine 'AsyncCursor.close' was never awaited
So instead I opted for removing the `cursor_factory` in the streaming function. So that now looks like so:
self.close()
:::python
async def stream_messages() -> AsyncGenerator[str, None]:
connection_params = connection.get_connection_params()
connection_params.pop('cursor_factory')
aconnection = await psycopg.AsyncConnection.connect(
**connection_params,
autocommit=True,
)
channel_name = "new_message"
async with aconnection.cursor() as acursor:
print(type(acursor))
await acursor.execute(f"LISTEN {channel_name}")
gen = aconnection.notifies()
async for notify in gen:
yield f"data: {notify.payload}\n\n"
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
.../django-sse/venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py:1560:
RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
cursor.execute(sql, params)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
So instead I opted for removing the `cursor_factory` in the streaming function.
So that now looks like so:
:::python
async def stream_messages() -> AsyncGenerator[str, None]:
connection_params = connection.get_connection_params()
connection_params.pop('cursor_factory')
aconnection = await psycopg.AsyncConnection.connect(
**connection_params,
autocommit=True,
)
channel_name = "new_message"
async with aconnection.cursor() as acursor:
print(type(acursor))
await acursor.execute(f"LISTEN {channel_name}")
gen = aconnection.notifies()
async for notify in gen:
yield f"data: {notify.payload}\n\n"
Take aways:
- Good documentation is key - I would not be able to stitch this together without having
[0]: https://docs.djangoproject.com/en/4.2/releases/4.2/#requests-and-responses
[1]: https://docs.djangoproject.com/en/4.2/ref/request-response/#django.http.StreamingHttpResponse
[2]: https://caniuse.com/eventsource
[3]:https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4
[3]:https://www.psycopg.org/psycopg3/docs/advanced/async.html#index-4

View File

@ -39,18 +39,6 @@
{{ article.locale_modified }}
</time>
{% endif %}
{% if article.authors %}
<address class="vcard author">
By {% for author in article.authors %}
<a class="url fn" href="{{ SITEURL }}/{{ author.url }}">{{ author }}</a>
{% endfor %}
</address>
{% endif %}
{% if article.category %}
<div class="category">
Category: <a href="{{ SITEURL }}/{{ article.category.url }}">{{ article.category }}</a>
</div>
{% endif %}
{% if article.tags %}
<div class="tags">
Tags:
@ -60,6 +48,9 @@
</div>
{% endif %}
</footer><!-- /.post-info -->
<hr>
<div class="entry-content">
{{ article.content }}
</div><!-- /.entry-content -->

View File

@ -25,21 +25,25 @@
{% endif %}
{% if CATEGORY_FEED_ATOM and category %}
<link href="{{ FEED_DOMAIN }}/
{% if CATEGORY_FEED_ATOM_URL %}{{ CATEGORY_FEED_ATOM_URL.format(slug=category.slug) }}{% else %}{{ CATEGORY_FEED_ATOM.format(slug=category.slug) }}{% endif %}"
type="application/atom+xml" rel="alternate" title="{{ SITENAME }} Categories Atom Feed"/>
{% endif %}
{% if CATEGORY_FEED_RSS and category %}
<link href="{{ FEED_DOMAIN }}/
{% if CATEGORY_FEED_RSS_URL %}{{ CATEGORY_FEED_RSS_URL.format(slug=category.slug) }}{% else %}{{ CATEGORY_FEED_RSS.format(slug=category.slug) }}{% endif %}"
type="application/rss+xml" rel="alternate" title="{{ SITENAME }} Categories RSS Feed"/>
{% endif %}
{% if TAG_FEED_ATOM and tag %}
<link href="{{ FEED_DOMAIN }}/
{% if TAG_FEED_ATOM_URL %}{{ TAG_FEED_ATOM_URL.format(slug=tag.slug) }}{% else %}{{ TAG_FEED_ATOM.format(slug=tag.slug) }}{% endif %}"
type="application/atom+xml" rel="alternate" title="{{ SITENAME }} Tags Atom Feed"/>
{% endif %}
{% if TAG_FEED_RSS and tag %}
<link href="{{ FEED_DOMAIN }}/
{% if TAG_FEED_RSS_URL %}{{ TAG_FEED_RSS_URL.format(slug=tag.slug) }}{% else %}{{ TAG_FEED_RSS.format(slug=tag.slug) }}{% endif %}"
type="application/rss+xml" rel="alternate" title="{{ SITENAME }} Tags RSS Feed"/>
{% endif %}
@ -47,46 +51,47 @@
<link rel="stylesheet" href="{{ SITEURL }}/{{ THEME_STATIC_DIR }}/code_highlight.css"/>
<link rel="stylesheet" href="{{ SITEURL }}/{{ THEME_STATIC_DIR }}/bootstrap/css/bootstrap.min.css"/>
<style>
blockquote {
font-style: italic;
margin-top: 10px;
margin-bottom: 10px;
margin-left: 20px;
padding-left: 15px;
border-left: 3px solid #ccc;
}
.highlight {
padding: 1.25rem 1.25rem .25rem 1.25rem !important;
margin-bottom: 1.5rem !important;
}
</style>
</head>
<body id="index" class="home bg-dark-subtle">
<body id="index" class="home bg-light">
<div class="container bg-light h-100">
<header class="container d-flex justify-content-center">
<h1>
<a href="{{ SITEURL }}/">
valberg.dk
</a>
</h1>
</header><!-- /#banner -->
<header id="banner" class="body">
<h1><a href="{{ SITEURL }}/">{{ SITENAME }}{% if SITESUBTITLE %} <strong>{{ SITESUBTITLE }}</strong>{% endif %}
</a></h1>
</header><!-- /#banner -->
<nav id="menu">
<ul>
{% for title, link in MENUITEMS %}
<li><a href="{{ link }}">{{ title }}</a></li>
{% endfor %}
{% if DISPLAY_PAGES_ON_MENU %}
{% for p in pages %}
<li{% if p == page %} class="active"{% endif %}><a
href="{{ SITEURL }}/{{ p.url }}">{{ p.title }}</a></li>
{% endfor %}
{% endif %}
{% if DISPLAY_CATEGORIES_ON_MENU %}
{% for cat, null in categories %}
<li{% if cat == category %} class="active"{% endif %}><a
href="{{ SITEURL }}/{{ cat.url }}">{{ cat }}</a></li>
{% endfor %}
{% endif %}
</ul>
</nav><!-- /#menu -->
{% block jumbotron %}
{% endblock %}
{% block content %}
{% endblock %}
<div class="container bg-body h-100">
<footer id="contentinfo" class="body">
<address id="about" class="vcard body">
Proudly powered by <a href="https://getpelican.com/">Pelican</a>,
which takes great advantage of <a href="https://www.python.org/">Python</a>.
</address><!-- /#about -->
</footer><!-- /#contentinfo -->
<div class="row">
<div class="col-8 offset-2 pt-5 p-3">
{% block content %}
{% endblock %}
</div>
</div>
</div>
</body>

View File

@ -1,28 +1,44 @@
{% extends "base.html" %}
{% block content %}
<section id="content">
{% block content_title %}
<h2>All articles</h2>
{% block jumbotron %}
<div class="container p-5 text-center bg-body rounded-3 mb-5">
<h1 class="text-body-emphasis">Hi!</h1>
<p class="lead">
I'm Víðir Valberg Guðmundsson. I'm a software developer and this is my blog.
</p>
</div>
{% endblock %}
<ol id="post-list">
{% for article in articles_page.object_list %}
<li><article class="hentry">
<header> <h2 class="entry-title"><a href="{{ SITEURL }}/{{ article.url }}" rel="bookmark" title="Permalink to {{ article.title|striptags }}">{{ article.title }}</a></h2> </header>
{% block content %}
<section id="content">
{% for article in articles_page.object_list %}
<article class="hentry">
<header><h2 class="entry-title">
<a href="{{ SITEURL }}/{{ article.url }}"
rel="bookmark"
title="Permalink to {{ article.title|striptags }}">
{{ article.title }}
</a>
</h2></header>
<footer class="post-info">
<time class="published" datetime="{{ article.date.isoformat() }}"> {{ article.locale_date }} </time>
<time class="published"
datetime="{{ article.date.isoformat() }}">
{{ article.locale_date }}
</time>
<address class="vcard author">By
{% for author in article.authors %}
<a class="url fn" href="{{ SITEURL }}/{{ author.url }}">{{ author }}</a>
{% endfor %}
{% for author in article.authors %}
<a class="url fn" href="{{ SITEURL }}/{{ author.url }}">{{ author }}</a>
{% endfor %}
</address>
</footer><!-- /.post-info -->
<div class="entry-content"> {{ article.summary }} </div><!-- /.entry-content -->
</article></li>
{% endfor %}
</ol><!-- /#posts-list -->
{% if articles_page.has_other_pages() %}
{% include 'pagination.html' %}
{% endif %}
</section><!-- /#content -->
</article>
{% endfor %}
{% if articles_page.has_other_pages() %}
{% include 'pagination.html' %}
{% endif %}
</section><!-- /#content -->
{% endblock content %}