Real-time Streaming Integration Guide
Build high-performance real-time streaming applications with Bookovia’s telematics platform. This guide covers WebSocket implementation, stream processing, real-time analytics, and scalable architecture patterns for live fleet monitoring and decision-making systems.
Architecture Overview
Real-time System Components
A complete real-time streaming solution includes:
- Data Ingestion - Vehicle sensors, GPS, and telemetry streams
- Stream Processing - Real-time data transformation and analysis
- Event Detection - Safety alerts, geofencing, and anomaly detection
- WebSocket Management - Client connection handling and load balancing
- State Management - Live data synchronization across clients
- Notification System - Push notifications and alert delivery
- Analytics Engine - Real-time metrics and dashboards
- Data Storage - Time-series data and event history
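The components above pass telemetry between stages as typed event messages. As a rough sketch of that contract (the envelope fields here are illustrative assumptions, not Bookovia's actual wire format), every stage can consume and emit a common shape like:

```python
import json
import time

def make_envelope(event_type: str, vehicle_id: str, payload: dict) -> dict:
    """Wrap a raw telemetry payload in a common envelope that every
    pipeline stage (ingestion, processing, WebSocket fan-out) understands."""
    return {
        "type": event_type,          # e.g. "location_update", "safety_event"
        "vehicle_id": vehicle_id,
        "data": payload,
        "ingested_at": time.time(),  # server-side receive time
    }

envelope = make_envelope("location_update", "veh_42", {"latitude": 52.5, "longitude": 13.4})
wire = json.dumps(envelope)  # what actually travels between stages
```

Keeping `type` at the top level lets each stage route a message without parsing the payload.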
System Architecture
WebSocket Implementation
Server-Side WebSocket Management
- Node.js + Socket.IO
- Python + FastAPI + WebSocket
// server/websocket-server.js
import { Server as SocketIOServer } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import Bookovia from '@bookovia/javascript-sdk';
export class RealtimeServer {
constructor(server, config) {
this.io = new SocketIOServer(server, {
cors: {
origin: config.allowedOrigins,
methods: ["GET", "POST"]
},
transports: ['websocket', 'polling']
});
this.bookovia = new Bookovia({ apiKey: config.bookoviaApiKey });
this.redis = createClient(config.redis);
this.connections = new Map();
this.subscriptions = new Map();
this.setupRedisAdapter();
this.setupEventHandlers();
this.initializeBookoviaStream();
}
setupRedisAdapter() {
// Enable horizontal scaling with Redis adapter
const pubClient = this.redis.duplicate();
const subClient = this.redis.duplicate();
// node-redis v4 clients must be connected before the adapter can use them
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
this.io.adapter(createAdapter(pubClient, subClient));
});
}
setupEventHandlers() {
this.io.use(this.authenticateSocket.bind(this));
this.io.on('connection', (socket) => {
console.log(`Client connected: ${socket.id}`);
// Store connection metadata
this.connections.set(socket.id, {
userId: socket.userId,
organizationId: socket.organizationId,
connectedAt: new Date(),
lastActivity: new Date()
});
// Handle client events
socket.on('subscribe', this.handleSubscription.bind(this, socket));
socket.on('unsubscribe', this.handleUnsubscription.bind(this, socket));
socket.on('heartbeat', this.handleHeartbeat.bind(this, socket));
socket.on('disconnect', () => {
this.handleDisconnection(socket);
});
// Send initial data
this.sendInitialData(socket);
});
}
async authenticateSocket(socket, next) {
try {
const token = socket.handshake.auth.token;
const user = await this.validateToken(token);
socket.userId = user.id;
socket.organizationId = user.organizationId;
socket.permissions = user.permissions;
next();
} catch (error) {
next(new Error('Authentication failed'));
}
}
async handleSubscription(socket, subscriptionData) {
const { channels, filters = {} } = subscriptionData;
// Validate subscription permissions
if (!this.validateSubscriptionPermissions(socket, channels)) {
socket.emit('error', { message: 'Insufficient permissions for subscription' });
return;
}
// Store subscription
const subscriptionKey = `${socket.id}:${channels.join(':')}`;
this.subscriptions.set(subscriptionKey, {
socketId: socket.id,
channels,
filters,
subscribedAt: new Date()
});
// Join Socket.IO rooms for efficient broadcasting
channels.forEach(channel => {
const roomName = this.generateRoomName(channel, filters);
socket.join(roomName);
});
// Subscribe to Bookovia streams
await this.subscribeToBookoviaChannels(channels, filters);
socket.emit('subscription_confirmed', { channels, filters });
}
async initializeBookoviaStream() {
try {
this.bookoviaStream = await this.bookovia.streaming.connect();
this.bookoviaStream.onMessage((message) => {
this.processStreamMessage(message);
});
this.bookoviaStream.onError((error) => {
console.error('Bookovia stream error:', error);
this.handleStreamError(error);
});
this.bookoviaStream.onReconnect(() => {
console.log('Bookovia stream reconnected');
this.resubscribeToAllChannels();
});
} catch (error) {
console.error('Failed to initialize Bookovia stream:', error);
}
}
processStreamMessage(message) {
const { type, data } = message;
// Apply business logic and filters
const processedData = this.applyBusinessLogic(message);
// Determine which clients should receive this message
const targetRooms = this.determineTargetRooms(message);
// Broadcast to relevant clients
targetRooms.forEach(room => {
this.io.to(room).emit(type, processedData);
});
// Store in cache for late-joining clients
this.updateCache(message);
// Trigger any alert rules
this.checkAlertRules(message);
}
applyBusinessLogic(message) {
switch (message.type) {
case 'location_update':
return this.enrichLocationData(message.data);
case 'safety_event':
return this.processSafetyEvent(message.data);
case 'trip_update':
return this.processTripUpdate(message.data);
default:
return message.data;
}
}
enrichLocationData(locationData) {
return {
...locationData,
timestamp_server: new Date().toISOString(),
processed_at: Date.now(),
// Add reverse geocoding if needed
address: this.getCachedAddress(locationData.latitude, locationData.longitude),
// Add geofence status
geofence_status: this.checkGeofences(locationData)
};
}
processSafetyEvent(safetyData) {
// Calculate severity based on business rules
const enhancedSeverity = this.calculateEnhancedSeverity(safetyData);
// Add contextual information
const context = this.getSafetyContext(safetyData);
return {
...safetyData,
enhanced_severity: enhancedSeverity,
context,
alert_level: this.determineAlertLevel(enhancedSeverity),
recommended_actions: this.getRecommendedActions(safetyData)
};
}
checkAlertRules(message) {
const alertRules = this.getAlertRules(message.type);
alertRules.forEach(rule => {
if (this.evaluateRule(rule, message)) {
this.triggerAlert(rule, message);
}
});
}
async sendInitialData(socket) {
const connection = this.connections.get(socket.id);
try {
// Send cached recent data
const cachedData = await this.getCachedData(connection.organizationId);
socket.emit('initial_data', cachedData);
// Send current fleet status
const fleetStatus = await this.bookovia.fleet.getOverview({
organization_id: connection.organizationId
});
socket.emit('fleet_status', fleetStatus);
} catch (error) {
console.error('Failed to send initial data:', error);
socket.emit('error', { message: 'Failed to load initial data' });
}
}
}
// Usage
const server = new RealtimeServer(httpServer, {
allowedOrigins: ['http://localhost:3000', 'https://dashboard.company.com'],
bookoviaApiKey: process.env.BOOKOVIA_API_KEY,
redis: {
// node-redis v4 expects a url (or socket options) rather than bare host/port
url: 'redis://localhost:6379'
}
});
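The server above assumes a `generateRoomName(channel, filters)` helper that is never shown. One way to implement it (a sketch under our own assumptions, not Bookovia-provided code) is to derive a deterministic name from the channel plus sorted filter pairs, so identical subscriptions always map to the same broadcast room regardless of key order:

```python
def generate_room_name(channel: str, filters: dict) -> str:
    """Deterministic room name: same channel + filters -> same room,
    regardless of the order filter keys were supplied in."""
    if not filters:
        return channel
    parts = [f"{key}={filters[key]}" for key in sorted(filters)]
    return f"{channel}:" + ":".join(parts)

# Two clients subscribing with the same filters land in the same room.
room_a = generate_room_name("locations", {"organization_id": "org_123", "region": "eu"})
room_b = generate_room_name("locations", {"region": "eu", "organization_id": "org_123"})
```

Deterministic naming is what makes the `io.to(room).emit(...)` broadcast in `processStreamMessage` cheap: matching is done by room membership rather than by filtering per client.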
# server/websocket_server.py
import asyncio
import json
import time
from typing import Dict, List, Set

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
import redis.asyncio as redis
import bookovia

class RealtimeServer:
    def __init__(self, config):
        self.app = FastAPI()
        self.bookovia = bookovia.Client(config['bookovia_api_key'])
        self.redis = redis.from_url(config['redis_url'])
        self.connections: Dict[int, dict] = {}
        self.subscriptions: Dict[str, dict] = {}
        self.user_connections: Dict[str, Set[int]] = {}
        self.bookovia_stream = None
        self.setup_middleware()
        self.setup_routes()

    def setup_middleware(self):
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],  # Configure appropriately for production
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def setup_routes(self):
        @self.app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket, token: str = None):
            await self.handle_websocket(websocket, token)

        @self.app.on_event("startup")
        async def startup():
            # Mirror the Node.js constructor: open the Bookovia stream on boot
            await self.initialize_bookovia_stream()

    async def handle_websocket(self, websocket: WebSocket, token: str):
        connection_id = None
        try:
            # Authenticate user
            user = await self.authenticate_token(token)
            if not user:
                await websocket.close(code=4001, reason="Authentication failed")
                return

            await websocket.accept()

            # Store connection
            connection_id = id(websocket)
            self.connections[connection_id] = {
                'websocket': websocket,
                'user_id': user['id'],
                'organization_id': user['organization_id'],
                'permissions': user['permissions'],
                'connected_at': time.time()
            }

            # Track user connections
            if user['id'] not in self.user_connections:
                self.user_connections[user['id']] = set()
            self.user_connections[user['id']].add(connection_id)

            # Send initial data
            await self.send_initial_data(websocket, user['organization_id'])

            # Handle messages
            while True:
                try:
                    data = await websocket.receive_text()
                    message = json.loads(data)
                    await self.handle_message(connection_id, message)
                except WebSocketDisconnect:
                    break
                except json.JSONDecodeError:
                    await websocket.send_text(json.dumps({
                        'type': 'error',
                        'message': 'Invalid JSON format'
                    }))
                except Exception as e:
                    await websocket.send_text(json.dumps({
                        'type': 'error',
                        'message': str(e)
                    }))
        except Exception as e:
            print(f"WebSocket connection error: {e}")
        finally:
            # connection_id is only set once the socket was accepted
            if connection_id is not None:
                await self.handle_disconnection(connection_id)

    async def handle_message(self, connection_id: int, message: dict):
        message_type = message.get('type')
        if message_type == 'subscribe':
            await self.handle_subscription(connection_id, message)
        elif message_type == 'unsubscribe':
            await self.handle_unsubscription(connection_id, message)
        elif message_type == 'heartbeat':
            await self.handle_heartbeat(connection_id)
        else:
            await self.send_to_connection(connection_id, {
                'type': 'error',
                'message': f'Unknown message type: {message_type}'
            })

    async def handle_subscription(self, connection_id: int, message: dict):
        channels = message.get('channels', [])
        filters = message.get('filters', {})
        connection = self.connections.get(connection_id)
        if not connection:
            return

        # Validate permissions
        if not self.validate_subscription_permissions(connection, channels):
            await self.send_to_connection(connection_id, {
                'type': 'error',
                'message': 'Insufficient permissions for subscription'
            })
            return

        # Store subscription
        subscription_key = f"{connection_id}:{':'.join(channels)}"
        self.subscriptions[subscription_key] = {
            'connection_id': connection_id,
            'channels': channels,
            'filters': filters,
            'subscribed_at': time.time()
        }

        # Subscribe to Bookovia streams
        await self.subscribe_to_bookovia_channels(channels, filters)

        # Confirm subscription
        await self.send_to_connection(connection_id, {
            'type': 'subscription_confirmed',
            'channels': channels,
            'filters': filters
        })

    async def initialize_bookovia_stream(self):
        """Initialize connection to Bookovia streaming service"""
        try:
            self.bookovia_stream = await self.bookovia.streaming.connect()

            async def message_handler(message):
                await self.process_stream_message(message)

            async def error_handler(error):
                print(f"Bookovia stream error: {error}")
                await self.handle_stream_error(error)

            self.bookovia_stream.on_message(message_handler)
            self.bookovia_stream.on_error(error_handler)
        except Exception as e:
            print(f"Failed to initialize Bookovia stream: {e}")

    async def process_stream_message(self, message):
        """Process incoming message from Bookovia stream"""
        message_type = message.get('type')

        # Apply business logic
        processed_data = await self.apply_business_logic(message)

        # Determine target connections
        target_connections = self.determine_target_connections(message)

        # Broadcast to relevant connections
        await self.broadcast_to_connections(target_connections, {
            'type': message_type,
            'data': processed_data
        })

        # Cache for late-joining clients
        await self.update_cache(message)

        # Check alert rules
        await self.check_alert_rules(message)

    async def apply_business_logic(self, message):
        """Apply business logic transformations to stream data"""
        message_type = message.get('type')
        data = message.get('data')
        if message_type == 'location_update':
            return await self.enrich_location_data(data)
        elif message_type == 'safety_event':
            return await self.process_safety_event(data)
        elif message_type == 'trip_update':
            return await self.process_trip_update(data)
        else:
            return data

    async def enrich_location_data(self, location_data):
        """Enrich location data with additional context"""
        enriched = {
            **location_data,
            'timestamp_server': time.time(),
            'processed_at': time.time()
        }

        # Add reverse geocoding if available
        address = await self.get_cached_address(
            location_data['latitude'],
            location_data['longitude']
        )
        if address:
            enriched['address'] = address

        # Check geofences
        geofence_status = await self.check_geofences(location_data)
        enriched['geofence_status'] = geofence_status

        return enriched

    async def broadcast_to_connections(self, connection_ids: List[int], message: dict):
        """Broadcast message to multiple connections"""
        if not connection_ids:
            return

        message_json = json.dumps(message)

        # Send to all target connections concurrently
        tasks = []
        for connection_id in connection_ids:
            if connection_id in self.connections:
                websocket = self.connections[connection_id]['websocket']
                tasks.append(websocket.send_text(message_json))
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def send_initial_data(self, websocket: WebSocket, organization_id: str):
        """Send initial data to newly connected client"""
        try:
            # Get cached recent data
            cached_data = await self.get_cached_data(organization_id)
            if cached_data:
                await websocket.send_text(json.dumps({
                    'type': 'initial_data',
                    'data': cached_data
                }))

            # Get current fleet status
            fleet_status = await self.bookovia.fleet.get_overview(
                organization_id=organization_id
            )
            await websocket.send_text(json.dumps({
                'type': 'fleet_status',
                'data': fleet_status
            }))
        except Exception as e:
            print(f"Failed to send initial data: {e}")
            await websocket.send_text(json.dumps({
                'type': 'error',
                'message': 'Failed to load initial data'
            }))

    async def handle_disconnection(self, connection_id: int):
        """Handle client disconnection cleanup"""
        if connection_id not in self.connections:
            return

        connection = self.connections[connection_id]
        user_id = connection['user_id']

        # Remove from connections
        del self.connections[connection_id]

        # Remove from user connections
        if user_id in self.user_connections:
            self.user_connections[user_id].discard(connection_id)
            if not self.user_connections[user_id]:
                del self.user_connections[user_id]

        # Clean up subscriptions
        subscriptions_to_remove = [
            key for key in self.subscriptions
            if key.startswith(f"{connection_id}:")
        ]
        for key in subscriptions_to_remove:
            del self.subscriptions[key]

        print(f"Connection {connection_id} disconnected and cleaned up")

    def get_app(self):
        return self.app

# Usage
config = {
    'bookovia_api_key': 'your_api_key',
    'redis_url': 'redis://localhost:6379'
}
server = RealtimeServer(config)
app = server.get_app()

# Run with: uvicorn server.websocket_server:app --host 0.0.0.0 --port 8000
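Both server variants call a `validate_subscription_permissions` helper that is left undefined. A minimal sketch (the channel names and permission strings here are assumptions, not part of the Bookovia API) maps each channel to a required permission and checks it against the set stored on the connection:

```python
# Assumed mapping from channel name to the permission it requires.
CHANNEL_PERMISSIONS = {
    "locations": "fleet:read",
    "safety_events": "safety:read",
    "trips": "fleet:read",
}

def validate_subscription_permissions(connection: dict, channels: list) -> bool:
    """Allow the subscription only if every requested channel is known
    and the connection holds the permission that channel requires."""
    granted = set(connection.get("permissions", []))
    return all(
        channel in CHANNEL_PERMISSIONS and CHANNEL_PERMISSIONS[channel] in granted
        for channel in channels
    )

conn = {"permissions": ["fleet:read"]}
```

Rejecting unknown channels outright (rather than defaulting to "allowed") keeps a typo in a client subscription from silently widening access.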
Client-Side WebSocket Management
- React Hook
- Vue.js Composable
// hooks/useRealtimeConnection.js
import { useState, useEffect, useRef, useCallback } from 'react';
import io from 'socket.io-client';
export const useRealtimeConnection = (config) => {
const [connectionStatus, setConnectionStatus] = useState('disconnected');
const [data, setData] = useState({});
const [error, setError] = useState(null);
const socketRef = useRef(null);
const reconnectAttemptsRef = useRef(0);
const subscriptionsRef = useRef(new Set());
const connect = useCallback(() => {
if (socketRef.current?.connected) return;
setConnectionStatus('connecting');
socketRef.current = io(config.url, {
auth: {
token: config.token
},
transports: ['websocket'],
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 10
});
const socket = socketRef.current;
socket.on('connect', () => {
console.log('Connected to real-time server');
setConnectionStatus('connected');
setError(null);
reconnectAttemptsRef.current = 0;
// Resubscribe to all channels
resubscribeAll();
});
socket.on('disconnect', (reason) => {
console.log('Disconnected from real-time server:', reason);
setConnectionStatus('disconnected');
});
socket.on('connect_error', (err) => {
console.error('Connection error:', err);
setConnectionStatus('error');
setError(err.message);
reconnectAttemptsRef.current++;
});
// Handle different message types
socket.on('location_update', (locationData) => {
setData(prev => ({
...prev,
locations: {
...prev.locations,
[locationData.vehicle_id]: locationData
}
}));
});
socket.on('safety_event', (safetyEvent) => {
setData(prev => ({
...prev,
safetyEvents: [...(prev.safetyEvents || []), safetyEvent].slice(-100) // Keep last 100
}));
});
socket.on('trip_update', (tripUpdate) => {
setData(prev => ({
...prev,
trips: {
...prev.trips,
[tripUpdate.trip_id]: tripUpdate
}
}));
});
socket.on('fleet_status', (fleetStatus) => {
setData(prev => ({
...prev,
fleetStatus
}));
});
socket.on('initial_data', (initialData) => {
setData(initialData);
});
socket.on('error', (errorData) => {
console.error('Server error:', errorData);
setError(errorData.message);
});
socket.on('subscription_confirmed', (confirmation) => {
console.log('Subscription confirmed:', confirmation);
});
}, [config.url, config.token]);
const disconnect = useCallback(() => {
if (socketRef.current) {
socketRef.current.disconnect();
socketRef.current = null;
setConnectionStatus('disconnected');
subscriptionsRef.current.clear();
}
}, []);
const subscribe = useCallback((channels, filters = {}) => {
if (!socketRef.current?.connected) {
console.warn('Cannot subscribe: not connected');
return;
}
const subscriptionKey = JSON.stringify({ channels, filters });
subscriptionsRef.current.add(subscriptionKey);
socketRef.current.emit('subscribe', { channels, filters });
}, []);
const unsubscribe = useCallback((channels, filters = {}) => {
if (!socketRef.current?.connected) return;
const subscriptionKey = JSON.stringify({ channels, filters });
subscriptionsRef.current.delete(subscriptionKey);
socketRef.current.emit('unsubscribe', { channels, filters });
}, []);
const resubscribeAll = useCallback(() => {
subscriptionsRef.current.forEach(subscriptionKey => {
const { channels, filters } = JSON.parse(subscriptionKey);
socketRef.current.emit('subscribe', { channels, filters });
});
}, []);
const sendHeartbeat = useCallback(() => {
if (socketRef.current?.connected) {
socketRef.current.emit('heartbeat', { timestamp: Date.now() });
}
}, []);
// Auto-connect on mount
useEffect(() => {
connect();
return () => {
disconnect();
};
}, [connect, disconnect]);
// Heartbeat interval
useEffect(() => {
if (connectionStatus === 'connected') {
const heartbeatInterval = setInterval(sendHeartbeat, 30000); // 30 seconds
return () => clearInterval(heartbeatInterval);
}
}, [connectionStatus, sendHeartbeat]);
// Reconnect on token change
useEffect(() => {
if (socketRef.current && config.token) {
disconnect();
connect();
}
}, [config.token, connect, disconnect]);
return {
connectionStatus,
data,
error,
connect,
disconnect,
subscribe,
unsubscribe,
isConnected: connectionStatus === 'connected'
};
};
// Usage in component
const FleetDashboard = () => {
const { data, connectionStatus, subscribe, isConnected } = useRealtimeConnection({
url: 'ws://localhost:3000',
token: localStorage.getItem('authToken')
});
useEffect(() => {
if (isConnected) {
subscribe(['locations', 'safety_events'], {
organization_id: 'org_123'
});
}
}, [isConnected, subscribe]);
return (
<div>
<div>Status: {connectionStatus}</div>
<div>Vehicles: {Object.keys(data.locations || {}).length}</div>
<div>Safety Events: {(data.safetyEvents || []).length}</div>
</div>
);
};
// composables/useRealtime.js
import { ref, computed, onMounted, onUnmounted, watch } from 'vue';
import io from 'socket.io-client';
export function useRealtime(config) {
const connectionStatus = ref('disconnected');
const data = ref({});
const error = ref(null);
let socket = null;
let reconnectAttempts = 0;
const subscriptions = new Set();
const connect = () => {
if (socket?.connected) return;
connectionStatus.value = 'connecting';
socket = io(config.url, {
auth: { token: config.token },
transports: ['websocket'],
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 10
});
socket.on('connect', () => {
console.log('Connected to real-time server');
connectionStatus.value = 'connected';
error.value = null;
reconnectAttempts = 0;
resubscribeAll();
});
socket.on('disconnect', () => {
connectionStatus.value = 'disconnected';
});
socket.on('connect_error', (err) => {
connectionStatus.value = 'error';
error.value = err.message;
reconnectAttempts++;
});
// Message handlers
socket.on('location_update', (locationData) => {
data.value = {
...data.value,
locations: {
...data.value.locations,
[locationData.vehicle_id]: locationData
}
};
});
socket.on('safety_event', (safetyEvent) => {
const events = data.value.safetyEvents || [];
data.value = {
...data.value,
safetyEvents: [...events, safetyEvent].slice(-100)
};
});
socket.on('fleet_status', (fleetStatus) => {
data.value = {
...data.value,
fleetStatus
};
});
};
const disconnect = () => {
if (socket) {
socket.disconnect();
socket = null;
connectionStatus.value = 'disconnected';
subscriptions.clear();
}
};
const subscribe = (channels, filters = {}) => {
if (!socket?.connected) return;
const subscriptionKey = JSON.stringify({ channels, filters });
subscriptions.add(subscriptionKey);
socket.emit('subscribe', { channels, filters });
};
const unsubscribe = (channels, filters = {}) => {
if (!socket?.connected) return;
const subscriptionKey = JSON.stringify({ channels, filters });
subscriptions.delete(subscriptionKey);
socket.emit('unsubscribe', { channels, filters });
};
const resubscribeAll = () => {
subscriptions.forEach(subscriptionKey => {
const { channels, filters } = JSON.parse(subscriptionKey);
socket.emit('subscribe', { channels, filters });
});
};
// Auto-connect
onMounted(() => {
connect();
// Heartbeat interval
const heartbeatInterval = setInterval(() => {
if (socket?.connected) {
socket.emit('heartbeat', { timestamp: Date.now() });
}
}, 30000);
onUnmounted(() => {
clearInterval(heartbeatInterval);
disconnect();
});
});
// Reconnect on token change
watch(() => config.token, (newToken) => {
if (socket && newToken) {
disconnect();
connect();
}
});
return {
connectionStatus,
data,
error,
connect,
disconnect,
subscribe,
unsubscribe,
isConnected: computed(() => connectionStatus.value === 'connected')
};
}
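Both clients lean on Socket.IO's built-in reconnection (`reconnectionDelay`, `reconnectionDelayMax`, `reconnectionAttempts`). If you ever implement reconnection yourself over a raw WebSocket, the usual pattern those options encode is capped exponential backoff; a language-neutral sketch of the delay schedule:

```python
def reconnect_delay(attempt: int, base_ms: int = 1000, max_ms: int = 5000) -> int:
    """Delay before reconnect attempt N (0-based): base * 2^N, capped at max_ms.
    Defaults mirror the reconnectionDelay / reconnectionDelayMax values above."""
    return min(base_ms * (2 ** attempt), max_ms)

delays = [reconnect_delay(n) for n in range(5)]  # 1000, 2000, 4000, 5000, 5000
```

Socket.IO additionally applies a randomization factor to each delay, which spreads out reconnect storms when many clients drop at once.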
Stream Processing
Real-time Data Pipeline
- Apache Kafka + Node.js
- Redis Streams + Python
// stream-processor/kafka-processor.js
import { Kafka } from 'kafkajs';
import Bookovia from '@bookovia/javascript-sdk';
export class StreamProcessor {
constructor(config) {
this.kafka = new Kafka({
clientId: 'bookovia-stream-processor',
brokers: config.kafka.brokers
});
this.consumer = this.kafka.consumer({
groupId: 'bookovia-processors'
});
this.producer = this.kafka.producer();
this.bookovia = new Bookovia({ apiKey: config.bookoviaApiKey });
this.processors = new Map();
this.setupProcessors();
}
setupProcessors() {
// Location data processor
this.processors.set('location_data', new LocationProcessor());
// Safety event processor
this.processors.set('safety_events', new SafetyEventProcessor());
// Trip data processor
this.processors.set('trip_data', new TripProcessor());
// Geofence processor
this.processors.set('geofence_events', new GeofenceProcessor());
}
async start() {
await this.consumer.connect();
await this.producer.connect();
// Subscribe to Bookovia topics
await this.consumer.subscribe({
topics: [
'bookovia.locations',
'bookovia.safety-events',
'bookovia.trips',
'bookovia.geofences'
]
});
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
await this.processMessage(topic, message);
}
});
}
async processMessage(topic, kafkaMessage) {
try {
const messageData = JSON.parse(kafkaMessage.value.toString());
const processorKey = this.getProcessorKey(topic);
if (this.processors.has(processorKey)) {
const processor = this.processors.get(processorKey);
const processedData = await processor.process(messageData);
// Publish processed data to output topics
await this.publishProcessedData(processedData);
// Update real-time analytics
await this.updateAnalytics(processedData);
// Check for alerts
await this.checkAlerts(processedData);
}
} catch (error) {
console.error('Message processing error:', error);
}
}
async publishProcessedData(processedData) {
const messages = [];
// Publish to different topics based on data type
if (processedData.type === 'location_update') {
messages.push({
topic: 'processed.locations',
value: JSON.stringify(processedData)
});
}
if (processedData.type === 'safety_event') {
messages.push({
topic: 'processed.safety-events',
value: JSON.stringify(processedData)
});
// Also publish to alerts topic if critical
if (processedData.severity === 'critical') {
messages.push({
topic: 'alerts.critical',
value: JSON.stringify(processedData)
});
}
}
if (messages.length > 0) {
await this.producer.sendBatch({ topicMessages: messages });
}
}
async updateAnalytics(processedData) {
// Update real-time analytics aggregations
switch (processedData.type) {
case 'location_update':
await this.updateLocationAnalytics(processedData);
break;
case 'safety_event':
await this.updateSafetyAnalytics(processedData);
break;
case 'trip_update':
await this.updateTripAnalytics(processedData);
break;
}
}
async updateLocationAnalytics(locationData) {
const analytics = {
vehicle_id: locationData.vehicle_id,
timestamp: locationData.timestamp,
speed: locationData.speed,
distance_increment: locationData.distance_since_last,
fuel_consumption_rate: locationData.fuel_rate
};
// Update time-series database
await this.writeToTimeSeries('vehicle_analytics', analytics);
// Update Redis cache for real-time access
await this.updateRedisCache('vehicle_status', locationData.vehicle_id, {
last_location: locationData.location,
last_update: locationData.timestamp,
current_speed: locationData.speed
});
}
}
// Specialized processors
class LocationProcessor {
async process(locationData) {
// Validate location data
if (!this.isValidLocation(locationData)) {
throw new Error('Invalid location data');
}
// Calculate derived metrics
const enrichedData = {
...locationData,
distance_since_last: this.calculateDistance(locationData),
speed_change: this.calculateSpeedChange(locationData),
direction_change: this.calculateDirectionChange(locationData),
estimated_fuel_rate: this.estimateFuelConsumption(locationData)
};
// Detect anomalies
const anomalies = this.detectAnomalies(enrichedData);
if (anomalies.length > 0) {
enrichedData.anomalies = anomalies;
}
return {
type: 'location_update',
data: enrichedData,
processed_at: new Date().toISOString()
};
}
isValidLocation(data) {
return (
data.latitude >= -90 && data.latitude <= 90 &&
data.longitude >= -180 && data.longitude <= 180 &&
data.speed >= 0 && data.speed <= 300 // reasonable speed limit
);
}
detectAnomalies(locationData) {
const anomalies = [];
// Speed anomaly detection
if (locationData.speed > 120) { // > 120 km/h
anomalies.push({
type: 'high_speed',
severity: 'medium',
value: locationData.speed
});
}
// Sudden speed changes
if (Math.abs(locationData.speed_change) > 30) {
anomalies.push({
type: 'sudden_speed_change',
severity: 'high',
value: locationData.speed_change
});
}
return anomalies;
}
}
class SafetyEventProcessor {
async process(safetyData) {
// Enhance safety event with context
const enhancedEvent = {
...safetyData,
risk_score: this.calculateRiskScore(safetyData),
historical_context: await this.getHistoricalContext(safetyData),
weather_conditions: await this.getWeatherConditions(safetyData),
traffic_conditions: await this.getTrafficConditions(safetyData)
};
// Generate recommendations
enhancedEvent.recommendations = this.generateRecommendations(enhancedEvent);
return {
type: 'safety_event',
data: enhancedEvent,
processed_at: new Date().toISOString()
};
}
calculateRiskScore(safetyData) {
let riskScore = 0;
// Base risk by event type
const baseRiskMap = {
harsh_braking: 0.6,
harsh_acceleration: 0.5,
harsh_cornering: 0.7,
speeding: 0.4,
phone_usage: 0.8
};
riskScore += baseRiskMap[safetyData.type] || 0.3;
// Adjust for severity
const severityMultiplier = {
low: 0.5,
medium: 1.0,
high: 1.5,
critical: 2.0
};
riskScore *= severityMultiplier[safetyData.severity] || 1.0;
// Adjust for time of day (higher risk at night)
const eventHour = new Date(safetyData.timestamp).getHours();
if (eventHour >= 22 || eventHour <= 6) {
riskScore *= 1.3;
}
return Math.min(riskScore, 1.0); // Cap at 1.0
}
}
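`LocationProcessor.calculateDistance` is assumed above but never shown. The distance between two consecutive GPS fixes is typically computed with the haversine formula; a sketch (the lookup of the previous fix is elided, as in the processor itself):

```python
import math

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres between two GPS fixes."""
    r = 6371.0  # mean Earth radius, km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

# One degree of latitude is roughly 111 km.
d = haversine_km(52.0, 13.0, 53.0, 13.0)
```

At typical telemetry rates (one fix every few seconds) the haversine result summed per message is what feeds the `distance_increment` field used by the analytics aggregations below.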
# stream_processor/redis_processor.py
import asyncio
import json
from datetime import datetime

import redis.asyncio as redis
import bookovia

class RedisStreamProcessor:
    def __init__(self, config):
        self.redis = redis.from_url(config['redis_url'])
        self.bookovia = bookovia.Client(config['bookovia_api_key'])
        self.processors = {
            'location_data': LocationProcessor(),
            'safety_events': SafetyEventProcessor(),
            'trip_data': TripProcessor()
        }
        self.consumer_group = 'bookovia-processors'
        self.consumer_name = f'processor-{datetime.now().timestamp()}'

    async def start_processing(self):
        """Start processing streams from Redis"""
        # Create consumer groups if they don't exist
        await self.create_consumer_groups()

        # Start processing tasks
        tasks = [
            self.process_stream('bookovia:locations'),
            self.process_stream('bookovia:safety-events'),
            self.process_stream('bookovia:trips')
        ]
        await asyncio.gather(*tasks)

    async def create_consumer_groups(self):
        """Create consumer groups for each stream"""
        streams = ['bookovia:locations', 'bookovia:safety-events', 'bookovia:trips']
        for stream in streams:
            try:
                await self.redis.xgroup_create(
                    stream, self.consumer_group, id='0', mkstream=True
                )
            except redis.ResponseError as e:
                if "BUSYGROUP" not in str(e):
                    print(f"Error creating consumer group for {stream}: {e}")

    async def process_stream(self, stream_name: str):
        """Process messages from a specific Redis stream"""
        while True:
            try:
                # Read messages from stream
                messages = await self.redis.xreadgroup(
                    self.consumer_group,
                    self.consumer_name,
                    {stream_name: '>'},
                    count=10,
                    block=1000
                )
                for stream, stream_messages in messages:
                    for message_id, fields in stream_messages:
                        await self.process_message(
                            stream.decode(), message_id.decode(), fields
                        )
                        # Acknowledge each message after it is processed
                        await self.redis.xack(
                            stream_name, self.consumer_group, message_id
                        )
            except Exception as e:
                print(f"Error processing stream {stream_name}: {e}")
                await asyncio.sleep(1)

    async def process_message(self, stream_name: str, message_id: str, fields: dict):
        """Process individual message from stream"""
        try:
            # Decode message data
            message_data = json.loads(fields[b'data'].decode())

            # Determine processor type
            processor_type = self.get_processor_type(stream_name)
            if processor_type in self.processors:
                processor = self.processors[processor_type]
                processed_data = await processor.process(message_data)

                # Publish processed data
                await self.publish_processed_data(processed_data)

                # Update analytics
                await self.update_real_time_analytics(processed_data)

                # Check alerts
                await self.check_alert_conditions(processed_data)
        except Exception as e:
            print(f"Error processing message {message_id}: {e}")

    async def publish_processed_data(self, processed_data: dict):
        """Publish processed data to output streams"""
        output_stream = f"processed:{processed_data['type']}"
        await self.redis.xadd(
            output_stream,
            {
                'data': json.dumps(processed_data),
                'processed_at': datetime.now().isoformat()
            }
        )

        # Also publish to WebSocket clients via Redis pub/sub
        channel = f"websocket:{processed_data['type']}"
        await self.redis.publish(channel, json.dumps(processed_data))

    async def update_real_time_analytics(self, processed_data: dict):
        """Update real-time analytics aggregations"""
        if processed_data['type'] == 'location_update':
            await self.update_location_analytics(processed_data['data'])
        elif processed_data['type'] == 'safety_event':
            await self.update_safety_analytics(processed_data['data'])

    async def update_location_analytics(self, location_data: dict):
        """Update location-based analytics"""
        vehicle_id = location_data['vehicle_id']
        current_time = datetime.now()

        # Update current vehicle status
        await self.redis.hset(
            f"vehicle:status:{vehicle_id}",
            mapping={
                'last_latitude': location_data['latitude'],
                'last_longitude': location_data['longitude'],
                'last_speed': location_data['speed'],
                'last_update': current_time.isoformat()
            }
        )

        # Update hourly distance aggregation
        hour_key = current_time.strftime('%Y-%m-%d-%H')
        distance_key = f"analytics:distance:{vehicle_id}:{hour_key}"
        await self.redis.incrbyfloat(
            distance_key,
            location_data.get('distance_increment', 0)
        )
        await self.redis.expire(distance_key, 86400 * 7)  # Keep for 7 days

        # Update speed histogram
        speed_bucket = int(location_data['speed'] // 10) * 10
        speed_key = f"analytics:speed:{vehicle_id}:{hour_key}:{speed_bucket}"
        await self.redis.incr(speed_key)
        await self.redis.expire(speed_key, 86400 * 7)

    async def check_alert_conditions(self, processed_data: dict):
        """Check if processed data triggers any alert conditions"""
        if processed_data['type'] == 'safety_event':
            safety_data = processed_data['data']

            # Check for critical safety events
            if safety_data['severity'] == 'critical':
                await self.trigger_alert({
                    'type': 'critical_safety_event',
                    'vehicle_id': safety_data['vehicle_id'],
                    'driver_id': safety_data['driver_id'],
                    'event_type': safety_data['event_type'],
                    'location': safety_data['location'],
                    'timestamp': safety_data['timestamp']
                })

            # Check for multiple events in short time
            await self.check_event_frequency(safety_data)

    async def trigger_alert(self, alert_data: dict):
        """Trigger alert through various channels"""
        # Publish to alert stream
        await self.redis.xadd(
            'alerts:critical',
            {
                'data': json.dumps(alert_data),
                'triggered_at': datetime.now().isoformat()
            }
        )

        # Publish to WebSocket for immediate notification
        await self.redis.publish(
            'websocket:alert',
            json.dumps({
'type': 'alert',
'data': alert_data
})
)
# Could also integrate with external notification services
# await self.send_sms_alert(alert_data)
# await self.send_email_alert(alert_data)
class LocationProcessor:
def __init__(self, redis_client):
# Redis connection is injected so calculate_distance_increment can read the previous location
self.redis = redis_client
async def process(self, location_data: dict) -> dict:
"""Process location data and add derived metrics"""
# Validate location data
if not self.is_valid_location(location_data):
raise ValueError("Invalid location data")
# Calculate derived metrics
enriched_data = {
**location_data,
'distance_increment': await self.calculate_distance_increment(location_data),
'acceleration': await self.calculate_acceleration(location_data),
'heading_change': await self.calculate_heading_change(location_data),
'estimated_fuel_consumption': self.estimate_fuel_consumption(location_data)
}
# Add geospatial context
enriched_data['geospatial_context'] = await self.get_geospatial_context(
location_data['latitude'], location_data['longitude']
)
return {
'type': 'location_update',
'data': enriched_data,
'processing_timestamp': datetime.now().isoformat()
}
def is_valid_location(self, data: dict) -> bool:
"""Validate location data"""
return (
-90 <= data.get('latitude', 0) <= 90 and
-180 <= data.get('longitude', 0) <= 180 and
0 <= data.get('speed', 0) <= 300
)
async def calculate_distance_increment(self, location_data: dict) -> float:
"""Calculate distance traveled since last location update"""
vehicle_id = location_data['vehicle_id']
# Get previous location from Redis
prev_location = await self.redis.hgetall(f"vehicle:last_location:{vehicle_id}")
if not prev_location:
return 0.0
# Calculate distance using Haversine formula
distance = self.haversine_distance(
float(prev_location[b'latitude']),
float(prev_location[b'longitude']),
location_data['latitude'],
location_data['longitude']
)
# Store current location for next calculation
await self.redis.hset(
f"vehicle:last_location:{vehicle_id}",
mapping={
'latitude': location_data['latitude'],
'longitude': location_data['longitude'],
'timestamp': location_data['timestamp']
}
)
return distance
def haversine_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two points using Haversine formula"""
import math
R = 6371 # Earth's radius in kilometers
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(a))
return R * c
class SafetyEventProcessor:
async def process(self, safety_data: dict) -> dict:
"""Process safety event data"""
# Calculate enhanced risk score
risk_score = self.calculate_risk_score(safety_data)
# Get contextual information
context = await self.get_safety_context(safety_data)
# Generate recommendations
recommendations = self.generate_recommendations(safety_data, risk_score)
enhanced_data = {
**safety_data,
'enhanced_risk_score': risk_score,
'context': context,
'recommendations': recommendations,
'impact_assessment': self.assess_impact(safety_data, risk_score)
}
return {
'type': 'safety_event',
'data': enhanced_data,
'processing_timestamp': datetime.now().isoformat()
}
def calculate_risk_score(self, safety_data: dict) -> float:
"""Calculate enhanced risk score for safety event"""
base_scores = {
'harsh_braking': 0.7,
'harsh_acceleration': 0.6,
'harsh_cornering': 0.8,
'speeding': 0.5,
'phone_usage': 0.9
}
base_score = base_scores.get(safety_data['type'], 0.5)
# Adjust for magnitude
magnitude = safety_data.get('magnitude', 1.0)
magnitude_factor = min(magnitude / 5.0, 2.0) # Cap at 2x
# Adjust for time of day
hour = datetime.fromisoformat(safety_data['timestamp']).hour
time_factor = 1.3 if 22 <= hour or hour <= 6 else 1.0
# Adjust for weather conditions (if available)
weather_factor = safety_data.get('weather_impact', 1.0)
final_score = base_score * magnitude_factor * time_factor * weather_factor
return min(final_score, 1.0)
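The scoring logic above can be exercised on its own. This sketch mirrors the same weights to show how the factors combine; the event payload is illustrative, not a real Bookovia event:

```python
from datetime import datetime

BASE_SCORES = {
    'harsh_braking': 0.7,
    'harsh_acceleration': 0.6,
    'harsh_cornering': 0.8,
    'speeding': 0.5,
    'phone_usage': 0.9,
}

def calculate_risk_score(safety_data: dict) -> float:
    """Standalone mirror of SafetyEventProcessor.calculate_risk_score."""
    base_score = BASE_SCORES.get(safety_data['type'], 0.5)
    # Magnitude scales the base score, capped at 2x
    magnitude_factor = min(safety_data.get('magnitude', 1.0) / 5.0, 2.0)
    # Night driving (22:00-06:00) carries a 1.3x penalty
    hour = datetime.fromisoformat(safety_data['timestamp']).hour
    time_factor = 1.3 if 22 <= hour or hour <= 6 else 1.0
    weather_factor = safety_data.get('weather_impact', 1.0)
    return min(base_score * magnitude_factor * time_factor * weather_factor, 1.0)

# Harsh braking at 23:15 with magnitude 4.0:
# 0.7 * (4.0 / 5.0) * 1.3 * 1.0 = 0.728
score = calculate_risk_score({
    'type': 'harsh_braking',
    'magnitude': 4.0,
    'timestamp': '2024-06-01T23:15:00',
})
```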
Real-time Analytics
Live Aggregations and Metrics
- JavaScript + InfluxDB
// analytics/real-time-analytics.js
import { InfluxDB, Point } from '@influxdata/influxdb-client';
import { WebSocket } from 'ws';
export class RealTimeAnalytics {
constructor(config) {
this.influxDB = new InfluxDB({
url: config.influxdb.url,
token: config.influxdb.token
});
this.writeApi = this.influxDB.getWriteApi(
config.influxdb.org,
config.influxdb.bucket
);
this.queryApi = this.influxDB.getQueryApi(config.influxdb.org);
this.aggregators = new Map();
this.setupAggregators();
this.clientConnections = new Set();
}
setupAggregators() {
// Fleet-wide aggregators
this.aggregators.set('fleet_summary', new FleetSummaryAggregator());
this.aggregators.set('safety_metrics', new SafetyMetricsAggregator());
this.aggregators.set('efficiency_metrics', new EfficiencyAggregator());
// Vehicle-specific aggregators
this.aggregators.set('vehicle_performance', new VehiclePerformanceAggregator());
this.aggregators.set('driver_behavior', new DriverBehaviorAggregator());
}
async processLocationUpdate(locationData) {
// Write raw data to InfluxDB
const point = new Point('vehicle_location')
.tag('vehicle_id', locationData.vehicle_id)
.tag('driver_id', locationData.driver_id)
.floatField('latitude', locationData.latitude)
.floatField('longitude', locationData.longitude)
.floatField('speed', locationData.speed)
.floatField('heading', locationData.heading)
.timestamp(new Date(locationData.timestamp));
this.writeApi.writePoint(point);
// Update real-time aggregations
await this.updateAggregations('location', locationData);
// Broadcast to connected clients
await this.broadcastUpdate('location_metrics', await this.getCurrentMetrics());
}
async processSafetyEvent(safetyData) {
// Write safety event to InfluxDB
const point = new Point('safety_event')
.tag('vehicle_id', safetyData.vehicle_id)
.tag('driver_id', safetyData.driver_id)
.tag('event_type', safetyData.type)
.tag('severity', safetyData.severity)
.floatField('magnitude', safetyData.magnitude)
.floatField('risk_score', safetyData.risk_score)
.timestamp(new Date(safetyData.timestamp));
this.writeApi.writePoint(point);
// Update safety metrics
await this.updateAggregations('safety', safetyData);
// Broadcast safety update
await this.broadcastUpdate('safety_metrics', await this.getSafetyMetrics());
}
async updateAggregations(dataType, data) {
const updatePromises = [];
for (const [name, aggregator] of this.aggregators) {
if (aggregator.shouldProcess(dataType)) {
updatePromises.push(aggregator.process(data));
}
}
await Promise.all(updatePromises);
}
async getCurrentMetrics() {
const fleetSummary = this.aggregators.get('fleet_summary');
const safetyMetrics = this.aggregators.get('safety_metrics');
const efficiencyMetrics = this.aggregators.get('efficiency_metrics');
return {
fleet: await fleetSummary.getCurrentSummary(),
safety: await safetyMetrics.getCurrentMetrics(),
efficiency: await efficiencyMetrics.getCurrentMetrics(),
timestamp: new Date().toISOString()
};
}
async getSafetyMetrics() {
const safetyAggregator = this.aggregators.get('safety_metrics');
return await safetyAggregator.getCurrentMetrics();
}
async getVehicleMetrics(vehicleId, timeRange = '1h') {
const query = `
from(bucket: "bookovia")
|> range(start: -${timeRange})
|> filter(fn: (r) => r._measurement == "vehicle_location")
|> filter(fn: (r) => r.vehicle_id == "${vehicleId}")
|> aggregateWindow(every: 1m, fn: mean)
`;
const results = [];
const queryResult = this.queryApi.iterateRows(query);
for await (const { values, tableMeta } of queryResult) {
const row = tableMeta.toObject(values);
results.push(row);
}
return this.transformVehicleMetrics(results);
}
transformVehicleMetrics(rawData) {
const metrics = {
distance_traveled: 0,
average_speed: 0,
max_speed: 0,
idle_time: 0,
efficiency_score: 0
};
if (rawData.length === 0) return metrics;
// Calculate metrics from time-series data
let totalSpeed = 0;
let maxSpeed = 0;
let idleCount = 0;
for (let i = 0; i < rawData.length; i++) {
const point = rawData[i];
const speed = point.speed || 0;
totalSpeed += speed;
maxSpeed = Math.max(maxSpeed, speed);
if (speed < 5) idleCount++; // Consider < 5 km/h as idle
// Calculate distance between consecutive points
if (i > 0) {
const prevPoint = rawData[i - 1];
const distance = this.calculateDistance(
prevPoint.latitude, prevPoint.longitude,
point.latitude, point.longitude
);
metrics.distance_traveled += distance;
}
}
metrics.average_speed = totalSpeed / rawData.length;
metrics.max_speed = maxSpeed;
metrics.idle_time = (idleCount / rawData.length) * 100; // Percentage
metrics.efficiency_score = this.calculateEfficiencyScore(metrics);
return metrics;
}
async startRealtimeQueries() {
// Set up continuous queries for real-time dashboards
setInterval(async () => {
const currentMetrics = await this.getCurrentMetrics();
await this.broadcastUpdate('dashboard_update', currentMetrics);
}, 5000); // Update every 5 seconds
// Set up sliding window calculations
setInterval(async () => {
const slidingMetrics = await this.calculateSlidingWindowMetrics();
await this.broadcastUpdate('sliding_metrics', slidingMetrics);
}, 10000); // Update every 10 seconds
}
async calculateSlidingWindowMetrics() {
const timeWindows = ['5m', '15m', '1h'];
const metrics = {};
for (const window of timeWindows) {
metrics[window] = {
fleet_average_speed: await this.getFleetAverageSpeed(window),
total_distance: await this.getTotalDistance(window),
safety_events: await this.getSafetyEventCount(window),
active_vehicles: await this.getActiveVehicleCount(window)
};
}
return metrics;
}
async broadcastUpdate(type, data) {
const message = JSON.stringify({ type, data, timestamp: Date.now() });
this.clientConnections.forEach(ws => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
}
addClient(ws) {
this.clientConnections.add(ws);
ws.on('close', () => {
this.clientConnections.delete(ws);
});
}
}
// Specialized aggregators
class FleetSummaryAggregator {
constructor() {
this.summary = {
total_vehicles: 0,
active_vehicles: 0,
total_distance_today: 0,
average_speed: 0,
fuel_consumption: 0
};
this.vehicleStates = new Map();
this.lastUpdate = Date.now();
}
shouldProcess(dataType) {
return dataType === 'location' || dataType === 'trip';
}
async process(data) {
if (data.vehicle_id) {
// Update vehicle state
this.vehicleStates.set(data.vehicle_id, {
...data,
last_update: Date.now()
});
// Recalculate summary
await this.recalculateSummary();
}
}
async recalculateSummary() {
const now = Date.now();
const activeThreshold = 5 * 60 * 1000; // 5 minutes
let activeCount = 0;
let totalSpeed = 0;
let speedCount = 0;
for (const [vehicleId, state] of this.vehicleStates) {
if (now - state.last_update < activeThreshold) {
activeCount++;
if (state.speed > 0) {
totalSpeed += state.speed;
speedCount++;
}
}
}
this.summary.total_vehicles = this.vehicleStates.size;
this.summary.active_vehicles = activeCount;
this.summary.average_speed = speedCount > 0 ? totalSpeed / speedCount : 0;
}
async getCurrentSummary() {
return { ...this.summary };
}
}
class SafetyMetricsAggregator {
constructor() {
this.metrics = {
events_last_hour: 0,
events_today: 0,
average_safety_score: 85,
critical_events: 0,
trend: 'stable'
};
this.recentEvents = [];
this.hourlyBuckets = new Map();
}
shouldProcess(dataType) {
return dataType === 'safety';
}
async process(safetyData) {
const now = new Date();
const hour = now.getHours();
// Add to recent events
this.recentEvents.push({
...safetyData,
processed_at: now
});
// Keep only last 24 hours
const oneDayAgo = now.getTime() - 24 * 60 * 60 * 1000;
this.recentEvents = this.recentEvents.filter(
event => new Date(event.processed_at).getTime() > oneDayAgo
);
// Update hourly buckets
const hourKey = `${now.getDate()}-${hour}`;
if (!this.hourlyBuckets.has(hourKey)) {
this.hourlyBuckets.set(hourKey, []);
}
this.hourlyBuckets.get(hourKey).push(safetyData);
// Update metrics
await this.recalculateMetrics();
}
async recalculateMetrics() {
const now = new Date();
const oneHourAgo = now.getTime() - 60 * 60 * 1000;
// Events in last hour
this.metrics.events_last_hour = this.recentEvents.filter(
event => new Date(event.processed_at).getTime() > oneHourAgo
).length;
// Events today
this.metrics.events_today = this.recentEvents.length;
// Critical events
this.metrics.critical_events = this.recentEvents.filter(
event => event.severity === 'critical'
).length;
// Calculate trend
this.metrics.trend = this.calculateTrend();
}
calculateTrend() {
if (this.recentEvents.length < 10) return 'insufficient_data';
const recent = this.recentEvents.slice(-5);
const previous = this.recentEvents.slice(-10, -5);
const recentAvg = recent.reduce((sum, e) => sum + (e.risk_score || 0.5), 0) / recent.length;
const previousAvg = previous.reduce((sum, e) => sum + (e.risk_score || 0.5), 0) / previous.length;
const change = recentAvg - previousAvg;
if (change > 0.1) return 'deteriorating';
if (change < -0.1) return 'improving';
return 'stable';
}
async getCurrentMetrics() {
return { ...this.metrics };
}
}
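The trend heuristic in `calculateTrend` compares the mean risk score of the five most recent events against the five before them. The same check as a standalone sketch (the `threshold` parameter name is illustrative):

```python
def calculate_trend(recent_events: list, threshold: float = 0.1) -> str:
    """Standalone mirror of SafetyMetricsAggregator.calculateTrend."""
    if len(recent_events) < 10:
        return 'insufficient_data'
    recent = recent_events[-5:]
    previous = recent_events[-10:-5]
    # Missing risk scores default to a neutral 0.5, as in the JS version
    recent_avg = sum(e.get('risk_score', 0.5) for e in recent) / len(recent)
    previous_avg = sum(e.get('risk_score', 0.5) for e in previous) / len(previous)
    change = recent_avg - previous_avg
    if change > threshold:
        return 'deteriorating'
    if change < -threshold:
        return 'improving'
    return 'stable'

# Five low-risk events followed by five high-risk events: risk is rising
worsening = [{'risk_score': 0.3}] * 5 + [{'risk_score': 0.6}] * 5
trend = calculate_trend(worsening)
```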
Alert and Notification System
Multi-Channel Alert Management
- JavaScript Alert Engine
// alerts/alert-engine.js
import nodemailer from 'nodemailer';
import twilio from 'twilio';
import { WebClient } from '@slack/web-api';
export class AlertEngine {
constructor(config) {
this.config = config;
this.rules = new Map();
this.alertHistory = new Map();
this.suppressionRules = new Map();
// Initialize notification channels
this.emailTransporter = nodemailer.createTransport(config.email);
this.smsClient = twilio(config.twilio.accountSid, config.twilio.authToken);
this.slackClient = new WebClient(config.slack.token);
this.setupDefaultRules();
}
setupDefaultRules() {
// Critical safety events
this.addRule({
id: 'critical_safety_event',
name: 'Critical Safety Event',
condition: (data) => data.type === 'safety_event' && data.severity === 'critical',
channels: ['sms', 'email', 'slack', 'push'],
priority: 'urgent',
suppressionWindow: 300 // 5 minutes
});
// Vehicle breakdown
this.addRule({
id: 'vehicle_breakdown',
name: 'Vehicle Breakdown Detected',
condition: (data) => data.type === 'breakdown' ||
(data.type === 'location_update' && data.speed === 0 && data.engine_running === false),
channels: ['sms', 'email'],
priority: 'high',
suppressionWindow: 600 // 10 minutes
});
// Geofence violations
this.addRule({
id: 'geofence_violation',
name: 'Geofence Boundary Violation',
condition: (data) => data.type === 'geofence_event' && data.event === 'exit' && data.authorized === false,
channels: ['sms', 'push'],
priority: 'medium',
suppressionWindow: 180 // 3 minutes
});
// Multiple safety events
this.addRule({
id: 'multiple_safety_events',
name: 'Multiple Safety Events',
condition: (data) => this.checkMultipleEvents(data),
channels: ['email', 'slack'],
priority: 'medium',
suppressionWindow: 900 // 15 minutes
});
}
addRule(rule) {
this.rules.set(rule.id, {
...rule,
createdAt: new Date(),
triggerCount: 0,
lastTriggered: null
});
}
async processEvent(eventData) {
const triggeredRules = [];
for (const [ruleId, rule] of this.rules) {
try {
if (await this.evaluateRule(rule, eventData)) {
if (!this.isSuppressed(ruleId, eventData)) {
triggeredRules.push(rule);
await this.triggerAlert(rule, eventData);
}
}
} catch (error) {
console.error(`Error evaluating rule ${ruleId}:`, error);
}
}
return triggeredRules;
}
async evaluateRule(rule, eventData) {
try {
return await rule.condition(eventData);
} catch (error) {
console.error(`Rule evaluation error for ${rule.id}:`, error);
return false;
}
}
isSuppressed(ruleId, eventData) {
const rule = this.rules.get(ruleId);
if (!rule.lastTriggered) return false;
const suppressionKey = this.getSuppressionKey(ruleId, eventData);
const lastAlert = this.alertHistory.get(suppressionKey);
if (!lastAlert) return false;
const timeSinceLastAlert = Date.now() - lastAlert.timestamp;
return timeSinceLastAlert < (rule.suppressionWindow * 1000);
}
getSuppressionKey(ruleId, eventData) {
// Create a key that groups similar events for suppression
let key = ruleId;
if (eventData.vehicle_id) key += `:${eventData.vehicle_id}`;
if (eventData.driver_id) key += `:${eventData.driver_id}`;
if (eventData.location) key += `:${Math.floor(eventData.location.latitude * 100)}:${Math.floor(eventData.location.longitude * 100)}`;
return key;
}
async triggerAlert(rule, eventData) {
const alert = {
id: this.generateAlertId(),
ruleId: rule.id,
ruleName: rule.name,
priority: rule.priority,
eventData,
timestamp: Date.now(),
status: 'active',
channels: rule.channels,
attempts: {}
};
// Update rule statistics
rule.triggerCount++;
rule.lastTriggered = new Date();
// Store alert history for suppression
const suppressionKey = this.getSuppressionKey(rule.id, eventData);
this.alertHistory.set(suppressionKey, alert);
// Send notifications through all specified channels
for (const channel of rule.channels) {
try {
await this.sendNotification(channel, alert);
alert.attempts[channel] = { status: 'success', timestamp: Date.now() };
} catch (error) {
console.error(`Failed to send ${channel} notification:`, error);
alert.attempts[channel] = { status: 'failed', error: error.message, timestamp: Date.now() };
}
}
return alert;
}
async sendNotification(channel, alert) {
switch (channel) {
case 'email':
return await this.sendEmailAlert(alert);
case 'sms':
return await this.sendSMSAlert(alert);
case 'slack':
return await this.sendSlackAlert(alert);
case 'push':
return await this.sendPushNotification(alert);
case 'webhook':
return await this.sendWebhookAlert(alert);
default:
throw new Error(`Unknown notification channel: ${channel}`);
}
}
async sendEmailAlert(alert) {
const { eventData, ruleName, priority } = alert;
const emailContent = this.generateEmailContent(alert);
const mailOptions = {
from: this.config.email.from,
to: this.getEmailRecipients(alert),
subject: `[${priority.toUpperCase()}] ${ruleName}`,
html: emailContent.html,
text: emailContent.text
};
return await this.emailTransporter.sendMail(mailOptions);
}
generateEmailContent(alert) {
const { eventData, ruleName, priority } = alert;
const html = `
<div style="font-family: Arial, sans-serif; max-width: 600px;">
<div style="background: ${this.getPriorityColor(priority)}; color: white; padding: 20px; border-radius: 5px 5px 0 0;">
<h2 style="margin: 0;">${ruleName}</h2>
<p style="margin: 5px 0 0 0;">Priority: ${priority.toUpperCase()}</p>
</div>
<div style="padding: 20px; background: #f9f9f9; border: 1px solid #ddd; border-radius: 0 0 5px 5px;">
<h3>Event Details:</h3>
<ul>
${eventData.vehicle_id ? `<li><strong>Vehicle ID:</strong> ${eventData.vehicle_id}</li>` : ''}
${eventData.driver_id ? `<li><strong>Driver ID:</strong> ${eventData.driver_id}</li>` : ''}
${eventData.location ? `<li><strong>Location:</strong> ${eventData.location.latitude}, ${eventData.location.longitude}</li>` : ''}
${eventData.timestamp ? `<li><strong>Time:</strong> ${new Date(eventData.timestamp).toLocaleString()}</li>` : ''}
${eventData.type ? `<li><strong>Event Type:</strong> ${eventData.type}</li>` : ''}
${eventData.severity ? `<li><strong>Severity:</strong> ${eventData.severity}</li>` : ''}
</ul>
${eventData.description ? `<p><strong>Description:</strong> ${eventData.description}</p>` : ''}
<p style="margin-top: 20px;">
<a href="${this.config.dashboardUrl}/alerts/${alert.id}"
style="background: #007cba; color: white; padding: 10px 15px; text-decoration: none; border-radius: 3px;">
View in Dashboard
</a>
</p>
</div>
</div>
`;
const text = `
${ruleName} - ${priority.toUpperCase()}
Event Details:
${eventData.vehicle_id ? `Vehicle ID: ${eventData.vehicle_id}` : ''}
${eventData.driver_id ? `Driver ID: ${eventData.driver_id}` : ''}
${eventData.location ? `Location: ${eventData.location.latitude}, ${eventData.location.longitude}` : ''}
${eventData.timestamp ? `Time: ${new Date(eventData.timestamp).toLocaleString()}` : ''}
View details: ${this.config.dashboardUrl}/alerts/${alert.id}
`;
return { html, text };
}
async sendSMSAlert(alert) {
const { eventData, ruleName, priority } = alert;
let message = `[${priority.toUpperCase()}] ${ruleName}`;
if (eventData.vehicle_id) message += `\nVehicle: ${eventData.vehicle_id}`;
if (eventData.driver_id) message += `\nDriver: ${eventData.driver_id}`;
if (eventData.location) {
message += `\nLocation: ${eventData.location.latitude}, ${eventData.location.longitude}`;
}
message += `\nView: ${this.config.dashboardUrl}/alerts/${alert.id}`;
const recipients = this.getSMSRecipients(alert);
const promises = recipients.map(number =>
this.smsClient.messages.create({
body: message,
from: this.config.twilio.fromNumber,
to: number
})
);
return await Promise.all(promises);
}
async sendSlackAlert(alert) {
const { eventData, ruleName, priority } = alert;
const color = this.getPriorityColor(priority);
const attachment = {
color: color,
title: ruleName,
title_link: `${this.config.dashboardUrl}/alerts/${alert.id}`,
fields: [
{
title: 'Priority',
value: priority.toUpperCase(),
short: true
}
],
footer: 'Bookovia Fleet Management',
ts: Math.floor(Date.now() / 1000)
};
if (eventData.vehicle_id) {
attachment.fields.push({
title: 'Vehicle ID',
value: eventData.vehicle_id,
short: true
});
}
if (eventData.driver_id) {
attachment.fields.push({
title: 'Driver ID',
value: eventData.driver_id,
short: true
});
}
if (eventData.location) {
attachment.fields.push({
title: 'Location',
value: `${eventData.location.latitude}, ${eventData.location.longitude}`,
short: true
});
}
return await this.slackClient.chat.postMessage({
channel: this.config.slack.channel,
text: `Fleet Alert: ${ruleName}`,
attachments: [attachment]
});
}
async sendPushNotification(alert) {
// Implementation would integrate with push notification service
// (Firebase, Apple Push, etc.)
const pushData = {
title: alert.ruleName,
body: this.generatePushMessage(alert),
data: {
alertId: alert.id,
priority: alert.priority,
type: 'fleet_alert'
}
};
// Send to registered devices
// return await this.pushService.sendToAll(pushData);
}
checkMultipleEvents(eventData) {
if (eventData.type !== 'safety_event') return false;
const vehicleId = eventData.vehicle_id;
const driverId = eventData.driver_id;
const timeWindow = 10 * 60 * 1000; // 10 minutes
const now = Date.now();
// Check recent events for same vehicle/driver
let eventCount = 0;
for (const [key, alert] of this.alertHistory) {
if (alert.timestamp > now - timeWindow) {
const alertEventData = alert.eventData;
if (alertEventData.type === 'safety_event' &&
(alertEventData.vehicle_id === vehicleId || alertEventData.driver_id === driverId)) {
eventCount++;
}
}
}
return eventCount >= 2; // 3 or more events (including current)
}
getPriorityColor(priority) {
const colors = {
urgent: '#ff0000',
high: '#ff6600',
medium: '#ffaa00',
low: '#00aa00'
};
return colors[priority] || '#666666';
}
getEmailRecipients(alert) {
// Get recipients based on alert priority and type
let recipients = this.config.notifications.email.default;
if (alert.priority === 'urgent') {
recipients = [...recipients, ...this.config.notifications.email.urgent];
}
return [...new Set(recipients)]; // Remove duplicates
}
getSMSRecipients(alert) {
let recipients = this.config.notifications.sms.default;
if (alert.priority === 'urgent') {
recipients = [...recipients, ...this.config.notifications.sms.urgent];
}
return [...new Set(recipients)];
}
generateAlertId() {
return `alert_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
}
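The suppression behaviour above reduces to a timestamp comparison per suppression key. A language-agnostic sketch of the same check (the class and method names are illustrative):

```python
import time

class AlertSuppressor:
    """Drop repeat alerts for the same key inside a suppression window."""

    def __init__(self, window_seconds: float, clock=time.monotonic):
        self.window_seconds = window_seconds
        self.clock = clock          # injectable so tests can use a fake clock
        self.last_sent = {}         # suppression key -> timestamp of last alert

    def should_send(self, key: str) -> bool:
        now = self.clock()
        last = self.last_sent.get(key)
        if last is not None and now - last < self.window_seconds:
            return False            # still inside the window: suppress
        self.last_sent[key] = now
        return True

now = [0.0]
suppressor = AlertSuppressor(window_seconds=300, clock=lambda: now[0])
first = suppressor.should_send('critical_safety_event:veh_1')
repeat = suppressor.should_send('critical_safety_event:veh_1')   # suppressed
now[0] = 301.0                                                   # window elapsed
after_window = suppressor.should_send('critical_safety_event:veh_1')
```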
Scalability and Performance
High-Performance Architecture
- Load Balancing Configuration
- Monitoring and Observability
# nginx.conf for WebSocket load balancing
upstream websocket_backend {
# Enable sticky sessions for WebSocket connections
ip_hash;
server 10.0.1.10:3000 max_fails=3 fail_timeout=30s;
server 10.0.1.11:3000 max_fails=3 fail_timeout=30s;
server 10.0.1.12:3000 max_fails=3 fail_timeout=30s;
server 10.0.1.13:3000 max_fails=3 fail_timeout=30s;
# Health checks
keepalive 32;
}
server {
listen 443 ssl http2;
server_name realtime.bookovia.com;
# SSL configuration
ssl_certificate /etc/ssl/certs/realtime.bookovia.com.crt;
ssl_certificate_key /etc/ssl/private/realtime.bookovia.com.key;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;
# WebSocket upgrade headers
location /ws {
# Apply the WebSocket rate-limit zone declared at the bottom of this file
limit_req zone=websocket burst=20 nodelay;
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket specific timeouts
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
# Disable proxy buffering for real-time streaming
proxy_buffering off;
}
# REST API endpoints
location /api {
# Apply the API rate-limit zone declared at the bottom of this file
limit_req zone=api burst=50 nodelay;
proxy_pass http://websocket_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Enable compression
gzip on;
gzip_types application/json text/plain text/css application/javascript;
}
# Health check endpoint
location /health {
access_log off;
return 200 "healthy\n";
add_header Content-Type text/plain;
}
}
# Rate limiting
limit_req_zone $binary_remote_addr zone=api:10m rate=100r/s;
limit_req_zone $binary_remote_addr zone=websocket:10m rate=10r/s;
server {
listen 80;
server_name realtime.bookovia.com;
return 301 https://$server_name$request_uri;
}
# docker-compose.monitoring.yml
version: '3.8'
services:
# Prometheus for metrics collection
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--web.enable-lifecycle'
- '--storage.tsdb.retention.time=30d'
# Grafana for visualization
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/datasources:/etc/grafana/provisioning/datasources
# Jaeger for distributed tracing
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268:14268"
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
# ElasticSearch for logs
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.7.0
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
# Kibana for log visualization
kibana:
image: docker.elastic.co/kibana/kibana:8.7.0
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
# Logstash for log processing
logstash:
image: docker.elastic.co/logstash/logstash:8.7.0
volumes:
- ./logstash/config:/usr/share/logstash/pipeline
depends_on:
- elasticsearch
volumes:
prometheus_data:
grafana_data:
elasticsearch_data:
// monitoring/metrics.js
import client from 'prom-client';
import express from 'express';
// Create metrics registry
const register = new client.Registry();
// Add default metrics
client.collectDefaultMetrics({ register });
// Custom metrics for real-time streaming
const websocketConnections = new client.Gauge({
name: 'websocket_connections_active',
help: 'Number of currently open WebSocket connections',
labelNames: ['organization', 'client_type'],
registers: [register]
});
const messagesProcessed = new client.Counter({
name: 'messages_processed_total',
help: 'Total number of messages processed',
labelNames: ['message_type', 'status'],
registers: [register]
});
const messageProcessingDuration = new client.Histogram({
name: 'message_processing_duration_seconds',
help: 'Time spent processing messages',
labelNames: ['message_type'],
buckets: [0.001, 0.01, 0.1, 1, 5],
registers: [register]
});
const alertsTriggered = new client.Counter({
name: 'alerts_triggered_total',
help: 'Total number of alerts triggered',
labelNames: ['rule_id', 'priority', 'channel'],
registers: [register]
});
const dataLatency = new client.Histogram({
name: 'data_latency_seconds',
help: 'Latency between data generation and processing',
labelNames: ['data_type'],
buckets: [0.1, 0.5, 1, 2, 5, 10],
registers: [register]
});
// Health check metrics
const healthStatus = new client.Gauge({
name: 'service_health_status',
help: 'Service health status (1 = healthy, 0 = unhealthy)',
labelNames: ['service_name'],
registers: [register]
});
export class MetricsCollector {
constructor() {
this.register = register;
this.metrics = {
websocketConnections,
messagesProcessed,
messageProcessingDuration,
alertsTriggered,
dataLatency,
healthStatus
};
}
recordWebSocketConnection(organization, clientType, action = 'connect') {
if (action === 'connect') {
this.metrics.websocketConnections.inc({ organization, client_type: clientType });
} else {
this.metrics.websocketConnections.dec({ organization, client_type: clientType });
}
}
recordMessageProcessed(messageType, status, processingTime) {
this.metrics.messagesProcessed.inc({ message_type: messageType, status });
this.metrics.messageProcessingDuration
.labels({ message_type: messageType })
.observe(processingTime);
}
recordDataLatency(dataType, latencySeconds) {
this.metrics.dataLatency
.labels({ data_type: dataType })
.observe(latencySeconds);
}
recordAlertTriggered(ruleId, priority, channel) {
this.metrics.alertsTriggered.inc({
rule_id: ruleId,
priority,
channel
});
}
setHealthStatus(serviceName, isHealthy) {
this.metrics.healthStatus.set(
{ service_name: serviceName },
isHealthy ? 1 : 0
);
}
getMetricsHandler() {
return async (req, res) => {
try {
res.set('Content-Type', this.register.contentType);
res.end(await this.register.metrics());
} catch (error) {
res.status(500).end(error.toString());
}
};
}
}
// Usage in application
import express from 'express';

export const metricsCollector = new MetricsCollector();
// Express endpoint for Prometheus scraping
const app = express();
app.get('/metrics', metricsCollector.getMetricsHandler());
app.listen(9464, () => {
console.log('Metrics server listening on port 9464');
});
Best Practices
Performance Optimization
WebSocket Optimization
- Connection Pooling - Limit concurrent connections per client
- Message Batching - Batch multiple small updates into single messages
- Compression - Use WebSocket per-message deflate extension
- Heartbeat Management - Implement proper ping/pong for connection health
- Graceful Degradation - Fall back to polling when WebSocket fails
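Message batching deserves a closer look, since it often yields the largest win for high-frequency telemetry. One way to sketch it (class and option names here are illustrative, not part of the Bookovia SDK) is a small buffer that flushes either when it reaches a size limit or after a short wait:

```javascript
// Illustrative sketch: batch small updates into a single WebSocket send.
// flushFn receives the accumulated batch (e.g. socket.emit('batch', batch)).
class MessageBatcher {
  constructor(flushFn, { maxSize = 50, maxWaitMs = 100 } = {}) {
    this.flushFn = flushFn;
    this.maxSize = maxSize;
    this.maxWaitMs = maxWaitMs;
    this.queue = [];
    this.timer = null;
  }

  add(message) {
    this.queue.push(message);
    if (this.queue.length >= this.maxSize) {
      // Size threshold reached: flush immediately
      this.flush();
    } else if (!this.timer) {
      // Otherwise flush after the maximum wait, so latency stays bounded
      this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
    }
  }

  flush() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.queue.length === 0) return;
    const batch = this.queue;
    this.queue = [];
    this.flushFn(batch);
  }
}
```

Tuning `maxSize` and `maxWaitMs` trades latency against throughput: larger batches reduce per-message framing overhead, while a shorter wait keeps dashboards feeling live.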
Stream Processing
- Horizontal Scaling - Use message queues for parallel processing
- Backpressure Handling - Implement flow control for high-volume streams
- State Management - Use distributed state stores for multi-instance deployments
- Error Recovery - Implement retry logic with exponential backoff
- Data Partitioning - Partition streams by vehicle/organization for scaling
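The error-recovery point above can be sketched as a generic retry wrapper with exponential backoff and jitter (the delay parameters are assumptions you would tune for your stream):

```javascript
// Sketch: retry an async operation with exponential backoff and jitter.
async function withRetry(fn, { maxAttempts = 5, baseDelayMs = 100, maxDelayMs = 5000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Double the delay each attempt, capped at maxDelayMs
      const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
      // Add up to 20% jitter so many clients don't retry in lockstep
      const jitter = Math.random() * delay * 0.2;
      await new Promise((resolve) => setTimeout(resolve, delay + jitter));
    }
  }
  throw lastError;
}
```

Wrapping stream consumers (for example, a message-queue handler) in `withRetry` keeps transient broker or network failures from dropping telemetry, while the cap and jitter prevent retry storms.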
Data Management
- Time-Series Optimization - Use appropriate retention policies
- Indexing Strategy - Index frequently queried fields
- Aggregation Pipeline - Pre-compute common aggregations
- Caching Layer - Cache frequently accessed data in Redis
- Archive Strategy - Move old data to cheaper storage
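The caching-layer idea can be illustrated with a minimal in-memory TTL cache. In production Redis would back this (e.g. `SET key value EX ttl`), but the same expiry logic applies; the class and key names are illustrative:

```javascript
// Illustrative in-memory cache with per-entry time-to-live,
// standing in for a Redis-backed caching layer.
class TtlCache {
  constructor(defaultTtlMs = 30000) {
    this.defaultTtlMs = defaultTtlMs;
    this.store = new Map();
  }

  set(key, value, ttlMs = this.defaultTtlMs) {
    this.store.set(key, { value, expiresAt: Date.now() + ttlMs });
  }

  get(key) {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (Date.now() >= entry.expiresAt) {
      // Lazily evict expired entries on read
      this.store.delete(key);
      return undefined;
    }
    return entry.value;
  }
}
```

Short TTLs (seconds) suit live vehicle state, where staleness matters more than cache hit rate; longer TTLs suit slow-changing reference data like vehicle metadata.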
Security Considerations
Authentication & Authorization
- Token Validation - Validate JWT tokens on every WebSocket message
- Permission Checking - Verify user permissions for each subscription
- Rate Limiting - Implement per-user rate limiting
- IP Allowlisting - Restrict connections to known IP ranges
- Audit Logging - Log all authentication attempts and failures
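Per-user rate limiting is commonly implemented as a token bucket. A minimal sketch (capacity and refill rate are assumptions, not Bookovia defaults) that could gate each incoming WebSocket message:

```javascript
// Sketch: per-user token bucket rate limiter.
// Each user gets `capacity` tokens; tokens refill continuously over time.
class RateLimiter {
  constructor({ capacity = 10, refillPerSecond = 5 } = {}) {
    this.capacity = capacity;
    this.refillPerSecond = refillPerSecond;
    this.buckets = new Map();
  }

  allow(userId, now = Date.now()) {
    let bucket = this.buckets.get(userId);
    if (!bucket) {
      bucket = { tokens: this.capacity, lastRefill: now };
      this.buckets.set(userId, bucket);
    }
    // Refill tokens proportionally to elapsed time, capped at capacity
    const elapsedSeconds = (now - bucket.lastRefill) / 1000;
    bucket.tokens = Math.min(this.capacity, bucket.tokens + elapsedSeconds * this.refillPerSecond);
    bucket.lastRefill = now;
    if (bucket.tokens >= 1) {
      bucket.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

Calling `limiter.allow(userId)` before processing each message lets bursts through up to the bucket capacity while holding sustained traffic to the refill rate; rejected messages can be dropped or answered with a throttle error.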
Data Protection
- Message Encryption - Encrypt sensitive data in transit
- Field-Level Security - Redact sensitive fields based on permissions
- Data Masking - Mask PII data in logs and metrics
- Secure Headers - Implement proper security headers
- Input Validation - Validate all incoming data strictly
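Field-level security can be sketched as a redaction pass over each outgoing record. The field names and permission strings below are hypothetical, chosen only to illustrate the mapping:

```javascript
// Sketch: redact fields the user lacks permission to see.
// Field-to-permission mapping is illustrative, not a Bookovia schema.
const FIELD_PERMISSIONS = {
  driverName: 'fleet:read_pii',
  driverPhone: 'fleet:read_pii',
  location: 'fleet:read_location'
};

function redactFields(record, userPermissions) {
  const result = {};
  for (const [field, value] of Object.entries(record)) {
    const required = FIELD_PERMISSIONS[field];
    if (required && !userPermissions.includes(required)) {
      result[field] = '[REDACTED]';
    } else {
      result[field] = value;
    }
  }
  return result;
}
```

Applying this in the server-side broadcast path, rather than in the client, ensures sensitive values never leave the backend for under-privileged subscribers; the same mapping can drive PII masking in logs and metrics.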
Next Steps
Mobile Integration
Integrate real-time streaming in mobile applications
Web Dashboard
Build real-time dashboards with live data updates
Fleet Management
Implement comprehensive fleet management with real-time features
API Reference
Explore the complete streaming API documentation
Ready to implement real-time streaming? Start with our quickstart guide to set up your development environment and establish your first WebSocket connection.