Andrew, thank you for the quick response!

Unfortunately, I see something different. If I raise `StopConsumer` after 
processing a single message, the next one is lost. From the code at 
https://github.com/django/asgiref/blob/master/asgiref/server.py#L75 I see 
that `get_or_create_application_instance` returns the existing application 
instance, so when a message arrives it is put into the queue of that 
existing application. If I raise `StopConsumer`, this application is killed 
and the message is lost.
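
A toy model of this behaviour, in plain asyncio rather than asgiref itself: the `StopConsumer` class, the `application` coroutine and the message names below are stand-ins invented for illustration, and the only assumption is that messages are queued on one long-lived instance.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for channels.exceptions.StopConsumer (illustration only)."""


async def application(queue, handled):
    # One instance reads from its own queue until the handler stops it.
    try:
        while True:
            message = await queue.get()
            handled.append(message)
            # The handler stops the instance after a single message.
            raise StopConsumer()
    except StopConsumer:
        pass  # the instance ends; whatever is still queued is never read


async def main():
    queue = asyncio.Queue()
    handled = []
    # Two messages are routed to the same existing instance.
    queue.put_nowait("wakeup-1")
    queue.put_nowait("wakeup-2")
    await application(queue, handled)
    return handled, queue.qsize()


handled, left_over = asyncio.run(main())
print(handled, left_over)
```

In this model the second message is still sitting in the queue when the instance exits, which matches the loss described above.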

Another observation: while a message handler awaits something (like 
`await some_process()` in my snippet), this application instance does not 
process new messages. Only when one message handler exits does the next one 
start.
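
The same observation as a small asyncio sketch. It assumes a single dispatch loop per instance that awaits each handler before reading the next message; `handler`, `instance` and the `None` sentinel are hypothetical names, not Channels API.

```python
import asyncio


async def handler(message, log):
    log.append(("start", message))
    await asyncio.sleep(0.01)  # stands in for `await some_process()`
    log.append(("end", message))


async def instance(queue, log):
    # The next queue.get() only happens after the current handler returns.
    while True:
        message = await queue.get()
        if message is None:  # sentinel used only to end this demo
            break
        await handler(message, log)


async def run_demo():
    queue = asyncio.Queue()
    log = []
    for item in ("a", "b", None):
        queue.put_nowait(item)
    await instance(queue, log)
    return log


log = asyncio.run(run_demo())
print(log)
```

The log shows the handlers strictly one after another, never interleaved.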

Actually, I was offloading message processing to the thread pool (using 
`sync_to_async`) and trying to limit the number of messages being processed 
at the same time. I hoped that while I await some coroutine, the worker 
would stop accepting messages, so other workers could process them until 
this one becomes free again. Can you give me a clue how to achieve this 
behaviour?
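
For concreteness, a minimal sketch of bounding in-flight work with `asyncio.Semaphore`, in plain asyncio. It uses `loop.run_in_executor` as a stand-in for `sync_to_async`, and `MAX_IN_FLIGHT`, `blocking_work` and `handle` are hypothetical names. Note that this only caps how many messages are processed at once inside one process; it does not by itself stop a worker from taking messages off the channel.

```python
import asyncio
import threading
import time

MAX_IN_FLIGHT = 2  # hypothetical limit on concurrent blocking jobs


def blocking_work(state):
    # Track how many jobs run at the same time so the cap can be checked.
    with state["lock"]:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.02)  # stands in for the real synchronous processing
    with state["lock"]:
        state["running"] -= 1


async def handle(message, semaphore, state):
    # At most MAX_IN_FLIGHT handlers get past this point at once.
    async with semaphore:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_work, state)


async def main():
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)
    state = {"lock": threading.Lock(), "running": 0, "peak": 0}
    await asyncio.gather(*(handle(i, semaphore, state) for i in range(6)))
    return state["peak"]


peak = asyncio.run(main())
print(peak)
```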


On Wednesday, May 2, 2018 at 18:39:01 UTC+3, Andrew Godwin wrote:
>
>
>    - Will `MyConsumer` receive new `wakeup` messages while awaiting 
>    `some_process`?
>    
> Yes. The worker server runs as many application instances as there are 
> messages coming in, even though they all have the same scope. You can see 
> the main listening loop here: 
> https://github.com/django/channels/blob/master/channels/worker.py#L32
>
>    - When do I need to raise `StopConsumer`? I can do it after each 
>    `wakeup` message is processed (like in the code above); is that 
>    correct? What will happen to all the `pending` messages in that case?
>
> You need to raise it when the specific application instance you have is 
> completed - because there's a different instance for each message, that 
> means "raise when you've finished processing the message". Nothing happens 
> to other messages as they're being handled by different instances.
>
> Andrew
>
> On Wed, May 2, 2018 at 7:50 AM Alexander Prokhorov <pro...@gmail.com> 
> wrote:
>
>> Dear Andrew,
>>
>> I would like to ask a couple of questions about the lifecycle of 
>> consumers running in Channels workers and serving custom channels.
>>
>> Consider a consumer:
>>
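>> # myconsumer.py
>> import channels.consumer
>> import channels.exceptions
>>
>> class MyConsumer(channels.consumer.AsyncConsumer):
>>     async def wakeup(self, message):
>>         # some_process() is a coroutine defined elsewhere
>>         await some_process()
>>         raise channels.exceptions.StopConsumer()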
>>
>> which I "register" to process messages in the channel `my_channel`:
>>
>> # routing.py
>> application = channels.routing.ProtocolTypeRouter({
>>     'channel': channels.routing.ChannelNameRouter({
>>         'my_channel': MyConsumer
>>     })
>> })
>>
>> and finally I run a designated Channels worker to process `my_channel` 
>> messages:
>>
>> ./manage.py runworker my_channel
>>
>> So the questions are:
>>
>>    - Will `MyConsumer` receive new `wakeup` messages while awaiting 
>>    `some_process`?
>>    - When do I need to raise `StopConsumer`? I can do it after each 
>>    `wakeup` message is processed (like in the code above); is that 
>>    correct? What will happen to all the `pending` messages in that 
>>    case?
>>
>> Actually, I do not raise `StopConsumer` in the implementation I 
>> currently have, but this leads to an issue with tests. In tests I need to 
>> somehow wait until all workers finish processing their messages. I tried 
>> calling `channels.testing.ApplicationCommunicator.wait()`, but as far as 
>> I can see from the code, it waits for the application/consumer to 
>> finish, i.e. to raise `StopConsumer`. Perhaps you could share some 
>> recommendations. Thanks in advance.
>>
>> Best regards,
>> Alexander.
>>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to django-users+unsubscr...@googlegroups.com.
To post to this group, send email to django-users@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/django-users/d991b5d9-5cb7-46f0-9fb9-749e65443122%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
