#36916: Add support for streaming with TaskGroups
-------------------------------------+-------------------------------------
     Reporter:  Thomas Grainger      |                    Owner:  Thomas
                                     |  Grainger
         Type:  New feature          |                   Status:  assigned
    Component:  HTTP handling        |                  Version:  dev
     Severity:  Normal               |               Resolution:
     Keywords:  structured-          |             Triage Stage:  Accepted
  concurrency, taskgroups            |
    Has patch:  0                    |      Needs documentation:  1
  Needs tests:  0                    |  Patch needs improvement:  0
Easy pickings:  0                    |                    UI/UX:  0
-------------------------------------+-------------------------------------
Comment (by Thomas Grainger):

 Attaching updated patch based on the approach discussed in
 [https://github.com/django/django/pull/19364 PR #19364].

 === Summary of changes ===

 '''`django/http/response.py`''':
 - `_set_streaming_content` detects async context managers via
 `hasattr(__aenter__, __aexit__)`, sets `is_acmgr = True` and stores the
 value
 - `streaming_content` property returns an `@asynccontextmanager` wrapping
 `make_bytes` when `is_acmgr` is True
 - `StreamingHttpResponse` gains `__aenter__`/`__aexit__`: for acmgr
 responses, enters the streaming_content CM; for regular responses, returns
 `aiter(self)`. `__aexit__` only handles acmgr CM cleanup — `aclosing` in
 the ASGI handler handles iterator close
 - `__iter__`, `__aiter__`, and `getvalue` raise `IsAcmgrException` when
 `is_acmgr` is True

 '''`django/core/handlers/asgi.py`''':
 - Single code path: `async with response as agen, aclosing(agen) as
 content:` — works for both regular streaming and acmgr responses

 '''`django/middleware/gzip.py`''':
 - Adds `is_acmgr` branch that wraps the acmgr in a new
 `@asynccontextmanager` feeding into `acompress_sequence`

 '''`django/utils/text.py`''':
 - `acompress_sequence` wraps its body in `try/finally` to `aclose` its
 operand

 === Tests ===

 - 13 new tests in `tests/httpwrappers/tests.py` covering: basic acmgr
 streaming, make_bytes coercion, `IsAcmgrException` guards on
 `__iter__`/`__aiter__`/`getvalue`, `__aexit__` on error and break, non-
 acmgr `__aenter__` fallback, reassignment, single-producer
 !TaskGroup+Queue, and multi-producer fan-in (news-and-weather pattern)
 - 1 new test in `tests/middleware/tests.py` for gzip compression of acmgr
 streaming responses

 === Docs ===

 - New "Streaming with !TaskGroup" section in `docs/ref/request-
 response.txt` with full `news_and_weather` example, key points re PEP 789,
 and anyio note
 - Release note in `docs/releases/6.1.txt`

 === Patch ===

 {{{#!diff
 diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py
 index 7ee5208..bcdafdc 100644
 --- a/django/core/handlers/asgi.py
 +++ b/django/core/handlers/asgi.py
 @@ -318,11 +318,9 @@ class ASGIHandler(base.BaseHandler):
          )
          # Streaming responses need to be pinned to their iterator.
          if response.streaming:
 -            # - Consume via `__aiter__` and not `streaming_content`
 directly,
 -            #   to allow mapping of a sync iterator.
 -            # - Use aclosing() when consuming aiter. See
 -            #
 
https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
 -            async with aclosing(aiter(response)) as content:
 +            # Use aclosing() when consuming aiter. See
 +            #
 
https://github.com/python/cpython/commit/6e8dcdaaa49d4313bf9fab9f9923ca5828fbb10e
 +            async with response as agen, aclosing(agen) as content:
                  async for part in content:
                      for chunk, _ in self.chunk_bytes(part):
                          await send(
 diff --git a/django/http/__init__.py b/django/http/__init__.py
 index 628564e..2df8fa6 100644
 --- a/django/http/__init__.py
 +++ b/django/http/__init__.py
 @@ -8,6 +8,7 @@ from django.http.request import (
  )
  from django.http.response import (
      BadHeaderError,
 +    IsAcmgrException,
      FileResponse,
      Http404,
      HttpResponse,
 @@ -47,6 +48,7 @@ __all__ = [
      "HttpResponseServerError",
      "Http404",
      "BadHeaderError",
 +    "IsAcmgrException",
      "JsonResponse",
      "FileResponse",
  ]
 diff --git a/django/http/response.py b/django/http/response.py
 index 9bf0b14..c1c0c20 100644
 --- a/django/http/response.py
 +++ b/django/http/response.py
 @@ -7,6 +7,7 @@ import re
  import sys
  import time
  import warnings
 +from contextlib import asynccontextmanager
  from email.header import Header
  from http.client import responses
  from urllib.parse import urlsplit
 @@ -104,6 +105,10 @@ class BadHeaderError(ValueError):
      pass


 +class IsAcmgrException(Exception):
 +    pass
 +
 +
  class HttpResponseBase:
      """
      An HTTP response base class with dictionary-accessed headers.
 @@ -479,14 +484,36 @@ class StreamingHttpResponse(HttpResponseBase):

      @property
      def streaming_content(self):
 +        if self.is_acmgr:
 +            # Pull to lexical scope in case streaming_content is set
 again.
 +            _acmgr = self.__acmgr
 +
 +            @asynccontextmanager
 +            async def acmgr_wrapper():
 +                async with _acmgr as agen:
 +                    async def awrapper():
 +                        try:
 +                            async for part in agen:
 +                                yield self.make_bytes(part)
 +                        finally:
 +                            if hasattr(agen, "aclose"):
 +                                await agen.aclose()
 +
 +                    yield awrapper()
 +
 +            return acmgr_wrapper()
          if self.is_async:
              # pull to lexical scope to capture fixed reference in case
              # streaming_content is set again later.
              _iterator = self._iterator

              async def awrapper():
 -                async for part in _iterator:
 -                    yield self.make_bytes(part)
 +                try:
 +                    async for part in _iterator:
 +                        yield self.make_bytes(part)
 +                finally:
 +                    if hasattr(_iterator, "aclose"):
 +                        await _iterator.aclose()

              return awrapper()
          else:
 @@ -498,6 +525,12 @@ class StreamingHttpResponse(HttpResponseBase):

      def _set_streaming_content(self, value):
          # Ensure we can never iterate on "value" more than once.
 +        if hasattr(value, "__aenter__") and hasattr(value, "__aexit__"):
 +            self.__acmgr = value
 +            self.is_acmgr = True
 +            self.is_async = True
 +            return
 +        self.is_acmgr = False
          try:
              self._iterator = iter(value)
              self.is_async = False
 @@ -507,7 +540,22 @@ class StreamingHttpResponse(HttpResponseBase):
          if hasattr(value, "close"):
              self._resource_closers.append(value.close)

 +    async def __aenter__(self):
 +        if self.is_acmgr:
 +            self.__acmgr_ctx = self.streaming_content
 +            return await self.__acmgr_ctx.__aenter__()
 +        return aiter(self)
 +
 +    async def __aexit__(self, *exc_info):
 +        if self.is_acmgr:
 +            return await self.__acmgr_ctx.__aexit__(*exc_info)
 +
      def __iter__(self):
 +        if self.is_acmgr:
 +            raise IsAcmgrException(
 +                "%s must be consumed via `async with`. Use `async with
 response` "
 +                "and iterate the yielded content." %
 self.__class__.__name__
 +            )
          try:
              return iter(self.streaming_content)
          except TypeError:
 @@ -528,6 +576,11 @@ class StreamingHttpResponse(HttpResponseBase):
              return map(self.make_bytes,
 iter(async_to_sync(to_list)(self._iterator)))

      async def __aiter__(self):
 +        if self.is_acmgr:
 +            raise IsAcmgrException(
 +                "%s must be consumed via `async with`. Use `async with
 response` "
 +                "and iterate the yielded content." %
 self.__class__.__name__
 +            )
          try:
              async for part in self.streaming_content:
                  yield part
 @@ -544,6 +597,11 @@ class StreamingHttpResponse(HttpResponseBase):
                  yield part

      def getvalue(self):
 +        if self.is_acmgr:
 +            raise IsAcmgrException(
 +                "%s must be consumed via `async with`. Use `async with
 response` "
 +                "and iterate the yielded content." %
 self.__class__.__name__
 +            )
          return b"".join(self.streaming_content)


 diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py
 index eb151d7..78b5739 100644
 --- a/django/middleware/gzip.py
 +++ b/django/middleware/gzip.py
 @@ -1,3 +1,5 @@
 +from contextlib import asynccontextmanager
 +
  from django.utils.cache import patch_vary_headers
  from django.utils.deprecation import MiddlewareMixin
  from django.utils.regex_helper import _lazy_re_compile
 @@ -31,7 +33,20 @@ class GZipMiddleware(MiddlewareMixin):
              return response

          if response.streaming:
 -            if response.is_async:
 +            if response.is_acmgr:
 +                original_acmgr = response.streaming_content
 +                max_random_bytes = self.max_random_bytes
 +
 +                @asynccontextmanager
 +                async def compressed_acmgr():
 +                    async with original_acmgr as agen:
 +                        yield acompress_sequence(
 +                            agen,
 +                            max_random_bytes=max_random_bytes,
 +                        )
 +
 +                response.streaming_content = compressed_acmgr()
 +            elif response.is_async:
                  response.streaming_content = acompress_sequence(
                      response.streaming_content,
                      max_random_bytes=self.max_random_bytes,
 diff --git a/django/utils/text.py b/django/utils/text.py
 index d1306f9..55bd6f5 100644
 --- a/django/utils/text.py
 +++ b/django/utils/text.py
 @@ -390,20 +390,24 @@ def compress_sequence(sequence, *,
 max_random_bytes=None):


  async def acompress_sequence(sequence, *, max_random_bytes=None):
 -    buf = StreamingBuffer()
 -    filename = _get_random_filename(max_random_bytes) if max_random_bytes
 else None
 -    with GzipFile(
 -        filename=filename, mode="wb", compresslevel=6, fileobj=buf,
 mtime=0
 -    ) as zfile:
 -        # Output headers...
 +    try:
 +        buf = StreamingBuffer()
 +        filename = _get_random_filename(max_random_bytes) if
 max_random_bytes else None
 +        with GzipFile(
 +            filename=filename, mode="wb", compresslevel=6, fileobj=buf,
 mtime=0
 +        ) as zfile:
 +            # Output headers...
 +            yield buf.read()
 +            async for item in sequence:
 +                zfile.write(item)
 +                zfile.flush()
 +                data = buf.read()
 +                if data:
 +                    yield data
          yield buf.read()
 -        async for item in sequence:
 -            zfile.write(item)
 -            zfile.flush()
 -            data = buf.read()
 -            if data:
 -                yield data
 -    yield buf.read()
 +    finally:
 +        if hasattr(sequence, "aclose"):
 +            await sequence.aclose()


  # Expression to match some_token and some_token="with spaces" (and
 similarly
 diff --git a/docs/ref/request-response.txt b/docs/ref/request-response.txt
 index ba60415..945683a 100644
 --- a/docs/ref/request-response.txt
 +++ b/docs/ref/request-response.txt
 @@ -1424,6 +1424,103 @@ is streaming. If you perform long-running
 operations in your view before
  returning the ``StreamingHttpResponse`` object, then you may also want to
  :ref:`handle disconnections in the view <async-handling-disconnect>`
 itself.

 +.. _request-response-streaming-task-groups:
 +
 +Streaming with ``TaskGroup``
 +----------------------------
 +
 +.. versionadded:: 6.1
 +
 +When serving under ASGI you may want to stream content that is produced
 by
 +background tasks — for example, fan-in from multiple websocket
 connections or
 +long-running computations. Python's :class:`asyncio.TaskGroup` is the
 natural
 +way to manage those tasks, but an ``async for`` loop cannot ``yield``
 inside a
 +``TaskGroup`` context (see :pep:`789` for details).
 +
 +The solution is to pass an :func:`~contextlib.asynccontextmanager`
 instance as
 +the ``streaming_content`` argument. The context manager's ``__aenter__``
 sets up
 +the ``TaskGroup`` (and any other resources), then ``yield`` s an async
 iterator
 +that the framework will consume. When the response finishes — or the
 client
 +disconnects — the context manager's ``__aexit__`` tears everything down
 in the
 +correct order.
 +
 +Here is a complete example that merges two websocket feeds into a single
 +streaming response::
 +
 +    import asyncio
 +    from collections.abc import AsyncGenerator
 +    from contextlib import asynccontextmanager
 +
 +    from django.http import StreamingHttpResponse
 +
 +
 +    @asynccontextmanager
 +    async def news_and_weather() -> AsyncGenerator[AsyncGenerator[bytes,
 None], None]:
 +        queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=1)
 +
 +        async def consume_ws(url: str) -> None:
 +            async with await connect_ws(url) as ws:
 +                async for msg in ws:
 +                    await queue.put(msg)
 +
 +        async def run_producers() -> None:
 +            async with asyncio.TaskGroup() as tg:
 +                tg.create_task(consume_ws("ws://news.example.com/feed"))
 +
 tg.create_task(consume_ws("ws://weather.example.com/feed"))
 +            await queue.put(None)  # sentinel: all producers finished
 +
 +        async def items() -> AsyncGenerator[bytes, None]:
 +            while (item := await queue.get()) is not None:
 +                yield item
 +
 +        async with asyncio.TaskGroup() as tg:
 +            tg.create_task(run_producers())
 +            yield items()
 +
 +
 +    async def news_and_weather_view(request):
 +        return StreamingHttpResponse(news_and_weather())
 +
 +Key points:
 +
 +* ``news_and_weather()`` is decorated with
 +  :func:`~contextlib.asynccontextmanager`. Django detects this via
 +  ``hasattr(value, "__aenter__")`` in ``_set_streaming_content`` and sets
 +  :attr:`~StreamingHttpResponse.is_acmgr` to ``True``.
 +
 +* The ``yield`` that suspends the frame happens inside
 +  ``@asynccontextmanager``, which correctly routes exceptions back to the
 +  ``TaskGroup`` cancel scope. The ``items()`` async generator yielded
 *out* to
 +  the caller never yields inside a cancel scope, so it is safe.
 +
 +* Middleware that wraps ``streaming_content`` (such as
 +  :class:`~django.middleware.gzip.GZipMiddleware`) is aware of the
 +  ``is_acmgr`` flag and preserves the context-manager protocol.
 +
 +* Attempting to iterate an ``is_acmgr`` response directly (via
 ``__iter__``,
 +  ``__aiter__``, or ``getvalue()``) raises
 +  :exc:`~django.http.IsAcmgrException`. The response must be consumed
 with
 +  ``async with``::
 +
 +      async with response as agen:
 +          async for chunk in agen:
 +              ...
 +
 +.. note::
 +
 +    The `anyio <https://anyio.readthedocs.io/>`_ library provides
 +    :func:`anyio.create_task_group` and
 :func:`anyio.create_memory_object_stream`,
 +    which can be used as an alternative to :class:`asyncio.TaskGroup` and
 +    :class:`asyncio.Queue`. A ``MemoryObjectReceiveStream`` is an async
 +    iterable and can be yielded directly from the context manager.
 +
 +.. attribute:: StreamingHttpResponse.is_acmgr
 +
 +    Boolean indicating whether the response was created with an async
 context
 +    manager as its streaming content. When ``True``,
 +    :attr:`~StreamingHttpResponse.is_async` is also ``True`` and the
 response
 +    must be consumed via ``async with``.
 +
  ``FileResponse`` objects
  ========================

 diff --git a/docs/releases/6.1.txt b/docs/releases/6.1.txt
 index 00eaf53..790782c 100644
 --- a/docs/releases/6.1.txt
 +++ b/docs/releases/6.1.txt
 @@ -323,6 +323,11 @@ Pagination
  Requests and Responses
  ~~~~~~~~~~~~~~~~~~~~~~

 +* :class:`~django.http.StreamingHttpResponse` now accepts an async
 context
 +  manager as ``streaming_content``, enabling streaming patterns that
 require
 +  :class:`asyncio.TaskGroup` or similar structured concurrency
 primitives. See
 +  :ref:`request-response-streaming-task-groups` for details.
 +
  * :attr:`HttpRequest.multipart_parser_class
 <django.http.HttpRequest.multipart_parser_class>`
    can now be customized to use a different multipart parser class.

 diff --git a/tests/asgi/urls.py b/tests/asgi/urls.py
 index 0311cf3..b7fa206 100644
 --- a/tests/asgi/urls.py
 +++ b/tests/asgi/urls.py
 @@ -1,4 +1,5 @@
  import asyncio
 +import contextlib
  import threading
  import time

 diff --git a/tests/httpwrappers/tests.py b/tests/httpwrappers/tests.py
 index 3e8364e..a2add61 100644
 --- a/tests/httpwrappers/tests.py
 +++ b/tests/httpwrappers/tests.py
 @@ -1,7 +1,10 @@
 +import asyncio
 +import contextlib
  import copy
  import json
  import os
  import pickle
 +import sys
  import unittest
  import uuid

 @@ -16,6 +19,7 @@ from django.http import (
      HttpResponseNotModified,
      HttpResponsePermanentRedirect,
      HttpResponseRedirect,
 +    IsAcmgrException,
      JsonResponse,
      QueryDict,
      SimpleCookie,
 @@ -808,6 +812,250 @@ class StreamingHttpResponseTests(SimpleTestCase):
          with self.assertWarnsMessage(Warning, msg):
              self.assertEqual(b"hello", await anext(aiter(r)))

 +    async def test_async_context_manager_streaming_response(self):
 +        """StreamingHttpResponse accepts an async context manager as
 content."""
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            async def iterator():
 +                yield b"hello"
 +                yield b"world"
 +
 +            yield iterator()
 +
 +        r = StreamingHttpResponse(acmgr())
 +        self.assertTrue(r.is_acmgr)
 +        self.assertTrue(r.is_async)
 +
 +        chunks = []
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for chunk in content:
 +                chunks.append(chunk)
 +        self.assertEqual(chunks, [b"hello", b"world"])
 +
 +    async def test_acmgr_make_bytes(self):
 +        """Acmgr streaming content is coerced to bytes via make_bytes."""
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            async def iterator():
 +                yield "hello"
 +                yield "café"
 +
 +            yield iterator()
 +
 +        r = StreamingHttpResponse(acmgr())
 +        chunks = []
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for chunk in content:
 +                chunks.append(chunk)
 +        self.assertEqual(chunks, [b"hello", b"caf\xc3\xa9"])
 +
 +    async def test_acmgr_iter_raises(self):
 +        """Iterating an acmgr response via __iter__ raises
 IsAcmgrException."""
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            yield iter([b"hello"])
 +
 +        r = StreamingHttpResponse(acmgr())
 +        msg = (
 +            "StreamingHttpResponse must be consumed via `async with`. "
 +            "Use `async with response` and iterate the yielded content."
 +        )
 +        with self.assertRaisesMessage(IsAcmgrException, msg):
 +            iter(r)
 +
 +    async def test_acmgr_aiter_raises(self):
 +        """Iterating an acmgr response via __aiter__ raises
 IsAcmgrException."""
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            yield iter([b"hello"])
 +
 +        r = StreamingHttpResponse(acmgr())
 +        msg = (
 +            "StreamingHttpResponse must be consumed via `async with`. "
 +            "Use `async with response` and iterate the yielded content."
 +        )
 +        with self.assertRaisesMessage(IsAcmgrException, msg):
 +            async for _ in r:
 +                pass
 +
 +    async def test_acmgr_getvalue_raises(self):
 +        """Calling getvalue() on an acmgr response raises
 IsAcmgrException."""
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            yield iter([b"hello"])
 +
 +        r = StreamingHttpResponse(acmgr())
 +        with self.assertRaises(IsAcmgrException):
 +            r.getvalue()
 +
 +    async def test_acmgr_aexit_called_on_error(self):
 +        """The acmgr's __aexit__ is called even when iteration raises."""
 +        exited = []
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            try:
 +                async def iterator():
 +                    yield b"hello"
 +                    raise ValueError("boom")
 +
 +                yield iterator()
 +            finally:
 +                exited.append(True)
 +
 +        r = StreamingHttpResponse(acmgr())
 +        with self.assertRaises(ValueError):
 +            async with r as agen, contextlib.aclosing(agen) as content:
 +                async for _ in content:
 +                    pass
 +        self.assertEqual(exited, [True])
 +
 +    async def test_acmgr_aexit_called_on_break(self):
 +        """The acmgr cleans up when the consumer breaks out early."""
 +        exited = []
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr():
 +            try:
 +                async def iterator():
 +                    yield b"hello"
 +                    yield b"world"
 +
 +                yield iterator()
 +            finally:
 +                exited.append(True)
 +
 +        r = StreamingHttpResponse(acmgr())
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for _ in content:
 +                break
 +        self.assertEqual(exited, [True])
 +
 +    async def test_acmgr_non_acmgr_aenter_returns_aiter(self):
 +        """__aenter__ on a non-acmgr async response returns
 aiter(self)."""
 +
 +        async def async_iter():
 +            yield b"hello"
 +
 +        r = StreamingHttpResponse(async_iter())
 +        self.assertFalse(r.is_acmgr)
 +        agen = await r.__aenter__()
 +        chunk = await agen.__anext__()
 +        self.assertEqual(chunk, b"hello")
 +        await r.__aexit__(None, None, None)
 +
 +    async def test_acmgr_reassign_streaming_content(self):
 +        """Assigning a new acmgr to streaming_content replaces the old
 one."""
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr1():
 +            async def it():
 +                yield b"first"
 +
 +            yield it()
 +
 +        @contextlib.asynccontextmanager
 +        async def acmgr2():
 +            async def it():
 +                yield b"second"
 +
 +            yield it()
 +
 +        r = StreamingHttpResponse(acmgr1())
 +        r.streaming_content = acmgr2()
 +        self.assertTrue(r.is_acmgr)
 +
 +        chunks = []
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for chunk in content:
 +                chunks.append(chunk)
 +        self.assertEqual(chunks, [b"second"])
 +
 +    @unittest.skipUnless(
 +        sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python
 3.11+"
 +    )
 +    async def test_taskgroup_queue_streaming_response(self):
 +        """
 +        StreamingHttpResponse works with asyncio.TaskGroup and
 asyncio.Queue,
 +        the canonical pattern for background-task-driven streaming.
 +        """
 +
 +        @contextlib.asynccontextmanager
 +        async def taskgroup_stream():
 +            queue = asyncio.Queue()
 +
 +            async def producer():
 +                for chunk in [b"hello", b"world"]:
 +                    await queue.put(chunk)
 +                await queue.put(None)  # sentinel
 +
 +            async def generator():
 +                while True:
 +                    chunk = await queue.get()
 +                    if chunk is None:
 +                        break
 +                    yield chunk
 +
 +            async with asyncio.TaskGroup() as tg:
 +                tg.create_task(producer())
 +                yield generator()
 +
 +        r = StreamingHttpResponse(taskgroup_stream())
 +        self.assertTrue(r.is_acmgr)
 +
 +        chunks = []
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for chunk in content:
 +                chunks.append(chunk)
 +        self.assertEqual(chunks, [b"hello", b"world"])
 +
 +    @unittest.skipUnless(
 +        sys.version_info >= (3, 11), "asyncio.TaskGroup requires Python
 3.11+"
 +    )
 +    async def test_taskgroup_multiple_producers_streaming_response(self):
 +        """
 +        StreamingHttpResponse works with multiple producer tasks feeding
 a
 +        shared queue, as in the news-and-weather websocket fan-in
 pattern.
 +        """
 +
 +        @contextlib.asynccontextmanager
 +        async def multi_producer_stream():
 +            queue = asyncio.Queue(maxsize=1)
 +
 +            async def consume(source):
 +                """Simulate a websocket consumer pushing to the shared
 queue."""
 +                for msg in source:
 +                    await queue.put(msg)
 +
 +            async def run_producers():
 +                async with asyncio.TaskGroup() as tg:
 +                    tg.create_task(consume([b"news-1", b"news-2"]))
 +                    tg.create_task(consume([b"weather-1"]))
 +                await queue.put(None)  # sentinel after all producers
 finish
 +
 +            async def items():
 +                while (item := await queue.get()) is not None:
 +                    yield item
 +
 +            async with asyncio.TaskGroup() as tg:
 +                tg.create_task(run_producers())
 +                yield items()
 +
 +        r = StreamingHttpResponse(multi_producer_stream())
 +        self.assertTrue(r.is_acmgr)
 +
 +        chunks = []
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for chunk in content:
 +                chunks.append(chunk)
 +        # All three messages arrive (order depends on scheduling).
 +        self.assertEqual(sorted(chunks), [b"news-1", b"news-2",
 b"weather-1"])
 +
      def test_text_attribute_error(self):
          r = StreamingHttpResponse(iter(["hello", "world"]))
          msg = "This %s instance has no `text` attribute." %
 r.__class__.__name__
 diff --git a/tests/middleware/tests.py b/tests/middleware/tests.py
 index a61c4b1..4192ada 100644
 --- a/tests/middleware/tests.py
 +++ b/tests/middleware/tests.py
 @@ -1,3 +1,4 @@
 +import contextlib
  import gzip
  import random
  import re
 @@ -936,6 +937,37 @@ class GZipMiddlewareTest(SimpleTestCase):
          self.assertEqual(r.get("Content-Encoding"), "gzip")
          self.assertFalse(r.has_header("Content-Length"))

 +    async def test_compress_acmgr_streaming_response(self):
 +        """
 +        Compression is performed on responses with async context manager
 content.
 +        """
 +
 +        async def get_stream_response(request):
 +            @contextlib.asynccontextmanager
 +            async def acmgr():
 +                async def iterator():
 +                    for chunk in self.sequence:
 +                        yield chunk
 +
 +                yield iterator()
 +
 +            resp = StreamingHttpResponse(acmgr())
 +            resp["Content-Type"] = "text/html; charset=UTF-8"
 +            return resp
 +
 +        r = await GZipMiddleware(get_stream_response)(self.req)
 +        self.assertTrue(r.is_acmgr)
 +        chunks = []
 +        async with r as agen, contextlib.aclosing(agen) as content:
 +            async for chunk in content:
 +                chunks.append(chunk)
 +        self.assertEqual(
 +            self.decompress(b"".join(chunks)),
 +            b"".join(self.sequence),
 +        )
 +        self.assertEqual(r.get("Content-Encoding"), "gzip")
 +        self.assertFalse(r.has_header("Content-Length"))
 +
      def test_compress_streaming_response_unicode(self):
          """
          Compression is performed on responses with streaming Unicode
 content.
 }}}
-- 
Ticket URL: <https://code.djangoproject.com/ticket/36916#comment:9>
Django <https://code.djangoproject.com/>
The Web framework for perfectionists with deadlines.

-- 
You received this message because you are subscribed to the Google Groups 
"Django updates" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion visit 
https://groups.google.com/d/msgid/django-updates/0107019d6c486b11-b0fb0139-1f88-4fa5-8407-7111a2478a4d-000000%40eu-central-1.amazonses.com.

Reply via email to