Thanks for your reply Andrew. I've created a custom manage command called 
`websocket_producer.py` that is responsible for feeding the firehose of 
market data (price changes, market volatility etc).

It looks like this

from django.core.management.base import BaseCommand, CommandError


from lomond.websocket import WebSocket


import channels.layers
from asgiref.sync import async_to_sync




class Command(BaseCommand):


    def handle(self, *args, **kwargs):
        channel_layer = channels.layers.get_channel_layer()
        while True:
            ws = WebSocket('wss://<my-url>/ws')
            # this for loop is effectively infinite
            for event in ws:
                if event.name == "text":
                    data = event.json
                    async_to_sync(channel_layer.send)(
                        'abc',
                        {'type': 'abc', 'message': str(data)}
                    )



However, this can only run for 1 min before resulting in the following 
error:


Traceback (most recent call last):
  File "./manage.py", line 21, in <module>
    execute_from_command_line(sys.argv)
  File 
"/usr/local/lib/python3.6/site-packages/django/core/management/__init__.py", 
line 371, in execute_from_command_line
    utility.execute()
  File 
"/usr/local/lib/python3.6/site-packages/django/core/management/__init__.py", 
line 365, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File 
"/usr/local/lib/python3.6/site-packages/django/core/management/base.py", 
line 288, in run_from_argv
    self.execute(*args, **cmd_options)
  File 
"/usr/local/lib/python3.6/site-packages/django/core/management/base.py", 
line 335, in execute
    output = self.handle(*args, **options)
  File "/app/savings_apps/accounts/management/commands/wsproducer.py", line 
22, in handle
    {'type': 'action_reports', 'message': str(data)}
  File "/usr/local/lib/python3.6/site-packages/asgiref/sync.py", line 64, in 
__call__
    return call_result.result()
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 398, in 
result
    return self.__get_result()
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 357, in 
__get_result
    raise self._exception
  File "/usr/local/lib/python3.6/site-packages/asgiref/sync.py", line 78, in 
main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/channels_redis/core.py", 
line 136, in send
    raise ChannelFull()
channels.exceptions.ChannelFull




Any ideas? Doing a real-time market data feed seems like it would be a 
pretty common use case, but I can't seem to find any prior art in Django 
Channels.


On Saturday, April 28, 2018 at 4:22:20 AM UTC-6, Andrew Godwin wrote:
>
> I can't help you with real-time streaming architecture overall - that's a 
> much bigger scope of thing - but I can say that you shouldn't be keeping a 
> synchronous consumer open like that (you're using a whole thread). You 
> should either rewrite it to be async-native, so it doesn't use up a thread 
> and potentially block the server, or rework it to put the feed events onto 
> the channel layer from an external process.
>
> Andrew
>
> On Sat, Apr 28, 2018 at 1:12 AM, Michael <writemicha...@gmail.com 
> <javascript:>> wrote:
>
>> Hi,
>>
>> What is the best way to architect a Django Channels app that provides a 
>> very fast infinite stream of market data? This is what I have so far, but I 
>> think it's not the best solution.
>>
>> This data is updated every millisecond so I would prefer to not persist 
>> it (unless there is a way of using redis pub/sub without actually saving 
>> the data, only for messaging)
>>
>>
>>
>>
>> class ChatConsumer(WebsocketConsumer):
>>     def connect(self):
>>         self.room_name = 'foo'
>>         self.room_group_name = 'foo'
>>         async_to_sync(self.channel_layer.group_add)(
>>             self.room_group_name,
>>             self.channel_name
>>         )
>>
>>
>>         self.accept()
>>         while True:
>>           # Imagine this is another WS feed or Zero MQ Feed.
>>           feed = Feed(....)
>>           for event in feed:
>>               if event.name == "text":
>>                   data = event.json
>>                   self.send(str(data)
>>
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Django users" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to django-users...@googlegroups.com <javascript:>.
>> To post to this group, send email to django...@googlegroups.com 
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/django-users.
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/django-users/01bf458c-ff1a-4cf6-bd58-da9b2f43123c%40googlegroups.com
>>  
>> <https://groups.google.com/d/msgid/django-users/01bf458c-ff1a-4cf6-bd58-da9b2f43123c%40googlegroups.com?utm_medium=email&utm_source=footer>
>> .
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to django-users+unsubscr...@googlegroups.com.
To post to this group, send email to django-users@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/django-users/381ae5ed-95c4-4d0e-a194-5df291d77def%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to