Task 4.8: WebSocket & Real-Time APIs

Overview

This task implements WebSocket support for the SmartRAG system, enabling real-time bidirectional communication between the server and clients. The implementation provides real-time updates for query processing, system notifications, data ingestion progress, and schema changes.

Objectives

  • ✅ Implement WebSocket connection management for multiple concurrent clients
  • ✅ Create WebSocket endpoints for real-time query and system updates
  • ✅ Build subscription-based notification system
  • ✅ Support user-specific and broadcast messaging
  • ✅ Implement robust error handling and connection lifecycle management
  • ✅ Create comprehensive tests for WebSocket functionality
  • ✅ Document WebSocket API usage and integration patterns

Architecture

Connection Management

```mermaid
graph TB
    subgraph "WebSocket Architecture"
        Client1[Client 1] -->|WS Connection| WSEndpoint[WebSocket Endpoint]
        Client2[Client 2] -->|WS Connection| WSEndpoint
        Client3[Client 3] -->|WS Connection| WSEndpoint

        WSEndpoint --> CM[Connection Manager]

        CM --> CS[Connection Store]
        CM --> SS[Subscription Store]
        CM --> US[User Store]

        subgraph "Message Broadcasting"
            NS[Notification Service] --> CM
            QP[Query Processor] --> NS
            IS[Ingestion Service] --> NS
            SM[Schema Manager] --> NS
        end
    end
```

Message Flow

```mermaid
sequenceDiagram
    participant C as Client
    participant WS as WebSocket Endpoint
    participant CM as Connection Manager
    participant NS as Notification Service
    participant QP as Query Processor

    C->>WS: Connect
    WS->>CM: Register Connection
    CM->>C: Connection Established

    C->>WS: Subscribe to Updates
    WS->>CM: Update Subscriptions
    CM->>C: Subscription Confirmed

    QP->>NS: Query Progress Update
    NS->>CM: Broadcast Update
    CM->>C: Query Update Message

    C->>WS: Disconnect
    WS->>CM: Unregister Connection
```
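The lifecycle in the diagram can be sketched as one coroutine per connection. This is a framework-agnostic sketch, not the project's endpoint: it assumes the socket exposes async send_json()/receive_json() (as Starlette's WebSocket does), uses a plain dict as the registry, and ClientDisconnected is a hypothetical stand-in for the framework's disconnect exception (for example Starlette's WebSocketDisconnect).

```python
import asyncio
from typing import Any, Dict, List, Set
from uuid import UUID, uuid4


class ClientDisconnected(Exception):
    """Hypothetical stand-in for the framework's disconnect exception."""


async def handle_connection(websocket: Any, registry: Dict[UUID, Set[str]]) -> None:
    connection_id = uuid4()
    registry[connection_id] = set()  # Register Connection
    await websocket.send_json({"type": "connection_established",
                               "data": {"connection_id": str(connection_id)}})
    try:
        while True:
            message = await websocket.receive_json()
            if message.get("action") == "subscribe":
                data = message.get("data") or {}
                registry[connection_id].update(data.get("subscription_types", []))
                # Subscription Confirmed
                await websocket.send_json({
                    "type": "subscription_created",
                    "data": {"subscription_types": sorted(registry[connection_id])},
                })
            else:
                await websocket.send_json({"type": "error",
                                           "data": {"detail": "unknown action"}})
    except ClientDisconnected:
        pass  # normal client-initiated close
    finally:
        registry.pop(connection_id, None)  # Unregister Connection on every exit path


class ScriptedWebSocket:
    """Test double: replays scripted client messages, then disconnects."""

    def __init__(self, incoming: List[dict]) -> None:
        self.incoming = list(incoming)
        self.sent: List[dict] = []

    async def send_json(self, message: dict) -> None:
        self.sent.append(message)

    async def receive_json(self) -> dict:
        if not self.incoming:
            raise ClientDisconnected()
        return self.incoming.pop(0)


if __name__ == "__main__":
    registry: Dict[UUID, Set[str]] = {}
    ws = ScriptedWebSocket([{"action": "subscribe",
                             "data": {"subscription_types": ["query_updates"]}}])
    asyncio.run(handle_connection(ws, registry))
    print([m["type"] for m in ws.sent])  # ['connection_established', 'subscription_created']
```

Unregistering in the finally clause is what keeps an unexpected exception from leaving a dead entry behind in the registry.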

Implementation Details

1. Connection Manager

The ConnectionManager class handles all WebSocket connections:

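```python
from typing import Optional
from uuid import UUID

from fastapi import WebSocket  # assumed framework; the tests use a Starlette-style TestClient


class ConnectionManager:
    """Manages WebSocket connections and message broadcasting (interface only)."""

    # SubscriptionRequest, QueryUpdateMessage and WebSocketMessage are project models (not shown)
    async def connect(self, websocket: WebSocket, user_id: Optional[str] = None) -> UUID: ...
    async def disconnect(self, connection_id: UUID): ...
    async def subscribe(self, connection_id: UUID, request: "SubscriptionRequest"): ...
    async def broadcast_query_update(self, update: "QueryUpdateMessage"): ...
    async def send_to_user(self, user_id: str, message: "WebSocketMessage"): ...
```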

Key features:

  • Concurrency-safe connection management with asyncio locks (these serialize coroutines on a single event loop; they are not thread locks)
  • User-based connection tracking for targeted messaging
  • Subscription-based message filtering
  • Automatic cleanup of stale connections
  • Connection statistics and monitoring
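The key features above can be illustrated with a minimal in-memory manager. This is a hypothetical sketch, not the project's ConnectionManager: the class name, the stale_after timeout, the stats() fields and the literal "all_updates" wildcard handling are assumptions, and each socket only needs an async send_json().

```python
import asyncio
import time
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from uuid import UUID, uuid4


class InMemoryConnectionManager:
    """Hypothetical single-process sketch of a connection manager."""

    def __init__(self, stale_after: float = 60.0) -> None:
        self._lock = asyncio.Lock()  # guards the stores below across coroutines
        self._connections: Dict[UUID, Any] = {}               # connection store
        self._subscriptions: Dict[UUID, Set[str]] = {}        # subscription store
        self._users: Dict[str, Set[UUID]] = defaultdict(set)  # user store
        self._owner: Dict[UUID, Optional[str]] = {}
        self._last_seen: Dict[UUID, float] = {}
        self._stale_after = stale_after

    async def connect(self, websocket: Any, user_id: Optional[str] = None) -> UUID:
        connection_id = uuid4()
        async with self._lock:
            self._connections[connection_id] = websocket
            self._subscriptions[connection_id] = set()
            self._owner[connection_id] = user_id
            self._last_seen[connection_id] = time.monotonic()
            if user_id is not None:
                self._users[user_id].add(connection_id)
        return connection_id

    async def disconnect(self, connection_id: UUID) -> None:
        async with self._lock:
            self._connections.pop(connection_id, None)
            self._subscriptions.pop(connection_id, None)
            self._last_seen.pop(connection_id, None)
            user_id = self._owner.pop(connection_id, None)
            if user_id is not None:
                self._users[user_id].discard(connection_id)
                if not self._users[user_id]:
                    del self._users[user_id]

    async def subscribe(self, connection_id: UUID, types: Iterable[str]) -> None:
        async with self._lock:
            if connection_id in self._subscriptions:
                self._subscriptions[connection_id].update(types)
                self._last_seen[connection_id] = time.monotonic()

    async def broadcast(self, subscription_type: str, message: dict) -> int:
        # Snapshot recipients under the lock, then send without holding it
        async with self._lock:
            targets = [(cid, self._connections[cid])
                       for cid, subs in self._subscriptions.items()
                       if subscription_type in subs or "all_updates" in subs]
        return await self._deliver(targets, message)

    async def send_to_user(self, user_id: str, message: dict) -> int:
        async with self._lock:
            targets = [(cid, self._connections[cid]) for cid in self._users.get(user_id, ())]
        return await self._deliver(targets, message)

    async def cleanup_stale(self) -> int:
        cutoff = time.monotonic() - self._stale_after
        async with self._lock:
            stale = [cid for cid, seen in self._last_seen.items() if seen < cutoff]
        for cid in stale:
            await self.disconnect(cid)
        return len(stale)

    def stats(self) -> Dict[str, int]:
        return {"active_connections": len(self._connections), "users": len(self._users)}

    async def _deliver(self, targets: List[Tuple[UUID, Any]], message: dict) -> int:
        sent = 0
        for cid, websocket in targets:
            try:
                await websocket.send_json(message)
                sent += 1
            except Exception:
                await self.disconnect(cid)  # a failed send marks the connection as dead
        return sent
```

Copying the recipient list under the lock and sending after releasing it means one slow client cannot block connects, disconnects or other broadcasts.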

2. WebSocket Endpoints

Two main endpoints provide real-time functionality. The paths below are relative to the API prefix; the client examples connect to /api/v1/ws/updates.

/ws/query - Query Updates

  • Real-time query processing updates
  • Partial result streaming
  • Progress tracking
  • Error notifications

/ws/updates - System Updates

  • System-wide notifications
  • Data ingestion progress
  • Schema change alerts
  • Maintenance announcements

3. Message Types

The system supports message types including:

```python
from enum import Enum


class MessageType(str, Enum):
    CONNECTION_ESTABLISHED = "connection_established"
    QUERY_UPDATE = "query_update"
    SYSTEM_UPDATE = "system_update"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGE = "schema_change"
    ERROR = "error"
    # Not exhaustive: the integration test also expects a "subscription_created" acknowledgment
```
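The client examples read message.type and message.data, which suggests a JSON envelope of roughly the following shape. The WebSocketMessage name comes from the send_to_user signature, but its fields here (in particular the timestamp) are assumptions, and the enum is an abridged copy.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict


class MessageType(str, Enum):  # abridged copy of the enum above
    CONNECTION_ESTABLISHED = "connection_established"
    QUERY_UPDATE = "query_update"
    ERROR = "error"


def _utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass
class WebSocketMessage:
    type: MessageType
    data: Dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(default_factory=_utc_now)  # assumed ISO-8601 UTC field

    def to_json(self) -> str:
        # Serialize the enum by value so clients can switch on plain strings
        return json.dumps({"type": self.type.value, "data": self.data,
                           "timestamp": self.timestamp})

    @classmethod
    def from_json(cls, raw: str) -> "WebSocketMessage":
        obj = json.loads(raw)
        # MessageType(...) raises ValueError for unknown message types
        return cls(type=MessageType(obj["type"]), data=obj.get("data") or {},
                   timestamp=obj.get("timestamp") or _utc_now())
```

For example, WebSocketMessage(MessageType.QUERY_UPDATE, {"query_id": "q1", "progress": 0.5}).to_json() produces the kind of frame the client's switch statement dispatches on.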

4. Subscription System

Clients can subscribe to specific update types:

```python
from enum import Enum


class SubscriptionType(str, Enum):
    QUERY_UPDATES = "query_updates"
    SYSTEM_UPDATES = "system_updates"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGES = "schema_changes"
    ALL_UPDATES = "all_updates"
```
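One plausible way to apply these subscriptions is a routing table from each outgoing message type to the subscription that gates it, with ALL_UPDATES acting as a wildcard. The ROUTING table and the should_deliver helper are illustrative assumptions; the enums are copied from above, with MessageType abridged.

```python
from enum import Enum
from typing import Iterable


class MessageType(str, Enum):
    QUERY_UPDATE = "query_update"
    SYSTEM_UPDATE = "system_update"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGE = "schema_change"
    ERROR = "error"


class SubscriptionType(str, Enum):
    QUERY_UPDATES = "query_updates"
    SYSTEM_UPDATES = "system_updates"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGES = "schema_changes"
    ALL_UPDATES = "all_updates"


# Assumed mapping: which subscription a client needs to receive each message type
ROUTING = {
    MessageType.QUERY_UPDATE: SubscriptionType.QUERY_UPDATES,
    MessageType.SYSTEM_UPDATE: SubscriptionType.SYSTEM_UPDATES,
    MessageType.INGESTION_PROGRESS: SubscriptionType.INGESTION_PROGRESS,
    MessageType.SCHEMA_CHANGE: SubscriptionType.SCHEMA_CHANGES,
}


def should_deliver(message_type: str, subscriptions: Iterable[str]) -> bool:
    """Accepts enum members or their string values."""
    required = ROUTING.get(MessageType(message_type))
    if required is None:
        return True  # not subscription-gated (e.g. errors sent to one connection)
    subs = {SubscriptionType(s) for s in subscriptions}
    return SubscriptionType.ALL_UPDATES in subs or required in subs
```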

API Usage

Client Connection

```javascript
// Connect to WebSocket (the JWT is passed in the token query parameter)
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/updates?token=<jwt_token>');

ws.onopen = (event) => {
  console.log('Connected to WebSocket');
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log('Received:', message);
};
```

Subscribing to Updates

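```javascript
// Subscribe to specific update types. send() throws while the socket is
// still connecting, so wait for the open event first.
ws.addEventListener('open', () => {
  ws.send(JSON.stringify({
    action: "subscribe",
    data: {
      subscription_types: ["query_updates", "system_updates"],
      filters: { user_id: "user123" }
    }
  }));
});
```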
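On the server, a subscribe message like this one might be validated before it touches any state, in line with the input-validation and authorization points under Security Considerations. parse_subscribe and InvalidMessage are hypothetical names, and the rule that a client-supplied user_id filter must match the authenticated user is an assumed policy: a client should not be able to widen its own access by editing filters.

```python
from enum import Enum
from typing import Any, Dict, List, Tuple


class SubscriptionType(str, Enum):  # copied from above
    QUERY_UPDATES = "query_updates"
    SYSTEM_UPDATES = "system_updates"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGES = "schema_changes"
    ALL_UPDATES = "all_updates"


class InvalidMessage(ValueError):
    """Raised for malformed or unauthorized client messages."""


def parse_subscribe(message: Any,
                    authenticated_user_id: str) -> Tuple[List[SubscriptionType], Dict[str, Any]]:
    if not isinstance(message, dict) or message.get("action") != "subscribe":
        raise InvalidMessage("expected an object with action 'subscribe'")
    data = message.get("data")
    if not isinstance(data, dict):
        raise InvalidMessage("'data' must be an object")
    raw_types = data.get("subscription_types")
    if not isinstance(raw_types, list) or not raw_types:
        raise InvalidMessage("'subscription_types' must be a non-empty list")
    try:
        types = [SubscriptionType(t) for t in raw_types]
    except ValueError as exc:  # unknown subscription type
        raise InvalidMessage(str(exc)) from exc
    filters = data.get("filters", {})
    if not isinstance(filters, dict):
        raise InvalidMessage("'filters' must be an object")
    # Assumed policy: never trust a client-chosen user_id
    if filters.get("user_id", authenticated_user_id) != authenticated_user_id:
        raise InvalidMessage("cannot subscribe to another user's updates")
    return types, filters
```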

Handling Messages

```javascript
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  switch (message.type) {
    case 'query_update':
      handleQueryUpdate(message.data);
      break;
    case 'system_update':
      handleSystemUpdate(message.data);
      break;
    case 'ingestion_progress':
      handleIngestionProgress(message.data);
      break;
    // ... handle other message types
  }
};
```

Integration Examples

Query Processing with Real-time Updates

```python
from backend.api.services.websocket_notifications import websocket_notifications


async def process_query_with_updates(query_id: str, query_text: str):
    # Notify query started
    await websocket_notifications.send_query_update(
        query_id=query_id,
        status="processing",
        progress=0.0
    )

    # Process query with progress updates
    for step, progress in enumerate([0.2, 0.5, 0.8, 1.0]):
        # ... processing logic ...

        await websocket_notifications.send_query_update(
            query_id=query_id,
            status="processing",
            progress=progress,
            partial_result=f"Step {step + 1} complete..."
        )
```

Data Ingestion Progress

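```python
from typing import List

from backend.api.services.websocket_notifications import websocket_notifications

# Document and process_document are defined elsewhere in the project (not shown)


async def ingest_documents_with_progress(job_id: str, documents: List["Document"]):
    total = len(documents)

    for i, doc in enumerate(documents):
        # Process document
        await process_document(doc)

        # Send a progress update after each document
        await websocket_notifications.send_ingestion_progress(
            job_id=job_id,
            status="processing",
            progress=(i + 1) / total,
            documents_processed=i + 1,
            total_documents=total,
        )
```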
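The websocket_notifications object used in both examples is not shown in this document. A minimal sketch of such a facade could turn each call into a typed message and hand it to a manager's broadcast method. The NotificationService class, the message shapes, the progress range check and the broadcast(subscription_type, message) interface are assumptions; only the keyword arguments mirror the calls above.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class NotificationService:
    """Hypothetical facade in the style of websocket_notifications."""

    def __init__(self, manager: Any) -> None:
        # Any object with `async broadcast(subscription_type, message)` works
        self._manager = manager

    @staticmethod
    def _check_progress(progress: float) -> None:
        if not 0.0 <= progress <= 1.0:
            raise ValueError(f"progress must be in [0, 1], got {progress}")

    async def send_query_update(self, query_id: str, status: str, progress: float,
                                partial_result: Optional[str] = None) -> None:
        self._check_progress(progress)
        data: Dict[str, Any] = {"query_id": query_id, "status": status, "progress": progress}
        if partial_result is not None:
            data["partial_result"] = partial_result
        await self._manager.broadcast("query_updates", {"type": "query_update", "data": data})

    async def send_ingestion_progress(self, job_id: str, status: str, progress: float,
                                      documents_processed: int, total_documents: int) -> None:
        self._check_progress(progress)
        data = {"job_id": job_id, "status": status, "progress": progress,
                "documents_processed": documents_processed,
                "total_documents": total_documents}
        await self._manager.broadcast("ingestion_progress",
                                      {"type": "ingestion_progress", "data": data})


class RecordingManager:
    """Test double that records broadcasts instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, dict]] = []

    async def broadcast(self, subscription_type: str, message: dict) -> None:
        self.calls.append((subscription_type, message))


if __name__ == "__main__":
    manager = RecordingManager()
    service = NotificationService(manager)
    asyncio.run(service.send_query_update(query_id="q1", status="processing", progress=0.2))
    print(manager.calls[0][0])  # query_updates
```

Keeping the facade separate from the manager lets the query processor and ingestion service stay unaware of sockets, as in the architecture diagram.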

Testing

Unit Tests

The implementation includes comprehensive unit tests:

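```python
from uuid import UUID

import pytest


# Test connection management (async tests assume the pytest-asyncio plugin)
@pytest.mark.asyncio
async def test_connect(connection_manager, mock_websocket):
    connection_id = await connection_manager.connect(mock_websocket)
    assert isinstance(connection_id, UUID)
    assert connection_manager.active_connections == 1


# Test message broadcasting; subscription_request and query_update are
# hypothetical fixtures returning a SubscriptionRequest and a QueryUpdateMessage
@pytest.mark.asyncio
async def test_broadcast_query_update(connection_manager, mock_websocket,
                                      subscription_request, query_update):
    # Connect and subscribe to updates
    connection_id = await connection_manager.connect(mock_websocket)
    await connection_manager.subscribe(connection_id, subscription_request)

    # Ignore any acknowledgments sent while connecting and subscribing
    mock_websocket.send_json.reset_mock()

    # Broadcast update
    await connection_manager.broadcast_query_update(query_update)

    # Verify exactly one message was received
    mock_websocket.send_json.assert_called_once()
```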
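The unit tests rely on mock_websocket and connection_manager fixtures that are not shown. One way to build mock_websocket is with the standard library's unittest.mock.AsyncMock (Python 3.8+); make_mock_websocket is a hypothetical helper, and the mocked method names assume a Starlette-style WebSocket (accept, send_json, receive_json, close).

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_websocket() -> MagicMock:
    """Hypothetical helper backing a mock_websocket fixture."""
    websocket = MagicMock(name="websocket")
    # Each coroutine method is an AsyncMock so it can be awaited and inspected
    websocket.accept = AsyncMock()
    websocket.send_json = AsyncMock()
    websocket.receive_json = AsyncMock(return_value={
        "action": "subscribe",
        "data": {"subscription_types": ["query_updates"]},
    })
    websocket.close = AsyncMock()
    return websocket


# With pytest, the fixture would simply wrap the helper (assumed conftest.py):
# @pytest.fixture
# def mock_websocket():
#     return make_mock_websocket()

if __name__ == "__main__":
    ws = make_mock_websocket()
    asyncio.run(ws.send_json({"type": "connection_established"}))
    ws.send_json.assert_awaited_once_with({"type": "connection_established"})
```

AsyncMock also records awaits, so assertions such as assert_awaited_once() can distinguish a coroutine that was awaited from one that was merely called.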

Integration Tests

Integration tests verify end-to-end functionality:

```python
def test_websocket_subscription(client):
    with client.websocket_connect("/api/v1/ws/updates") as websocket:
        # The server acknowledges each new connection first
        welcome = websocket.receive_json()
        assert welcome["type"] == "connection_established"

        # Test subscription flow
        websocket.send_json({
            "action": "subscribe",
            "data": {"subscription_types": ["query_updates"]},
        })

        response = websocket.receive_json()
        assert response["type"] == "subscription_created"
```

Security Considerations

  1. Authentication: WebSocket connections support JWT authentication via the token query parameter
  2. Authorization: User-based message filtering ensures users only receive authorized updates
  3. Rate Limiting: Connection limits prevent resource exhaustion
  4. Input Validation: All incoming messages are validated before processing
  5. Error Handling: Graceful error handling prevents connection leaks

Performance Optimizations

  1. Connection Pooling: Efficient management of concurrent connections
  2. Message Batching: Multiple updates can be batched for efficiency
  3. Selective Broadcasting: Only send messages to subscribed connections
  4. Async Processing: Non-blocking message handling
  5. Stale Connection Cleanup: Automatic removal of inactive connections
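Message batching can be sketched as coalescing: buffer updates per key (for example per query_id), keep only the newest one, and flush them periodically as a single frame. UpdateCoalescer, flush_periodically and the "batch" envelope type are hypothetical. Dropping intermediate values suits progress updates, but not events every client must see, such as errors.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


class UpdateCoalescer:
    """Keeps only the newest pending update per key until drained."""

    def __init__(self) -> None:
        self._pending: Dict[str, Dict[str, Any]] = {}

    def submit(self, key: str, update: Dict[str, Any]) -> None:
        # A newer update replaces the older one but keeps the key's original position
        self._pending[key] = update

    def drain(self) -> List[Dict[str, Any]]:
        batch = list(self._pending.values())
        self._pending.clear()
        return batch


async def flush_periodically(coalescer: UpdateCoalescer,
                             send: Callable[[Dict[str, Any]], Awaitable[None]],
                             interval: float, stop: asyncio.Event) -> None:
    """Flush every `interval` seconds; flush once more and return when `stop` is set."""
    while True:
        try:
            await asyncio.wait_for(stop.wait(), timeout=interval)
            stopping = True
        except asyncio.TimeoutError:
            stopping = False
        batch = coalescer.drain()
        if batch:
            await send({"type": "batch", "data": batch})  # one frame, many updates
        if stopping:
            return
```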

Monitoring and Metrics

The WebSocket system exposes metrics for monitoring:

  • Active connection count
  • Subscription statistics by type
  • Message throughput rates
  • Error rates and types
  • Connection duration metrics
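These metrics could be collected with plain in-process counters, as sketched below. The WebSocketMetrics class and its field names are assumptions. Message throughput is kept as a monotonically increasing counter so that a monitoring system can derive rates from the difference between successive samples.

```python
import time
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


def _now(now: Optional[float]) -> float:
    return time.monotonic() if now is None else now


@dataclass
class WebSocketMetrics:
    """Hypothetical counters mirroring the metrics listed above."""
    active_connections: int = 0
    messages_sent: int = 0  # monotonic; rates come from differences between samples
    subscriptions: Counter = field(default_factory=Counter)
    errors: Counter = field(default_factory=Counter)
    durations: List[float] = field(default_factory=list)
    _opened_at: Dict[str, float] = field(default_factory=dict, repr=False)

    def on_connect(self, connection_id: str, now: Optional[float] = None) -> None:
        self.active_connections += 1
        self._opened_at[connection_id] = _now(now)

    def on_disconnect(self, connection_id: str, now: Optional[float] = None) -> None:
        opened = self._opened_at.pop(connection_id, None)
        if opened is not None:  # ignore duplicate or unknown disconnects
            self.active_connections -= 1
            self.durations.append(_now(now) - opened)

    def on_subscribe(self, subscription_type: str) -> None:
        self.subscriptions[subscription_type] += 1

    def on_send(self, count: int = 1) -> None:
        self.messages_sent += count

    def on_error(self, kind: str) -> None:
        self.errors[kind] += 1

    def snapshot(self) -> Dict[str, Any]:
        mean = sum(self.durations) / len(self.durations) if self.durations else 0.0
        return {"active_connections": self.active_connections,
                "subscriptions": dict(self.subscriptions),
                "messages_sent": self.messages_sent,
                "errors": dict(self.errors),
                "mean_connection_seconds": mean}
```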

Future Enhancements

  1. Message Persistence: Store messages for offline clients
  2. Compression: Implement message compression for large payloads
  3. Clustering: Support WebSocket connections across multiple servers
  4. Binary Protocol: Support binary message format for efficiency
  5. GraphQL Subscriptions: Add GraphQL subscription support

Conclusion

The WebSocket implementation provides a robust foundation for real-time features in the SmartRAG system. It enables responsive user experiences through live updates while maintaining security, scalability, and reliability.