I'm going to take a stab at this - with the warning that I'm extremely uncertain that I'm anywhere close to being correct, but I'm hoping that I'm at least close enough that it might give you some clues.
So where I think the issue may be is that you're trying to do a "self.send" from a worker task, not from a websocket task. If you look at the docs for the "Worker and Background Tasks", you'll see that to send an event you call "self.channel_layer.send" with a first parameter of the channel to which you wish to send the message. Since this worker task isn't running in the context of a websocket consumer, it doesn't automatically know to which channel a response should be sent. (I _know_ that my wording there is imprecise and probably inaccurate, but I think I'm sufficiently close for casual conversation.) If you want the worker task to send a message back to a client, what _I_ have done is create another method in my WebSocket consumer that takes a message and forwards it back out to the client. In thinking about it, I guess it may be possible that if I pass something (whatever that "something" might be) to the worker task, I might be able to send a message back out without going back through the consumer - again, that's a guess for something I haven't tried yet. Anyway, I hope this helps. Ken On Saturday, February 24, 2018 at 7:56:45 PM UTC-5, muha...@swordfish.co.za wrote: > > I'm still trying to find my way around channels 2.0, so I'm not sure if my > implementation is incorrect or if this is a valid issue. I'm going to post > as much info as possible in the hope that it will assist with finding the > problem. > > I have a single page app which opens a JS websocket connection - with > channels 1 I used to add a *session_key *to the querystring and that used > to handle the authentication. > > I see this is no longer the case, so I now have a custom middleware class > that sets the user object on the scope: > > from django.contrib.sessions.models import Session > > from users.models import User > > > class QueryAuthMiddleware: > > def __init__(self, inner): > # Store the ASGI application we were passed > self.inner = inner > > def __call__(self, scope): > # Look up user from query string (you should also do things like > # check it's a valid user ID, or if scope["user"] is already > populated) > if scope.get("user", None) is None: > session_obj = > Session.objects.get(session_key=scope["query_string"].decode("utf-8").split("=")[1]) > session_decoded = session_obj.get_decoded() > > scope["user"] = > User.objects.get(id=session_decoded.get("_auth_user_id")) > > # Return the inner application directly and let it run everything > else > return self.inner(scope) > > > > > This is in turn added to my routing (channels.py): > > from django.conf.urls import url > from django.conf import settings > from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter > > from notifications.consumer import TestWebsocketConsumer, TestConsumer > from notifications.middleware.query_auth_middleware import QueryAuthMiddleware > > ROOT_PATH = "" if settings.DEBUG else "/ws/" > > > application = ProtocolTypeRouter({ > > "websocket": QueryAuthMiddleware( > URLRouter([ > url(f"^{ROOT_PATH}(?P<user_id>[-\w]+)/$", TestWebsocketConsumer), > > ]) > ), > > "channel": ChannelNameRouter({ > "user-notifications": TestConsumer, > }) > > > }) > > > > > > Here's my *consumers.py*: > > from asgiref.sync import async_to_sync > from channels.consumer import SyncConsumer > from channels.generic.websocket import WebsocketConsumer > > > class TestWebsocketConsumer(WebsocketConsumer): > def websocket_connect(self, message): > > async_to_sync(self.channel_layer.group_add)(str(self.scope["user"].id), > "user-notifications") > self.connect() > > > class TestConsumer(SyncConsumer): > def notification_handler(self, message): > > self.send( > { > "type": "websocket.send", > "text": message["text"] > } > ) > > > > > > > The idea of the app is that each user that logs in on the front end is > able to receive messages meant only for them sent by the back end. I have > been trying to test it like this: > > >>> channel_layer = get_channel_layer() > >>> async_to_sync(channel_layer.send)("user-notifications", {"type": > >>> "notification.handler", "text": "My Message"}) > > > > > Here's the traceback in the *runworker* output: > > 2018-02-25 02:34:14,002 - INFO - runworker - Running worker for channels > ['user-notifications'] > ERROR:root:Exception inside application: You must implement > application_send() > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", > > line 54, in __call__ > await await_many_dispatch([receive, self.channel_receive], > self.dispatch) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/utils.py", > line 48, in await_many_dispatch > await dispatch(result) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", > line 110, in __call__ > return await asyncio.wait_for(future, timeout=None) > File > "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py", > > line 333, in wait_for > return (yield from fut) > File > "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", > > line 55, in run > result = self.fn(*self.args, **self.kwargs) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/db.py", > line 13, in thread_handler > return super().thread_handler(loop, *args, **kwargs) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", > line 125, in thread_handler > return self.func(*args, **kwargs) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", > > line 99, in dispatch > handler(message) > File > "/Users/muhammed/projects/xxx/my-app/app/notifications/consumer.py", line > 18, in notification_handler > "text": message["text"] > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", > > line 107, in send > self.base_send(message) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", > line 64, in __call__ > return call_result.result() > File > "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", > > line 405, in result > return self.__get_result() > File > "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", > > line 357, in __get_result > raise self._exception > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", > line 78, in main_wrap > result = await self.awaitable(*args, **kwargs) > File > "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/server.py", > line 71, in application_send > raise NotImplementedError("You must implement application_send()") > You must implement application_send() > > > OS: MacOS High Sierra > Python: 3.6.1 > > Django==2.0.2 > channels==2.0.2 > channels-redis==2.1.0 > asgiref==2.1.6 > daphne==2.0.4 > > Config from *settings.py:* > > > > > > > > > > > > > > > *# -- Channels DetailsCHANNEL_LAYERS = { "default": { "BACKEND": > "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": > [("localhost", 6379)] } }}ASGI_APPLICATION = > "myapp.settings.channels.application"* > > Any help would be appreciated. > > -- You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscr...@googlegroups.com. To post to this group, send email to django-users@googlegroups.com. Visit this group at https://groups.google.com/group/django-users. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/c5c4bef6-0469-401a-9624-1eed4374c1cf%40googlegroups.com. For more options, visit https://groups.google.com/d/optout.