#36916: Add support for streaming with TaskGroups
-------------------------------------+-------------------------------------
Reporter: Thomas Grainger | Owner: Thomas
| Grainger
Type: New feature | Status: assigned
Component: HTTP handling | Version: dev
Severity: Normal | Resolution:
Keywords: structured- | Triage Stage: Accepted
concurrency, taskgroups |
Has patch: 0 | Needs documentation: 1
Needs tests: 0 | Patch needs improvement: 0
Easy pickings: 0 | UI/UX: 0
-------------------------------------+-------------------------------------
Comment (by Thomas Grainger):
Attaching updated patch based on the approach discussed in
[https://github.com/django/django/pull/19364 PR #19364].
=== Summary of changes ===
'''`django/http/response.py`''':
- `_set_streaming_content` detects async context managers via
`hasattr(__aenter__, __aexit__)`, sets `is_acmgr = True` and stores the
value
- `streaming_content` property returns an `@asynccontextmanager` wrapping
`make_bytes` when `is_acmgr` is True
- `StreamingHttpResponse` gains `__aenter__`/`__aexit__`: for acmgr
responses, enters the streaming_content CM; for regular responses, returns
`aiter(self)`. `__aexit__` only handles acmgr CM cleanup — `aclosing` in
the ASGI handler handles iterator close
- `__iter__`, `__aiter__`, and `getvalue` raise `IsAcmgrException` when
`is_acmgr` is True
'''`django/core/handlers/asgi.py`''':
- Single code path: `async with response as agen, aclosing(agen) as
content:` — works for both regular streaming and acmgr responses
'''`django/middleware/gzip.py`''':
- Adds `is_acmgr` branch that wraps the acmgr in a new
`@asynccontextmanager` feeding into `acompress_sequence`
'''`django/utils/text.py`''':
- `acompress_sequence` wraps its body in `try/finally` to `aclose` its
operand
=== Tests ===
- 13 new tests in `tests/httpwrappers/tests.py` covering: basic acmgr
streaming, make_bytes coercion, `IsAcmgrException` guards on
`__iter__`/`__aiter__`/`getvalue`, `__aexit__` on error and break, non-
acmgr `__aenter__` fallback, reassignment, single-producer
!TaskGroup+Queue, and multi-producer fan-in (news-and-weather pattern)
- 1 new test in `tests/middleware/tests.py` for gzip compression of acmgr
streaming responses
=== Docs ===
- New "Streaming with !TaskGroup" section in `docs/ref/request-
response.txt` with full `news_and_weather` example, key points re PEP 789,
and anyio note
- Release note in `docs/releases/6.1.txt`
=== Patch ===
{{{#!diff
diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py
index 7ee5208..bcdafdc 100644
--- a/django/core/handlers/asgi.py
+++ b/django/core/handlers/asgi.py
@@ -318,11 +318,9 @@ class ASGIHandler(base.BaseHandler):
)
# Streaming responses need to be pinned to their iterator.
if response.streaming:
- # - Consume via `__aiter__` and not `streaming_content`
directly,
- # to allow mapping of a sync iterator.
- # - Use aclosing() when consuming aiter. See
- #
https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
- async with aclosing(aiter(response)) as content:
+ # Use aclosing() when consuming aiter. See
+ #
https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
+ async with response as agen, aclosing(agen) as content:
async for part in content:
for chunk, _ in self.chunk_bytes(part):
await send(
diff --git a/django/http/__init__.py b/django/http/__init__.py
index 628564e..2df8fa6 100644
--- a/django/http/__init__.py
+++ b/django/http/__init__.py
@@ -8,6 +8,7 @@ from django.http.request import (
)
from django.http.response import (
BadHeaderError,
+ IsAcmgrException,
FileResponse,
Http404,
HttpResponse,
@@ -47,6 +48,7 @@ __all__ = [
"HttpResponseServerError",
"Http404",
"BadHeaderError",
+ "IsAcmgrException",
"JsonResponse",
"FileResponse",
]
diff --git a/django/http/response.py b/django/http/response.py
index 9bf0b14..c1c0c20 100644
--- a/django/http/response.py
+++ b/django/http/response.py
@@ -7,6 +7,7 @@ import re
import sys
import time
import warnings
+from contextlib import asynccontextmanager
from email.header import Header
from http.client import responses
from urllib.parse import urlsplit
@@ -104,6 +105,10 @@ class BadHeaderError(ValueError):
pass
+class IsAcmgrException(Exception):
+ pass
+
+
class HttpResponseBase:
"""
An HTTP response base class with dictionary-accessed headers.
@@ -479,14 +484,36 @@ class StreamingHttpResponse(HttpResponseBase):
@property
def streaming_content(self):
+ if self.is_acmgr:
+ # Pull to lexical scope in case streaming_content is set
again.
+ _acmgr = self.__acmgr
+
+ @asynccontextmanager
+ async def acmgr_wrapper():
+ async with _acmgr as agen:
+ async def awrapper():
+ try:
+ async for part in agen:
+ yield self.make_bytes(part)
+ finally:
+ if hasattr(agen, "aclose"):
+ await agen.aclose()
+
+ yield awrapper()
+
+ return acmgr_wrapper()
if self.is_async:
# pull to lexical scope to capture fixed reference in case
# streaming_content is set again later.
_iterator = self._iterator
async def awrapper():
- async for part in _iterator:
- yield self.make_bytes(part)
+ try:
+ async for part in _iterator:
+ yield self.make_bytes(part)
+ finally:
+ if hasattr(_iterator, "aclose"):
+ await _iterator.aclose()
return awrapper()
else:
@@ -498,6 +525,12 @@ class StreamingHttpResponse(HttpResponseBase):
def _set_streaming_content(self, value):
# Ensure we can never iterate on "value" more than once.
+ if hasattr(value, "__aenter__") and hasattr(value, "__aexit__"):
+ self.__acmgr = value
+ self.is_acmgr = True
+ self.is_async = True
+ return
+ self.is_acmgr = False
try:
self._iterator = iter(value)
self.is_async = False
@@ -507,7 +540,22 @@ class StreamingHttpResponse(HttpResponseBase):
if hasattr(value, "close"):
self._resource_closers.append(value.close)
+ async def __aenter__(self):
+ if self.is_acmgr:
+ self.__acmgr_ctx = self.streaming_content
+ return await self.__acmgr_ctx.__aenter__()
+ return aiter(self)
+
+ async def __aexit__(self, *exc_info):
+ if self.is_acmgr:
+ return await self.__acmgr_ctx.__aexit__(*exc_info)
+
def __iter__(self):
+ if self.is_acmgr:
+ raise IsAcmgrException(
+ "%s must be consumed via `async with`. Use `async with
response` "
+ "and iterate the yielded content." %
self.__class__.__name__
+ )
try:
return iter(self.streaming_content)
except TypeError:
@@ -528,6 +576,11 @@ class StreamingHttpResponse(HttpResponseBase):
return map(self.make_bytes,
iter(async_to_sync(to_list)(self._iterator)))
async def __aiter__(self):
+ if self.is_acmgr:
+ raise IsAcmgrException(
+ "%s must be consumed via `async with`. Use `async with
response` "
+ "and iterate the yielded content." %
self.__class__.__name__
+ )
try:
async for part in self.streaming_content:
yield part
@@ -544,6 +597,11 @@ class StreamingHttpResponse(HttpResponseBase):
yield part
def getvalue(self):
+ if self.is_acmgr:
+ raise IsAcmgrException(
+ "%s must be consumed via `async with`. Use `async with
response` "
+ "and iterate the yielded content." %
self.__class__.__name__
+ )
return b"".join(self.streaming_content)
diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py
index eb151d7..78b5739 100644
--- a/django/middleware/gzip.py
+++ b/django/middleware/gzip.py
@@ -1,3 +1,5 @@
+from contextlib import asynccontextmanager
+
from django.utils.cache import patch_vary_headers
from django.utils.deprecation import MiddlewareMixin
from django.utils.regex_helper import _lazy_re_compile
@@ -31,7 +33,20 @@ class GZipMiddleware(MiddlewareMixin):
return response
if response.streaming:
- if response.is_async:
+ if response.is_acmgr:
+ original_acmgr = response.streaming_content
+ max_random_bytes = self.max_random_bytes
+
+ @asynccontextmanager
+ async def compressed_acmgr():
+ async with original_acmgr as agen:
+ yield acompress_sequence(
+ agen,
+ max_random_bytes=max_random_bytes,
+ )
+
+ response.streaming_content = compressed_acmgr()
+ elif response.is_async:
response.streaming_content = acompress_sequence(
response.streaming_content,
max_random_bytes=self.max_random_bytes,
diff --git a/django/utils/text.py b/django/utils/text.py
index d1306f9..55bd6f5 100644
--- a/django/utils/text.py
+++ b/django/utils/text.py
@@ -390,20 +390,24 @@ def compress_sequence(sequence, *,
max_random_bytes=None):
async def acompress_sequence(sequence, *, max_random_bytes=None):
- buf = StreamingBuffer()
- filename = _get_random_filename(max_random_bytes) if max_random_bytes
else None
- with GzipFile(
- filename=filename, mode="wb", compresslevel=6, fileobj=buf,
mtime=0
- ) as zfile:
- # Output headers...
+ try:
+ buf = StreamingBuffer()
+ filename = _get_random_filename(max_random_bytes) if
max_random_bytes else None
+ with GzipFile(
+ filename=filename, mode="wb", compresslevel=6, fileobj=buf,
mtime=0
+ ) as zfile:
+ # Output headers...
+ yield buf.read()
+ async for item in sequence:
+ zfile.write(item)
+ zfile.flush()
+ data = buf.read()
+ if data:
+ yield data
yield buf.read()
- async for item in sequence:
- zfile.write(item)
- zfile.flush()
- data = buf.read()
- if data:
- yield data
- yield buf.read()
+ finally:
+ if hasattr(sequence, "aclose"):
+ await sequence.aclose()
# Expression to match some_token and some_token="with spaces" (and
similarly
diff --git a/docs/ref/request-response.txt b/docs/ref/request-response.txt
index ba60415..945683a 100644
--- a/docs/ref/request-response.txt
+++ b/docs/ref/request-response.txt
@@ -1424,6 +1424,103 @@ is streaming. If you perform long-running
operations in your view before
returning the ``StreamingHttpResponse`` object, then you may also want to
:ref:`handle disconnections in the view <async-handling-disconnect>`
itself.
+.. _request-response-streaming-task-groups:
+
+Streaming with ``TaskGroup``
+----------------------------
+
+.. versionadded:: 6.1
+
+When serving under ASGI you may want to stream content that is produced
by
+background tasks — for example, fan-in from multiple websocket
connections or
+long-running computations. Python's :class:`asyncio.TaskGroup` is the
natural
+way to manage those tasks, but an ``async for`` loop cannot ``yield``
inside a
+``TaskGroup`` context (see :pep:`789` for details).
+
+The solution is to pass an :func:`~contextlib.asynccontextmanager`
instance as
+the ``streaming_content`` argument. The context manager's ``__aenter__``
sets up
+the ``TaskGroup`` (and any other resources), then ``yield`` s an async
iterator
+that the framework will consume. When the response finishes — or the
client
+disconnects — the context manager's ``__aexit__`` tears everything down
in the
+correct order.
+
+Here is a complete example that merges two websocket feeds into a single
+streaming response::
+
+ import asyncio
+ from collections.abc import AsyncGenerator
+ from contextlib import asynccontextmanager
+
+ from django.http import StreamingHttpResponse
+
+
+ @asynccontextmanager
+ async def news_and_weather() -> AsyncGenerator[AsyncGenerator[bytes,
None], None]:
+ queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=1)
+
+ async def consume_ws(url: str) -> None:
+ async with await connect_ws(url) as ws:
+ async for msg in ws:
+ await queue.put(msg)
+
+ async def run_producers() -> None:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(consume_ws("ws://news.example.com/feed"))
+
tg.create_task(consume_ws("ws://weather.example.com/feed"))
+ await queue.put(None) # sentinel: all producers finished
+
+ async def items() -> AsyncGenerator[bytes, None]:
+ while (item := await queue.get()) is not None:
+ yield item
+
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(run_producers())
+ yield items()
+
+
+ async def news_and_weather_view(request):
+ return StreamingHttpResponse(news_and_weather())
+
+Key points:
+
+* ``news_and_weather()`` is decorated with
+ :func:`~contextlib.asynccontextmanager`. Django detects this via
+ ``hasattr(value, "__aenter__")`` in ``_set_streaming_content`` and sets
+ :attr:`~StreamingHttpResponse.is_acmgr` to ``True``.
+
+* The ``yield`` that suspends the frame happens inside
+ ``@asynccontextmanager``, which correctly routes exceptions back to the
+ ``TaskGroup`` cancel scope. The ``items()`` async generator yielded
*out* to
+ the caller never yields inside a cancel scope, so it is safe.
+
+* Middleware that wraps ``streaming_content`` (such as
+ :class:`~django.middleware.gzip.GZipMiddleware`) is aware of the
+ ``is_acmgr`` flag and preserves the context-manager protocol.
+
+* Attempting to iterate an ``is_acmgr`` response directly (via
``__iter__``,
+ ``__aiter__``, or ``getvalue()``) raises
+ :exc:`~django.http.IsAcmgrException`. The response must be consumed
with
+ ``async with``::
+
+ async with response as agen:
+ async for chunk in agen:
+ ...
+
+.. note::
+
+ The `anyio <https://anyio.readthedocs.io/>`_ library provides
+ :func:`anyio.create_task_group` and
:func:`anyio.create_memory_object_stream`,
+ which can be used as an alternative to :class:`asyncio.TaskGroup` and
+ :class:`asyncio.Queue`. A ``MemoryObjectReceiveStream`` is an async
+ iterable and can be yielded directly from the context manager.
+
+.. attribute:: StreamingHttpResponse.is_acmgr
+
+ Boolean indicating whether the response was created with an async
context
+ manager as its streaming content. When ``True``,
+ :attr:`~StreamingHttpResponse.is_async` is also ``True`` and the
response
+ must be consumed via ``async with``.
+
``FileResponse`` objects
========================
diff --git a/docs/releases/6.1.txt b/docs/releases/6.1.txt
index 00eaf53..790782c 100644
--- a/docs/releases/6.1.txt
+++ b/docs/releases/6.1.txt
@@ -323,6 +323,11 @@ Pagination
Requests and Responses
~~~~~~~~~~~~~~~~~~~~~~
+* :class:`~django.http.StreamingHttpResponse` now accepts an async
context
+ manager as ``streaming_content``, enabling streaming patterns that
require
+ :class:`asyncio.TaskGroup` or similar structured concurrency
primitives. See
+ :ref:`request-response-streaming-task-groups` for details.
+
* :attr:`HttpRequest.multipart_parser_class
<django.http.HttpRequest.multipart_parser_class>`
can now be customized to use a different multipart parser class.
diff --git a/tests/asgi/urls.py b/tests/asgi/urls.py
index 0311cf3..b7fa206 100644
--- a/tests/asgi/urls.py
+++ b/tests/asgi/urls.py
@@ -1,4 +1,5 @@
import asyncio
+import contextlib
import threading
import time
diff --git a/tests/httpwrappers/tests.py b/tests/httpwrappers/tests.py
index 3e8364e..a2add61 100644
--- a/tests/httpwrappers/tests.py
+++ b/tests/httpwrappers/tests.py
@@ -1,7 +1,10 @@
+import asyncio
+import contextlib
import copy
import json
import os
import pickle
+import sys
import unittest
import uuid
@@ -16,6 +19,7 @@ from django.http import (
HttpResponseNotModified,
HttpResponsePermanentRedirect,
HttpResponseRedirect,
+ IsAcmgrException,
JsonResponse,
QueryDict,
SimpleCookie,
@@ -808,6 +812,250 @@ class StreamingHttpResponseTests(SimpleTestCase):
with self.assertWarnsMessage(Warning, msg):
self.assertEqual(b"hello", await anext(aiter(r)))
+ async def test_async_context_manager_streaming_response(self):
+ """StreamingHttpResponse accepts an async context manager as
content."""
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ async def iterator():
+ yield b"hello"
+ yield b"world"
+
+ yield iterator()
+
+ r = StreamingHttpResponse(acmgr())
+ self.assertTrue(r.is_acmgr)
+ self.assertTrue(r.is_async)
+
+ chunks = []
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for chunk in content:
+ chunks.append(chunk)
+ self.assertEqual(chunks, [b"hello", b"world"])
+
+ async def test_acmgr_make_bytes(self):
+ """Acmgr streaming content is coerced to bytes via make_bytes."""
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ async def iterator():
+ yield "hello"
+ yield "café"
+
+ yield iterator()
+
+ r = StreamingHttpResponse(acmgr())
+ chunks = []
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for chunk in content:
+ chunks.append(chunk)
+ self.assertEqual(chunks, [b"hello", b"caf\xc3\xa9"])
+
+ async def test_acmgr_iter_raises(self):
+ """Iterating an acmgr response via __iter__ raises
IsAcmgrException."""
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ yield iter([b"hello"])
+
+ r = StreamingHttpResponse(acmgr())
+ msg = (
+ "StreamingHttpResponse must be consumed via `async with`. "
+ "Use `async with response` and iterate the yielded content."
+ )
+ with self.assertRaisesMessage(IsAcmgrException, msg):
+ iter(r)
+
+ async def test_acmgr_aiter_raises(self):
+ """Iterating an acmgr response via __aiter__ raises
IsAcmgrException."""
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ yield iter([b"hello"])
+
+ r = StreamingHttpResponse(acmgr())
+ msg = (
+ "StreamingHttpResponse must be consumed via `async with`. "
+ "Use `async with response` and iterate the yielded content."
+ )
+ with self.assertRaisesMessage(IsAcmgrException, msg):
+ async for _ in r:
+ pass
+
+ async def test_acmgr_getvalue_raises(self):
+ """Calling getvalue() on an acmgr response raises
IsAcmgrException."""
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ yield iter([b"hello"])
+
+ r = StreamingHttpResponse(acmgr())
+ with self.assertRaises(IsAcmgrException):
+ r.getvalue()
+
+ async def test_acmgr_aexit_called_on_error(self):
+ """The acmgr's __aexit__ is called even when iteration raises."""
+ exited = []
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ try:
+ async def iterator():
+ yield b"hello"
+ raise ValueError("boom")
+
+ yield iterator()
+ finally:
+ exited.append(True)
+
+ r = StreamingHttpResponse(acmgr())
+ with self.assertRaises(ValueError):
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for _ in content:
+ pass
+ self.assertEqual(exited, [True])
+
+ async def test_acmgr_aexit_called_on_break(self):
+ """The acmgr cleans up when the consumer breaks out early."""
+ exited = []
+
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ try:
+ async def iterator():
+ yield b"hello"
+ yield b"world"
+
+ yield iterator()
+ finally:
+ exited.append(True)
+
+ r = StreamingHttpResponse(acmgr())
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for _ in content:
+ break
+ self.assertEqual(exited, [True])
+
+ async def test_acmgr_non_acmgr_aenter_returns_aiter(self):
+ """__aenter__ on a non-acmgr async response returns
aiter(self)."""
+
+ async def async_iter():
+ yield b"hello"
+
+ r = StreamingHttpResponse(async_iter())
+ self.assertFalse(r.is_acmgr)
+ agen = await r.__aenter__()
+ chunk = await agen.__anext__()
+ self.assertEqual(chunk, b"hello")
+ await r.__aexit__(None, None, None)
+
+ async def test_acmgr_reassign_streaming_content(self):
+ """Assigning a new acmgr to streaming_content replaces the old
one."""
+
+ @contextlib.asynccontextmanager
+ async def acmgr1():
+ async def it():
+ yield b"first"
+
+ yield it()
+
+ @contextlib.asynccontextmanager
+ async def acmgr2():
+ async def it():
+ yield b"second"
+
+ yield it()
+
+ r = StreamingHttpResponse(acmgr1())
+ r.streaming_content = acmgr2()
+ self.assertTrue(r.is_acmgr)
+
+ chunks = []
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for chunk in content:
+ chunks.append(chunk)
+ self.assertEqual(chunks, [b"second"])
+
+ @unittest.skipUnless(
+ sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python
3.11+"
+ )
+ async def test_taskgroup_queue_streaming_response(self):
+ """
+ StreamingHttpResponse works with asyncio.TaskGroup and
asyncio.Queue,
+ the canonical pattern for background-task-driven streaming.
+ """
+
+ @contextlib.asynccontextmanager
+ async def taskgroup_stream():
+ queue = asyncio.Queue()
+
+ async def producer():
+ for chunk in [b"hello", b"world"]:
+ await queue.put(chunk)
+ await queue.put(None) # sentinel
+
+ async def generator():
+ while True:
+ chunk = await queue.get()
+ if chunk is None:
+ break
+ yield chunk
+
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(producer())
+ yield generator()
+
+ r = StreamingHttpResponse(taskgroup_stream())
+ self.assertTrue(r.is_acmgr)
+
+ chunks = []
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for chunk in content:
+ chunks.append(chunk)
+ self.assertEqual(chunks, [b"hello", b"world"])
+
+ @unittest.skipUnless(
+ sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python
3.11+"
+ )
+ async def test_taskgroup_multiple_producers_streaming_response(self):
+ """
+ StreamingHttpResponse works with multiple producer tasks feeding
a
+ shared queue, as in the news-and-weather websocket fan-in
pattern.
+ """
+
+ @contextlib.asynccontextmanager
+ async def multi_producer_stream():
+ queue = asyncio.Queue(maxsize=1)
+
+ async def consume(source):
+ """Simulate a websocket consumer pushing to the shared
queue."""
+ for msg in source:
+ await queue.put(msg)
+
+ async def run_producers():
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(consume([b"news-1", b"news-2"]))
+ tg.create_task(consume([b"weather-1"]))
+ await queue.put(None) # sentinel after all producers
finish
+
+ async def items():
+ while (item := await queue.get()) is not None:
+ yield item
+
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(run_producers())
+ yield items()
+
+ r = StreamingHttpResponse(multi_producer_stream())
+ self.assertTrue(r.is_acmgr)
+
+ chunks = []
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for chunk in content:
+ chunks.append(chunk)
+ # All three messages arrive (order depends on scheduling).
+ self.assertEqual(sorted(chunks), [b"news-1", b"news-2",
b"weather-1"])
+
def test_text_attribute_error(self):
r = StreamingHttpResponse(iter(["hello", "world"]))
msg = "This %s instance has no `text` attribute." %
r.__class__.__name__
diff --git a/tests/middleware/tests.py b/tests/middleware/tests.py
index a61c4b1..4192ada 100644
--- a/tests/middleware/tests.py
+++ b/tests/middleware/tests.py
@@ -1,3 +1,4 @@
+import contextlib
import gzip
import random
import re
@@ -936,6 +937,37 @@ class GZipMiddlewareTest(SimpleTestCase):
self.assertEqual(r.get("Content-Encoding"), "gzip")
self.assertFalse(r.has_header("Content-Length"))
+ async def test_compress_acmgr_streaming_response(self):
+ """
+ Compression is performed on responses with async context manager
content.
+ """
+
+ async def get_stream_response(request):
+ @contextlib.asynccontextmanager
+ async def acmgr():
+ async def iterator():
+ for chunk in self.sequence:
+ yield chunk
+
+ yield iterator()
+
+ resp = StreamingHttpResponse(acmgr())
+ resp["Content-Type"] = "text/html; charset=UTF-8"
+ return resp
+
+ r = await GZipMiddleware(get_stream_response)(self.req)
+ self.assertTrue(r.is_acmgr)
+ chunks = []
+ async with r as agen, contextlib.aclosing(agen) as content:
+ async for chunk in content:
+ chunks.append(chunk)
+ self.assertEqual(
+ self.decompress(b"".join(chunks)),
+ b"".join(self.sequence),
+ )
+ self.assertEqual(r.get("Content-Encoding"), "gzip")
+ self.assertFalse(r.has_header("Content-Length"))
+
def test_compress_streaming_response_unicode(self):
"""
Compression is performed on responses with streaming Unicode
content.
}}}
--
Ticket URL: <https://code.djangoproject.com/ticket/36916#comment:9>
Django <https://code.djangoproject.com/>
The Web framework for perfectionists with deadlines.
--
You received this message because you are subscribed to the Google Groups
"Django updates" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion visit
https://groups.google.com/d/msgid/django-updates/0107019d6c486b11-b0fb0139-1f88-4fa5-8407-7111a2478a4d-000000%40eu-central-1.amazonses.com.