I managed to resolve the cascading crash issue by doing some more asyncio reading and using: self.t1 = asyncio.run_coroutine_threadsafe(self.listen(), loop=loop)
instead of: t1 = asyncio.ensure_future(self.listen(), loop=loop) await t1 This makes it run concurrently with the main asyncio channels threads. The 'No handler..' error still shows up in the main websocket consumer, but it seems like it doesn't cause an irrecoverable crash AND it's not blocking! *phew*! Would be nice to fix it, but it's not urgent. Now I have an issue with initializing only one background worker versus the current channels implementation of initializing a new class for every routing call. Will post under a separate issue. On Monday, June 18, 2018 at 2:27:57 PM UTC-4, itsnate_b wrote: > > ....I am unable to determine the cause of the handler issue (assuming > something is crashing the channels websocket?). Could this be a redis queue > issue? Where events being passed through the channel layer are filling up a > queue and not being flushed correctly? I have no idea where to go from > here.... > > On Monday, June 18, 2018 at 12:05:26 PM UTC-4, itsnate_b wrote: >> >> Further to the message above - I moved everything into the consumers.py >> file and routed to the class inside of there. Still getting this error >> AFTER I get the print statement >> print('awaiting t1') >> >> I have confirmed that this occurs after this runs successfully: >> background worker sends a message to the group -> websocket consumer picks >> it up at the handler -> handler sends it to the browser -> >> browser/javascript recieves it and uses the data -> *error below* >> >> awaiting t1 >> >> ERROR:root:Exception inside application: No handler for message type >> backgroundWorkerUpdate >> >> File ".../lib/python3.6/site-packages/channels/consumer.py", line 54, >> in __call__ >> >> await await_many_dispatch([receive, self.channel_receive], >> self.dispatch) >> >> File ".../lib/python3.6/site-packages/channels/utils.py", line 50, in >> await_many_dispatch >> >> await dispatch(result) >> >> File "...lib/python3.6/site-packages/channels/consumer.py", line 69, in >> dispatch >> >> raise ValueError("No handler for message type %s" % message["type"]) >> >> No handler for message type backgroundWorkerUpdate >> >> >> On Monday, June 18, 2018 at 9:33:12 AM UTC-4, itsnate_b wrote: >>> >>> Hi Andrew, >>> Thank you for your quick reply. I use AsyncConsumer in the background >>> worker because it seems to route properly (i.e. without error). If I create >>> a new class, then I run into scope issues or simply a TypeError where the >>> 'backgroundConsumer object is not callable'. *The reason for all of >>> this is that when I originally had my external websocket code directly >>> inside consumer.py, it ended up blocking any incoming/outgoing >>> communication between the AsyncWebsocketConsumer and the browser - probably >>> because, as you said 'your listen() method is blocking...[because >>> it]....listens on it for messages forever'.* Since I am new to asyncio >>> development, I figured the best way was to run the external websocket in a >>> background worker, asynchronously, and then maintain my websocket consumer >>> for dynamic browser updates - which "works" but apparently not for so long. >>> Is there a better way to architect this that you suggest? By the way, >>> you've done a terrific job with channels... >>> >>> Here is my routing, so that it's clear what I am doing: >>> >>> *routing.py* >>> >>> from channels.auth import AuthMiddlewareStack >>> from channels.routing import ProtocolTypeRouter, URLRouter, >>> ChannelNameRouter >>> import dashboard.routing >>> from dashboard.backgroundUpdater import backgroundConsumer >>> >>> application = ProtocolTypeRouter({ >>> # (http->django views is added by default) >>> 'websocket': AuthMiddlewareStack( >>> URLRouter( >>> dashboard.routing.websocket_urlpatterns >>> ) >>> ), >>> 'channel': ChannelNameRouter({ >>> "toggle": backgroundConsumer, >>> }), >>> }) >>> >>> On Sunday, June 17, 2018 at 2:40:12 PM UTC-4, Andrew Godwin wrote: >>>> >>>> I'm not quite sure why you seem to be using an AsyncConsumer outside of >>>> the context of the Channels routing framework? >>>> >>>> Anyway, it sounds like from what you're describing that your listen() >>>> method is blocking, which makes sense, as it connects to an external >>>> websocket and listens on it for messages forever. Not sure why the >>>> crashing >>>> Channels websocket takes down the client one, but maybe they are both made >>>> to crash by some third, external, force? >>>> >>>> Andrew >>>> >>>> On Sun, Jun 17, 2018 at 11:33 AM Nathan Bluvol <nathan...@gmail.com> >>>> wrote: >>>> >>>>> I am using Django 2.0.5, Python 3.6.2, Django Channels 2.1.1, aiohttp >>>>> 3.1.3. >>>>> >>>>> Background: >>>>> I am connecting to an external websocket, asynchronously, using >>>>> aiohttp within a background channels worker. Once triggered, the socket >>>>> connection stays alive, listening/streaming, and sends incoming messages >>>>> to >>>>> a consumer through a group_send. The consumer receives it at the >>>>> appropriate handler, however, after some time, it appears that the >>>>> channels >>>>> websocket dies and the handler can no longer be found - I get 'No >>>>> handler for message type....' even though it has been working >>>>> successfully until that point. Second issue is that 'await t1' seems to >>>>> block in the background. I route the 'toggle' to backgroundUpdater.py. >>>>> >>>>> *consumers.py*: >>>>> >>>>> from channels.generic.websocket import AsyncWebsocketConsumer >>>>> import json >>>>> >>>>> >>>>> class MyConsumer(AsyncWebsocketConsumer): >>>>> >>>>> async def connect(self): >>>>> await self.channel_layer.group_add('myGroup', >>>>> self.channel_name) >>>>> await self.accept() >>>>> >>>>> async def disconnect(self, closeCode): >>>>> await self.channel_layer.group_discard('myGroup', >>>>> self.channel_name) >>>>> >>>>> async def receive(self, text_data): >>>>> eventData = json.loads(text_data) >>>>> print(eventData) >>>>> if eventData['myToggle'] == 'on': >>>>> await self.channel_layer.send('toggle',{"type": >>>>> "openConnection"}) >>>>> elif eventData['myToggle'] == 'off': >>>>> await self.channel_layer.send('toggle',{"type": >>>>> "closeConnection"}) >>>>> >>>>> async def backgroundWorkerUpdate(self, data): >>>>> await self.send(text_data=json.dumps(data['updateTime'])) >>>>> >>>>> *backgroundUpdater.py*: >>>>> >>>>> from channels.consumer import AsyncConsumer >>>>> from channels.layers import get_channel_layer >>>>> import asyncio >>>>> import aiohttp >>>>> import json >>>>> from dashboard.models import Info >>>>> import datetime >>>>> >>>>> >>>>> class backgroundConsumer(AsyncConsumer): >>>>> turnOnListener = True >>>>> channelLayer = get_channel_layer() >>>>> loadComplete = False >>>>> >>>>> async def listen(self): >>>>> infoList = Info.objects.filter(active=True) >>>>> subInfoList = [] >>>>> for i in infoList: >>>>> subInfoList.append('info'+str(t))\ >>>>> self.loadComplete = False >>>>> async with aiohttp.ClientSession() as session: >>>>> async with session.ws_connect('...streamingsiteURL.../ >>>>> socket.io/?EIO=3&transport=websocket', ssl=False) as ws: >>>>> await ws.send_str(str(subInfoList)) >>>>> async for msg in ws: >>>>> if msg.type == aiohttp.WSMsgType.TEXT: >>>>> if msg.data.startswith('2'): >>>>> await ws.send_str('3') >>>>> else: >>>>> await self.parseListenData(msg) >>>>> >>>>> async def parseListenData(self, msgContent): >>>>> # handles data in here, including a call to datetime >>>>> >>>>> updateMsg = {'type': 'backgroundWorkerUpdate', >>>>> 'updateTime': str(updateTime), >>>>> } >>>>> print('built update msg and now sending to group') >>>>> await self.channelLayer.group_send('myGroup', updateMsg) >>>>> >>>>> async def openConnection(self, event): >>>>> if(self.turnOnListener): >>>>> await self.channelLayer.group_add( >>>>> 'myGroup', >>>>> self.channel_name, >>>>> ) >>>>> self.turnOnListener = False >>>>> loop = asyncio.get_event_loop() >>>>> t1 = loop.create_task(self.listen()) >>>>> await t1 >>>>> print('awaiting t1') # THIS NEVER PRINTS UNTIL THE >>>>> WEBSOCKET IN consumers.py CRASHES >>>>> else: >>>>> print('already turned on listener') >>>>> >>>>> async def closeConnection(self, event): >>>>> print('in close connection') # CANNOT SEEM TO TRIGGER THIS >>>>> ONCE await t1 is called above... >>>>> if(not self.turnOnListener): >>>>> print('turn it off') >>>>> >>>>> *Questions*: >>>>> 1. Why is does this work for some time until the websocket in >>>>> consumers.py crashes? I am assuming it has crashed because of the missing >>>>> handler (i.e. 'No handler...' error). >>>>> 2. Why is the 'await t1' call blocking? When I try to send 'await >>>>> self.channel_layer.send('toggleMarket',{"type": "closeConnection"})', it >>>>> sends, but nothing seems to be received? >>>>> >>>>> I am have been trying to get this to run in different ways, however, I >>>>> cannot seem to resolve these two issues. Any help/suggestions would be >>>>> greatly appreciated. >>>>> >>>>> Thanks! >>>>> >>>>> -- >>>>> You received this message because you are subscribed to the Google >>>>> Groups "Django users" group. >>>>> To unsubscribe from this group and stop receiving emails from it, send >>>>> an email to django-users...@googlegroups.com. >>>>> To post to this group, send email to django...@googlegroups.com. >>>>> Visit this group at https://groups.google.com/group/django-users. >>>>> To view this discussion on the web visit >>>>> https://groups.google.com/d/msgid/django-users/67993058-4e84-4c11-8acd-fd29249a97d6%40googlegroups.com >>>>> >>>>> <https://groups.google.com/d/msgid/django-users/67993058-4e84-4c11-8acd-fd29249a97d6%40googlegroups.com?utm_medium=email&utm_source=footer> >>>>> . >>>>> For more options, visit https://groups.google.com/d/optout. >>>>> >>>> -- You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscr...@googlegroups.com. To post to this group, send email to django-users@googlegroups.com. Visit this group at https://groups.google.com/group/django-users. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/a64d6753-99b3-4007-9f1b-f7e6b1c5bb69%40googlegroups.com. For more options, visit https://groups.google.com/d/optout.