import json
import logging

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

from users.models import User
from users.serializers import full_name

from .models import Message
logger = logging.getLogger(__name__)


class ChatConsumer(AsyncWebsocketConsumer):
    """
    Async WebSocket consumer for real-time chat between matched mentor-mentee pairs.

    Manages the full WebSocket lifecycle — connection authentication, message
    routing, typing indicators, read receipts, and graceful disconnection.
    All database operations are wrapped with ``@database_sync_to_async`` to
    avoid blocking the async event loop.

    **Connection URL**::

        ws://<host>/ws/chat/<partner_email>/

    **Authentication:** relies on the Django session cookie attached to the
    WebSocket handshake (``credentials: 'include'`` on the client). The
    session is resolved by Django Channels' ``AuthMiddlewareStack``.

    **Close codes:**

    - ``4001`` — WebSocket closed because the user is not authenticated.
    - ``4003`` — WebSocket closed because the user has no active match, or is
      not matched with the requested partner.

    **Inbound message types** (JSON objects sent from the client):

    - ``chat_message`` — ``{"type": "chat_message", "content": "Hello!"}``
    - ``typing`` — ``{"type": "typing", "is_typing": true}``
    - ``mark_read`` — ``{"type": "mark_read"}``

    **Outbound message types** (JSON pushed to the client):

    - ``connection_established`` — Sent immediately after a successful handshake.
    - ``chat_message`` — A new message broadcast to both participants.
    - ``typing`` — Typing indicator forwarded to the other participant only.
    - ``read_receipt`` — Acknowledgement that messages were read, forwarded to
      the other participant only.
    - ``force_disconnect`` — Sent before the server closes the connection (e.g.
      after an unmatch).
    - ``error`` — Sent to the originating client when a message cannot be
      processed.
    """

    # Application-defined WebSocket close codes used by ``connect``.
    CLOSE_UNAUTHENTICATED = 4001
    CLOSE_NOT_MATCHED = 4003

    # Upper bound on the length of a single chat message, in characters
    # (measured after surrounding whitespace is stripped).
    MAX_MESSAGE_LENGTH = 5000

    async def connect(self):
        """
        Handle an incoming WebSocket connection request.

        Performs the following checks in order before accepting:

        1. **Authentication** — closes with code ``4001`` if the scope user is
           absent or unauthenticated.
        2. **Active match** — closes with code ``4003`` if the user has no
           current match (role-aware: checks ``isMatched`` for mentees,
           ``current_mentees`` for mentors).
        3. **Match relationship** — closes with code ``4003`` if the user is
           not specifically matched with ``<partner_email>`` from the URL.

        On success, joins the deterministic channel-layer group for the pair
        (derived from sorted emails), accepts the connection, and sends a
        ``connection_established`` message.

        Failed checks result in ``self.close()`` rather than a raised
        exception.

        NOTE(review): the connection is closed *before* ``accept()``; per the
        ASGI WebSocket spec this rejects the handshake, so the custom close
        code may never reach the browser — confirm the client does not rely
        on seeing 4001/4003.
        """
        self.user = self.scope.get('user')
        if not self.user or not getattr(self.user, 'is_authenticated', False):
            await self.close(code=self.CLOSE_UNAUTHENTICATED)
            return

        # Cheap in-memory pre-check so users without any match are rejected
        # before the database is queried for the partner.
        if not self._has_any_match():
            await self.close(code=self.CLOSE_NOT_MATCHED)
            return

        self.partner_email = self.scope['url_route']['kwargs']['partner_email']
        if not await self.validate_match():
            await self.close(code=self.CLOSE_NOT_MATCHED)
            return

        self.room_name = self._room_name_for(self.user.email, self.partner_email)
        self.room_group_name = self.room_name

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
        await self.send(text_data=json.dumps({
            'type': 'connection_established',
            'message': 'Connected to chat',
        }))

    def _has_any_match(self):
        """Return ``True`` if the session user currently has at least one match for their role."""
        role = self.user.role
        if role == 'MENTEE':
            return bool(self.user.isMatched and self.user.matchedMentorEmail)
        if role == 'MENTOR':
            return bool(self.user.current_mentees)
        return False

    @staticmethod
    def _room_name_for(email_a, email_b):
        """Build the order-independent channel-layer group name for a pair of emails."""
        # Sorting makes both participants derive the same name regardless of
        # which side opens the socket.
        first, second = sorted([email_a, email_b])
        # NOTE(review): Channels restricts group names to a limited ASCII
        # character set and a maximum length; addresses containing e.g. '+'
        # or two very long addresses may be rejected by ``group_add``. The
        # derivation is kept unchanged here because other code presumably
        # targets the same group name (e.g. to send ``force_disconnect``).
        return f"chat_{first}_{second}".replace('@', '_at_').replace('.', '_dot_')

    async def disconnect(self, close_code):
        """
        Handle WebSocket disconnection.

        Removes this channel from the room group so it no longer receives
        broadcast events. Safe to call even if ``connect`` never completed,
        because the group is only discarded when ``room_group_name`` was set.

        :param close_code: The WebSocket close code sent by the client or
            server. Not used internally but required by the Channels interface.
        :type close_code: int
        """
        group = getattr(self, 'room_group_name', None)
        if group is not None:
            await self.channel_layer.group_discard(group, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        """
        Route an incoming WebSocket frame to the appropriate handler.

        Parses the JSON payload and dispatches on ``type``:

        - ``'chat_message'`` → :meth:`handle_chat_message`
        - ``'typing'`` → :meth:`handle_typing`
        - ``'mark_read'`` → :meth:`handle_mark_read`
        - anything else → :meth:`send_error` with ``'Unknown message type'``

        Binary frames, malformed JSON, and JSON values that are not objects
        are rejected with an ``error`` frame. Unexpected handler exceptions
        are logged server-side and reported to the client with a generic
        message, so internal details are not exposed and the consumer keeps
        running.

        :param text_data: Raw text payload received from the WebSocket client,
            or ``None`` when the frame was binary.
        :type text_data: str or None
        :param bytes_data: Raw binary payload. Accepted because Channels
            delivers binary frames through this keyword; binary frames are
            not part of the chat protocol and are rejected.
        :type bytes_data: bytes or None
        """
        if text_data is None:
            await self.send_error('Binary frames are not supported')
            return

        try:
            data = json.loads(text_data)
        except (ValueError, RecursionError):
            # ValueError covers json.JSONDecodeError; RecursionError guards
            # against pathologically nested input from an untrusted client.
            await self.send_error('Invalid JSON format')
            return

        if not isinstance(data, dict):
            # Valid JSON such as ``[]`` or ``"x"`` has no ``type`` key to dispatch on.
            await self.send_error('Invalid message format')
            return

        message_type = data.get('type')
        try:
            if message_type == 'chat_message':
                await self.handle_chat_message(data)
            elif message_type == 'typing':
                await self.handle_typing(data)
            elif message_type == 'mark_read':
                await self.handle_mark_read()
            else:
                await self.send_error('Unknown message type')
        except Exception:
            # Top-level boundary: keep the socket alive, record the full
            # traceback on the server, and avoid echoing exception text
            # (which may contain internal details) back to the client.
            logger.exception('Unhandled error while processing %r frame', message_type)
            await self.send_error('Internal error while processing message')

    async def handle_chat_message(self, data):
        """
        Validate, persist, and broadcast a chat message.

        Validates that ``content`` is a string, is non-empty after stripping
        whitespace, and does not exceed :attr:`MAX_MESSAGE_LENGTH` characters,
        then delegates persistence to :meth:`save_message` and broadcasts the
        result to all members of the room group via ``chat_message_broadcast``.

        :param data: Parsed JSON payload from the client. Expected to contain
            a ``'content'`` key with the message text.
        :type data: dict
        """
        raw_content = data.get('content', '')
        if not isinstance(raw_content, str):
            # JSON allows null/number/object here; reject instead of failing on .strip().
            await self.send_error('Message content must be a string')
            return

        content = raw_content.strip()
        if not content:
            await self.send_error('Message content cannot be empty')
            return
        if len(content) > self.MAX_MESSAGE_LENGTH:
            await self.send_error(
                f'Message too long (max {self.MAX_MESSAGE_LENGTH} characters)'
            )
            return

        message = await self.save_message(content)
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message_broadcast',
                'message_id': str(message.pk),
                'sender_email': self.user.email,
                'sender_name': full_name(self.user),
                'content': content,
                'timestamp': message.timestamp.isoformat(),
            },
        )

    async def handle_typing(self, data):
        """
        Broadcast a typing indicator to all room participants.

        The broadcast is sent to the entire room group; filtering so that the
        originating client does not receive its own indicator is handled in
        :meth:`typing_broadcast`.

        :param data: Parsed JSON payload from the client. Expected to contain
            an ``'is_typing'`` boolean key. Defaults to ``False`` if absent.
        :type data: dict
        """
        # Coerce to a plain bool so an arbitrary client-supplied JSON value is
        # never relayed verbatim to the other participant.
        is_typing = bool(data.get('is_typing', False))
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'typing_broadcast',
                'sender_email': self.user.email,
                'is_typing': is_typing,
            },
        )

    async def handle_mark_read(self):
        """
        Mark all unread messages from the partner as read and notify the room.

        Delegates the database update to :meth:`mark_messages_read`, then
        broadcasts a ``read_receipt_broadcast`` event to the room group.
        Filtering so that the reader does not receive their own receipt is
        handled in :meth:`read_receipt_broadcast`.
        """
        updated = await self.mark_messages_read()
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'read_receipt_broadcast',
                'reader_email': self.user.email,
                'count': updated,
            },
        )

    async def chat_message_broadcast(self, event):
        """
        Channel-layer event handler: forward a chat message to this WebSocket.

        Called by the Channels layer when a ``chat_message_broadcast`` event
        is dispatched to the room group. Forwards the payload to the
        connected client as a ``chat_message`` JSON frame.

        :param event: Channel-layer event dict containing ``message_id``,
            ``sender_email``, ``sender_name``, ``content``, and ``timestamp``.
        :type event: dict
        """
        await self.send(text_data=json.dumps({
            'type': 'chat_message',
            'message_id': event['message_id'],
            'sender_email': event['sender_email'],
            'sender_name': event['sender_name'],
            'content': event['content'],
            'timestamp': event['timestamp'],
        }))

    async def typing_broadcast(self, event):
        """
        Channel-layer event handler: forward a typing indicator to this WebSocket.

        Called by the Channels layer when a ``typing_broadcast`` event is
        dispatched to the room group. The indicator is suppressed for the
        originating sender so that a user does not receive their own typing
        status.

        :param event: Channel-layer event dict containing ``sender_email`` and
            ``is_typing``.
        :type event: dict
        """
        if event['sender_email'] == self.user.email:
            return
        await self.send(text_data=json.dumps({
            'type': 'typing',
            'sender_email': event['sender_email'],
            'is_typing': event['is_typing'],
        }))

    async def read_receipt_broadcast(self, event):
        """
        Channel-layer event handler: forward a read receipt to this WebSocket.

        Called by the Channels layer when a ``read_receipt_broadcast`` event
        is dispatched to the room group. The receipt is suppressed for the
        reader themselves so that only the other participant is notified.
        Only ``reader_email`` is forwarded; the event's ``count`` is not
        included in the outbound frame.

        :param event: Channel-layer event dict containing ``reader_email`` and
            ``count``.
        :type event: dict
        """
        if event['reader_email'] == self.user.email:
            return
        await self.send(text_data=json.dumps({
            'type': 'read_receipt',
            'reader_email': event['reader_email'],
        }))

    async def force_disconnect(self, event):
        """
        Channel-layer event handler: notify the client and close the connection.

        Called by the Channels layer when a ``force_disconnect`` event is
        dispatched to the room group — for example, after two users are
        unmatched. Sends a ``force_disconnect`` frame to the client before
        closing so the frontend can display an appropriate message.

        :param event: Channel-layer event dict optionally containing a
            ``'reason'`` string. Defaults to ``'disconnected'`` if absent.
        :type event: dict
        """
        await self.send(text_data=json.dumps({
            'type': 'force_disconnect',
            'reason': event.get('reason', 'disconnected'),
        }))
        await self.close()

    async def send_error(self, message):
        """
        Send an error frame to the connected WebSocket client.

        Used internally to surface validation failures and processing errors
        without closing the connection, allowing the client to recover and
        retry.

        :param message: Human-readable error description to include in the
            ``'message'`` field of the outbound JSON frame.
        :type message: str
        """
        await self.send(text_data=json.dumps({
            'type': 'error',
            'message': message,
        }))

    @database_sync_to_async
    def validate_match(self):
        """
        Verify that the session user and ``partner_email`` are actively matched.

        Applies the same role-aware relationship rules used in
        ``matching/views.py`` and ``chat/views.py``:

        - **MENTEE** — valid if ``isMatched`` is truthy, ``matchedMentorEmail``
          equals ``partner_email``, and the partner holds the ``MENTOR`` role.
        - **MENTOR** — valid if ``partner_email`` is in ``current_mentees`` and
          the partner holds the ``MENTEE`` role.

        Wrapped with ``@database_sync_to_async`` so it can be safely awaited
        from the async ``connect`` method without blocking the event loop.

        :returns: ``True`` if the relationship is valid, ``False`` if the
            partner does not exist or the match condition is not satisfied.
        :rtype: bool
        """
        # Keep the try body to the single call that can raise DoesNotExist.
        try:
            partner = User.objects.get(email=self.partner_email)
        except User.DoesNotExist:
            return False

        role = self.user.role
        if role == 'MENTEE':
            # bool() guarantees the documented return type even when the
            # model fields hold None or other non-boolean values.
            return bool(
                self.user.isMatched
                and self.user.matchedMentorEmail == self.partner_email
                and partner.role == 'MENTOR'
            )
        if role == 'MENTOR':
            mentees = self.user.current_mentees or []
            return self.partner_email in mentees and partner.role == 'MENTEE'
        return False

    @database_sync_to_async
    def save_message(self, content):
        """
        Persist a new :class:`~chat.models.Message` to the database.

        Wrapped with ``@database_sync_to_async`` so it can be safely awaited
        from the async consumer without blocking the event loop.

        :param content: Validated plain-text message body to store.
        :type content: str
        :returns: The newly created :class:`~chat.models.Message` instance.
        :rtype: chat.models.Message
        """
        return Message.objects.create(
            sender_email=self.user.email,
            receiver_email=self.partner_email,
            content=content,
        )

    @database_sync_to_async
    def mark_messages_read(self):
        """
        Bulk-mark all unread messages from the partner as read.

        Delegates to :meth:`~chat.models.Message.mark_as_read`, passing the
        session user as receiver and the partner as sender. Wrapped with
        ``@database_sync_to_async`` so it can be safely awaited from the
        async consumer.

        :returns: The value returned by ``Message.mark_as_read`` — expected to
            be the number of :class:`~chat.models.Message` rows updated.
        :rtype: int
        """
        return Message.mark_as_read(
            receiver_email=self.user.email,
            sender_email=self.partner_email,
        )