from channels.generic.websocket import WebsocketConsumer from django.shortcuts import get_object_or_404 from customercore.models import * import json from django.template.loader import render_to_string from asgiref.sync import async_to_sync from channels.generic.websocket import WebsocketConsumer from django.shortcuts import get_object_or_404 from customercore.models import * import json from django.template.loader import render_to_string from asgiref.sync import async_to_sync class TicketRoomConsumer(WebsocketConsumer): def connect(self): self.user = self.scope['user'] self.ticket_id = self.scope['url_route']['kwargs']['ticket_id'] self.ticket = get_object_or_404(Ticket, id=self.ticket_id) self.ticket_number = self.ticket.ticket_number async_to_sync(self.channel_layer.group_add)( self.ticket_number, self.channel_name ) self.accept() def disconnect(self, close_code): async_to_sync(self.channel_layer.group_discard)( self.ticket_number, self.channel_name ) def receive(self, text_data): text_data_json = json.loads(text_data) event_type = text_data_json.get('event_type') if event_type == 'typing': event = { 'type': 'typing_handler', 'user': self.scope['user'] } async_to_sync(self.channel_layer.group_send)( self.ticket_number, event ) elif event_type == 'stop_typing': event = { 'type': 'stop_typing_handler', } async_to_sync(self.channel_layer.group_send)( self.ticket_number, event ) else: body = text_data_json['description'] file_paths = text_data_json['filePath'] ticketupdate = TicketUpdate.objects.create( added_by=self.user, description=body, ticket=self.ticket, date_added=datetime.now() ) for file_path in file_paths: ticket_attachment = TicketAttachment( ticket_update=ticketupdate, file_path=file_path ) ticket_attachment.save() event = { 'type': 'update_handler', 'update_id': ticketupdate.id } async_to_sync(self.channel_layer.group_send)( self.ticket_number, event ) def update_handler(self, event): update_id = event['update_id'] update = TicketUpdate.objects.get(id=update_id) context = { 'update': update, 'user': self.user } html = render_to_string("details_templates/new-ticket-message.html", context=context) self.send(text_data=json.dumps({ 'event_type': 'update', 'html': html })) def typing_handler(self, event): context = { 'user': event['user'] } html = render_to_string("details_templates/typing-message.html", context=context) self.send(text_data=json.dumps({ 'event_type': 'typing', 'html': html })) def stop_typing_handler(self, event): self.send(text_data=json.dumps({ 'event_type': 'stop_typing' }))