I Want To Create An Api of notification In Django Rest API ![Screenshot from 2023-11-30 03-30-39|690x388](upload://6RwAUFNKt3TWKgrocOu2t4k2TeJ.png) I Created AN Notification App By Using The Django Channels, Websockt and Successfully Sending Notification To The `Webscoketking.com`, Now I Want To Create The API of This App For Sending The Notifications Of the Website models.py ``` from django.db import models from django.contrib.auth.models import AbstractUser from channels.layers import get_channel_layer from asgiref.sync import async_to_sync import json # Create your models here.
class CustomUser(AbstractUser): """Model class for users""" USER_CHOICES = [ ('expert', 'Expert'), ('business', 'Business'), ('admin', 'Admin'), ] username = models.CharField(max_length=40, null=True, blank=True) full_name = models.CharField(max_length=40) email = models.EmailField(unique=True) password = models.CharField(max_length=40, null=False, blank=False) confirm_password = models.CharField(max_length=40, null=False, blank=False) user_choices = models.CharField(max_length=30, choices=USER_CHOICES, default='expert') USERNAME_FIELD = 'email' REQUIRED_FIELDS = ['username'] def __str__(self) -> str: return f"{self.email}" class Notification(models.Model): account_user = models.ForeignKey(CustomUser, on_delete=models.CASCADE) message = models.TextField() is_read = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) def save(self, *args, **kwargs): print('saving notification') channel_layer = get_channel_layer() notification_objs = Notification.objects.filter(is_read=False).count() data = {'count': notification_objs, 'current_notification': self.message} async_to_sync(channel_layer.group_send)( "test_group", { "type": "send_notification", "value": json.dumps(data) } ) super(Notification, self).save(*args, **kwargs) class AccountApproval(models.Model): account_user = models.ForeignKey(CustomUser, on_delete=models.CASCADE) approved = models.BooleanField(default=False) approval_message = models.TextField(blank=True, null=True) ``` COnsumers.py ``` from channels.generic.websocket import WebsocketConsumer from asgiref.sync import async_to_sync import json class TestConsumer(WebsocketConsumer): def connect(self): self.room_name = "test" self.room_group_name = "test_group" async_to_sync(self.channel_layer.group_add)( self.room_group_name, self.channel_name, ) self.accept() print("connected") self.send(text_data=json.dumps({'status': 'connected Through Django Channels'})) def receive(self, text_data): print(text_data) self.send(text_data=json.dumps({'status': 'We Got You'})) def disconneted(self, *args, **kwargs): print("disconnected") def send_notification(self, event): print('Send Notification: ') data = json.loads(event.get('value')) self.send(text_data=json.dumps({'status': data})) print('Send Notification: ') ``` asgi.py ``` """ ASGI config for hamza project. It exposes the ASGI callable as a module-level variable named ``application``. For more information on this file, see https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/ """ from channels.routing import ProtocolTypeRouter, URLRouter from django.urls import path from bilal.consumers import TestConsumer import os from django.core.asgi import get_asgi_application os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hamza.settings') asgiapplication = get_asgi_application() ws_pattern = [ path('ws/test/', TestConsumer.as_asgi()) ] application = ProtocolTypeRouter({ 'http': get_asgi_application(), 'websocket': URLRouter(ws_pattern) }) ``` -- You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/6e2c0bdf-b67f-4167-9236-ac310f1a4a52n%40googlegroups.com.