# Source code for chat.consumers

import json
import logging

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

from users.models import User
from users.serializers import full_name

from .models import Message


logger = logging.getLogger(__name__)


class ChatConsumer(AsyncWebsocketConsumer):
    """
    Async WebSocket consumer for real-time chat between matched mentor-mentee pairs.

    Manages the full WebSocket lifecycle — connection authentication, message
    routing, typing indicators, read receipts, and graceful disconnection.
    All database operations are wrapped with ``@database_sync_to_async`` to
    avoid blocking the async event loop.

    **Connection URL**::

        ws://<host>/ws/chat/<partner_email>/

    **Authentication:** relies on the Django session cookie attached to the
    WebSocket handshake (``credentials: 'include'`` on the client). The
    session is resolved by Django Channels' ``AuthMiddlewareStack``.

    **Close codes:**

    - ``4001`` — WebSocket closed because the user is not authenticated.
    - ``4003`` — WebSocket closed because the user has no active match, or is
      not matched with the requested partner.

    **Inbound message types** (JSON sent from the client):

    - ``chat_message`` — ``{"type": "chat_message", "content": "Hello!"}``
    - ``typing`` — ``{"type": "typing", "is_typing": true}``
    - ``mark_read`` — ``{"type": "mark_read"}``

    **Outbound message types** (JSON pushed to the client):

    - ``connection_established`` — Sent immediately after a successful
      handshake.
    - ``chat_message`` — A new message broadcast to both participants.
    - ``typing`` — Typing indicator forwarded to the other participant only.
    - ``read_receipt`` — Acknowledgement that messages were read, forwarded to
      the other participant only.
    - ``force_disconnect`` — Sent before the server closes the connection
      (e.g. after an unmatch).
    - ``error`` — Sent to the originating client when a message cannot be
      processed.
    """

    # Application-level WebSocket close codes (see "Close codes" above).
    CLOSE_UNAUTHENTICATED = 4001
    CLOSE_NOT_MATCHED = 4003

    # Upper bound on the stripped length of a chat message body.
    MAX_MESSAGE_LENGTH = 5000

    async def connect(self):
        """
        Handle an incoming WebSocket connection request.

        Performs the following checks in order before accepting:

        1. **Authentication** — closes with code ``4001`` if the scope user is
           absent or unauthenticated.
        2. **Active match** — closes with code ``4003`` if the user has no
           current match (role-aware: checks ``isMatched`` for mentees,
           ``current_mentees`` for mentors).
        3. **Match relationship** — closes with code ``4003`` if the user is
           not specifically matched with ``<partner_email>`` from the URL.

        On success, joins the deterministic channel-layer group for the pair
        (derived from sorted emails), accepts the connection, and sends a
        ``connection_established`` message.

        :raises None: All error conditions result in ``self.close()`` rather
            than raised exceptions.
        """
        # NOTE(review): every rejection below calls close() *before* accept().
        # Per the ASGI WebSocket spec that rejects the handshake itself, so
        # the browser may never observe the custom close code — confirm how
        # the frontend distinguishes 4001 from 4003.
        self.user = self.scope.get('user')
        if not self.user or not getattr(self.user, 'is_authenticated', False):
            await self.close(code=self.CLOSE_UNAUTHENTICATED)
            return

        # Cheap in-memory pre-check so that users without any match are
        # rejected before the database is consulted.
        if self.user.role == 'MENTEE':
            has_match = self.user.isMatched and bool(self.user.matchedMentorEmail)
        elif self.user.role == 'MENTOR':
            has_match = bool(self.user.current_mentees)
        else:
            has_match = False

        if not has_match:
            await self.close(code=self.CLOSE_NOT_MATCHED)
            return

        self.partner_email = self.scope['url_route']['kwargs']['partner_email']

        if not await self.validate_match():
            await self.close(code=self.CLOSE_NOT_MATCHED)
            return

        # Sorting makes the room name identical regardless of which
        # participant connects first.
        # NOTE(review): channel layers restrict group names (ASCII
        # alphanumerics, hyphens, underscores, periods; bounded length), so
        # addresses containing e.g. '+' or very long addresses would
        # presumably be rejected by group_add(). The derivation is kept
        # byte-identical because other modules may compute the same name —
        # confirm before changing it.
        emails = sorted([self.user.email, self.partner_email])
        self.room_name = (
            f"chat_{emails[0]}_{emails[1]}"
            .replace('@', '_at_')
            .replace('.', '_dot_')
        )
        self.room_group_name = self.room_name

        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name,
        )
        await self.accept()
        await self.send(text_data=json.dumps({
            'type': 'connection_established',
            'message': 'Connected to chat',
        }))

    async def disconnect(self, close_code):
        """
        Handle WebSocket disconnection.

        Removes this channel from the room group so it no longer receives
        broadcast events. Safe to call even if ``connect`` never completed
        (guarded by ``hasattr`` check on ``room_group_name``).

        :param close_code: The WebSocket close code sent by the client or
            server. Not used internally but required by the Channels
            interface.
        :type close_code: int
        """
        if hasattr(self, 'room_group_name'):
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name,
            )

    async def receive(self, text_data=None, bytes_data=None):
        """
        Route an incoming WebSocket frame to the appropriate handler.

        Parses the JSON payload and dispatches on ``type``:

        - ``'chat_message'`` → :meth:`handle_chat_message`
        - ``'typing'`` → :meth:`handle_typing`
        - ``'mark_read'`` → :meth:`handle_mark_read`
        - anything else → :meth:`send_error` with ``'Unknown message type'``

        Binary frames, JSON parse errors, payloads that are not JSON objects,
        and unexpected handler exceptions are reported to the client via
        :meth:`send_error` rather than crashing the consumer. Unexpected
        exceptions are logged server-side and the client only receives a
        generic message, so internal details are not exposed.

        :param text_data: Raw text payload received from the WebSocket
            client, or ``None`` when the frame was binary.
        :type text_data: str or None
        :param bytes_data: Raw binary payload, or ``None`` when the frame was
            text. Binary frames are not part of the protocol and are rejected.
        :type bytes_data: bytes or None
        """
        # Channels invokes receive(bytes_data=...) for binary frames; accept
        # the keyword so such a frame yields an error reply, not a TypeError.
        if text_data is None:
            await self.send_error('Binary frames are not supported')
            return

        try:
            data = json.loads(text_data)

            # Valid JSON may still be a list, string, number or null; the
            # handlers below require a mapping.
            if not isinstance(data, dict):
                await self.send_error('Message must be a JSON object')
                return

            message_type = data.get('type')
            if message_type == 'chat_message':
                await self.handle_chat_message(data)
            elif message_type == 'typing':
                await self.handle_typing(data)
            elif message_type == 'mark_read':
                await self.handle_mark_read()
            else:
                await self.send_error('Unknown message type')
        except json.JSONDecodeError:
            await self.send_error('Invalid JSON format')
        except Exception:
            # Top-level boundary: keep the socket alive, record the traceback
            # server-side, and avoid echoing exception text to the client.
            logger.exception('Unhandled error while processing WebSocket message')
            await self.send_error('Internal server error')

    async def handle_chat_message(self, data):
        """
        Validate, persist, and broadcast a chat message.

        Validates that ``content`` is a string, is non-empty after stripping
        whitespace, and does not exceed 5 000 characters
        (:attr:`MAX_MESSAGE_LENGTH`), then delegates persistence to
        :meth:`save_message` and broadcasts the result to all members of the
        room group via ``chat_message_broadcast``.

        :param data: Parsed JSON payload from the client. Expected to contain
            a ``'content'`` key with the message text.
        :type data: dict
        """
        raw_content = data.get('content', '')
        # JSON allows null, numbers, lists, ... here; only text can be stored.
        if not isinstance(raw_content, str):
            await self.send_error('Message content must be a string')
            return

        content = raw_content.strip()
        if not content:
            await self.send_error('Message content cannot be empty')
            return

        if len(content) > self.MAX_MESSAGE_LENGTH:
            await self.send_error(
                f'Message too long (max {self.MAX_MESSAGE_LENGTH} characters)'
            )
            return

        message = await self.save_message(content)

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message_broadcast',
                'message_id': str(message.pk),
                'sender_email': self.user.email,
                'sender_name': full_name(self.user),
                'content': content,
                'timestamp': message.timestamp.isoformat(),
            },
        )

    async def handle_typing(self, data):
        """
        Broadcast a typing indicator to all room participants.

        The broadcast is sent to the entire room group; filtering so that the
        originating client does not receive its own indicator is handled in
        :meth:`typing_broadcast`.

        :param data: Parsed JSON payload from the client. Expected to contain
            an ``'is_typing'`` boolean key. Defaults to ``False`` if absent;
            any other JSON value is coerced with ``bool()`` so the partner
            always receives a real boolean.
        :type data: dict
        """
        is_typing = bool(data.get('is_typing', False))

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'typing_broadcast',
                'sender_email': self.user.email,
                'is_typing': is_typing,
            },
        )

    async def handle_mark_read(self):
        """
        Mark all unread messages from the partner as read and notify the room.

        Delegates the database update to :meth:`mark_messages_read`, then
        broadcasts a ``read_receipt_broadcast`` event to the room group.
        Filtering so that the reader does not receive their own receipt is
        handled in :meth:`read_receipt_broadcast`.
        """
        count = await self.mark_messages_read()

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'read_receipt_broadcast',
                'reader_email': self.user.email,
                'count': count,
            },
        )

    async def chat_message_broadcast(self, event):
        """
        Channel-layer event handler: forward a chat message to this WebSocket.

        Called by the Channels layer when a ``chat_message_broadcast`` event
        is dispatched to the room group. Forwards the payload to the
        connected client as a ``chat_message`` JSON frame.

        :param event: Channel-layer event dict containing ``message_id``,
            ``sender_email``, ``sender_name``, ``content``, and
            ``timestamp``.
        :type event: dict
        """
        await self.send(text_data=json.dumps({
            'type': 'chat_message',
            'message_id': event['message_id'],
            'sender_email': event['sender_email'],
            'sender_name': event['sender_name'],
            'content': event['content'],
            'timestamp': event['timestamp'],
        }))

    async def typing_broadcast(self, event):
        """
        Channel-layer event handler: forward a typing indicator to this WebSocket.

        Called by the Channels layer when a ``typing_broadcast`` event is
        dispatched to the room group. The indicator is suppressed for the
        originating sender so that a user does not receive their own typing
        status.

        :param event: Channel-layer event dict containing ``sender_email``
            and ``is_typing``.
        :type event: dict
        """
        if event['sender_email'] != self.user.email:
            await self.send(text_data=json.dumps({
                'type': 'typing',
                'sender_email': event['sender_email'],
                'is_typing': event['is_typing'],
            }))

    async def read_receipt_broadcast(self, event):
        """
        Channel-layer event handler: forward a read receipt to this WebSocket.

        Called by the Channels layer when a ``read_receipt_broadcast`` event
        is dispatched to the room group. The receipt is suppressed for the
        reader themselves so that only the other participant is notified.
        The event's ``count`` field is not forwarded to the client.

        :param event: Channel-layer event dict containing ``reader_email``
            and ``count``.
        :type event: dict
        """
        if event['reader_email'] != self.user.email:
            await self.send(text_data=json.dumps({
                'type': 'read_receipt',
                'reader_email': event['reader_email'],
            }))

    async def force_disconnect(self, event):
        """
        Channel-layer event handler: notify the client and close the connection.

        Called by the Channels layer when a ``force_disconnect`` event is
        dispatched to the room group — for example, after two users are
        unmatched. Sends a ``force_disconnect`` frame to the client before
        closing so the frontend can display an appropriate message.

        :param event: Channel-layer event dict optionally containing a
            ``'reason'`` string. Defaults to ``'disconnected'`` if absent.
        :type event: dict
        """
        await self.send(text_data=json.dumps({
            'type': 'force_disconnect',
            'reason': event.get('reason', 'disconnected'),
        }))
        await self.close()

    async def send_error(self, message):
        """
        Send an error frame to the connected WebSocket client.

        Used internally to surface validation failures and unexpected
        exceptions without closing the connection, allowing the client to
        recover and retry.

        :param message: Human-readable error description to include in the
            ``'message'`` field of the outbound JSON frame.
        :type message: str
        """
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': message,
        }))

    @database_sync_to_async
    def validate_match(self):
        """
        Verify that the session user and ``partner_email`` are actively matched.

        Applies the same role-aware relationship rules used in
        ``matching/views.py`` and ``chat/views.py``:

        - **MENTEE** — valid if ``isMatched`` is ``True``,
          ``matchedMentorEmail`` equals ``partner_email``, and the partner
          holds the ``MENTOR`` role.
        - **MENTOR** — valid if ``partner_email`` is in ``current_mentees``
          and the partner holds the ``MENTEE`` role.

        Wrapped with ``@database_sync_to_async`` so it can be safely awaited
        from the async ``connect`` method without blocking the event loop.

        :returns: ``True`` if the relationship is valid, ``False`` if the
            partner does not exist or the match condition is not satisfied.
        :rtype: bool
        """
        # Only the lookup can raise DoesNotExist, so keep the try body minimal.
        try:
            partner = User.objects.get(email=self.partner_email)
        except User.DoesNotExist:
            return False

        if self.user.role == 'MENTEE':
            return bool(
                self.user.isMatched
                and self.user.matchedMentorEmail == self.partner_email
                and partner.role == 'MENTOR'
            )

        if self.user.role == 'MENTOR':
            # current_mentees may be empty/None; treat that as "no mentees".
            return bool(
                self.partner_email in (self.user.current_mentees or [])
                and partner.role == 'MENTEE'
            )

        return False

    @database_sync_to_async
    def save_message(self, content):
        """
        Persist a new :class:`~chat.models.Message` to the database.

        Wrapped with ``@database_sync_to_async`` so it can be safely awaited
        from the async consumer without blocking the event loop.

        :param content: Validated plain-text message body to store.
        :type content: str
        :returns: The newly created :class:`~chat.models.Message` instance.
        :rtype: chat.models.Message
        """
        return Message.objects.create(
            sender_email=self.user.email,
            receiver_email=self.partner_email,
            content=content,
        )

    @database_sync_to_async
    def mark_messages_read(self):
        """
        Bulk-mark all unread messages from the partner as read.

        Delegates to :meth:`~chat.models.Message.mark_as_read`, which issues a
        single ``UPDATE`` query. Wrapped with ``@database_sync_to_async`` so
        it can be safely awaited from the async consumer.

        :returns: The number of :class:`~chat.models.Message` rows updated.
        :rtype: int
        """
        return Message.mark_as_read(
            receiver_email=self.user.email,
            sender_email=self.partner_email,
        )