Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.bookovia.com/llms.txt

Use this file to discover all available pages before exploring further.

Real-time Streaming Integration Guide

Build high-performance real-time streaming applications with Bookovia’s telematics platform. This guide covers WebSocket implementation, stream processing, real-time analytics, and scalable architecture patterns for live fleet monitoring and decision-making systems.

Architecture Overview

Real-time System Components

A complete real-time streaming solution includes:
  • Data Ingestion - Vehicle sensors, GPS, and telemetry streams
  • Stream Processing - Real-time data transformation and analysis
  • Event Detection - Safety alerts, geofencing, and anomaly detection
  • WebSocket Management - Client connection handling and load balancing
  • State Management - Live data synchronization across clients
  • Notification System - Push notifications and alert delivery
  • Analytics Engine - Real-time metrics and dashboards
  • Data Storage - Time-series data and event history

System Architecture

WebSocket Implementation

Server-Side WebSocket Management

// server/websocket-server.js
import { Server as SocketIOServer } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import Bookovia from '@bookovia/javascript-sdk';

export class RealtimeServer {
  constructor(server, config) {
    this.io = new SocketIOServer(server, {
      cors: {
        origin: config.allowedOrigins,
        methods: ["GET", "POST"]
      },
      transports: ['websocket', 'polling']
    });
    
    this.bookovia = new Bookovia({ apiKey: config.bookoviaApiKey });
    this.redis = createClient(config.redis);
    this.connections = new Map();
    this.subscriptions = new Map();
    
    this.setupRedisAdapter();
    this.setupEventHandlers();
    this.initializeBookoviaStream();
  }
  
  setupRedisAdapter() {
    // Enable horizontal scaling with Redis adapter
    const pubClient = this.redis.duplicate();
    const subClient = this.redis.duplicate();
    
    this.io.adapter(createAdapter(pubClient, subClient));
  }
  
  setupEventHandlers() {
    this.io.use(this.authenticateSocket.bind(this));
    
    this.io.on('connection', (socket) => {
      console.log(`Client connected: ${socket.id}`);
      
      // Store connection metadata
      this.connections.set(socket.id, {
        userId: socket.userId,
        organizationId: socket.organizationId,
        connectedAt: new Date(),
        lastActivity: new Date()
      });
      
      // Handle client events
      socket.on('subscribe', this.handleSubscription.bind(this, socket));
      socket.on('unsubscribe', this.handleUnsubscription.bind(this, socket));
      socket.on('heartbeat', this.handleHeartbeat.bind(this, socket));
      
      socket.on('disconnect', () => {
        this.handleDisconnection(socket);
      });
      
      // Send initial data
      this.sendInitialData(socket);
    });
  }
  
  async authenticateSocket(socket, next) {
    try {
      const token = socket.handshake.auth.token;
      const user = await this.validateToken(token);
      
      socket.userId = user.id;
      socket.organizationId = user.organizationId;
      socket.permissions = user.permissions;
      
      next();
    } catch (error) {
      next(new Error('Authentication failed'));
    }
  }
  
  async handleSubscription(socket, subscriptionData) {
    const { channels, filters = {} } = subscriptionData;
    
    // Validate subscription permissions
    if (!this.validateSubscriptionPermissions(socket, channels)) {
      socket.emit('error', { message: 'Insufficient permissions for subscription' });
      return;
    }
    
    // Store subscription
    const subscriptionKey = `${socket.id}:${channels.join(':')}`;
    this.subscriptions.set(subscriptionKey, {
      socketId: socket.id,
      channels,
      filters,
      subscribedAt: new Date()
    });
    
    // Join Socket.IO rooms for efficient broadcasting
    channels.forEach(channel => {
      const roomName = this.generateRoomName(channel, filters);
      socket.join(roomName);
    });
    
    // Subscribe to Bookovia streams
    await this.subscribeToBookoviaChannels(channels, filters);
    
    socket.emit('subscription_confirmed', { channels, filters });
  }
  
  async initializeBookoviaStream() {
    try {
      this.bookoviaStream = await this.bookovia.streaming.connect();
      
      this.bookoviaStream.onMessage((message) => {
        this.processStreamMessage(message);
      });
      
      this.bookoviaStream.onError((error) => {
        console.error('Bookovia stream error:', error);
        this.handleStreamError(error);
      });
      
      this.bookoviaStream.onReconnect(() => {
        console.log('Bookovia stream reconnected');
        this.resubscribeToAllChannels();
      });
      
    } catch (error) {
      console.error('Failed to initialize Bookovia stream:', error);
    }
  }
  
  processStreamMessage(message) {
    const { type, data } = message;
    
    // Apply business logic and filters
    const processedData = this.applyBusinessLogic(message);
    
    // Determine which clients should receive this message
    const targetRooms = this.determineTargetRooms(message);
    
    // Broadcast to relevant clients
    targetRooms.forEach(room => {
      this.io.to(room).emit(type, processedData);
    });
    
    // Store in cache for late-joining clients
    this.updateCache(message);
    
    // Trigger any alert rules
    this.checkAlertRules(message);
  }
  
  applyBusinessLogic(message) {
    switch (message.type) {
      case 'location_update':
        return this.enrichLocationData(message.data);
      
      case 'safety_event':
        return this.processSafetyEvent(message.data);
      
      case 'trip_update':
        return this.processTripUpdate(message.data);
      
      default:
        return message.data;
    }
  }
  
  enrichLocationData(locationData) {
    return {
      ...locationData,
      timestamp_server: new Date().toISOString(),
      processed_at: Date.now(),
      // Add reverse geocoding if needed
      address: this.getCachedAddress(locationData.latitude, locationData.longitude),
      // Add geofence status
      geofence_status: this.checkGeofences(locationData)
    };
  }
  
  processSafetyEvent(safetyData) {
    // Calculate severity based on business rules
    const enhancedSeverity = this.calculateEnhancedSeverity(safetyData);
    
    // Add contextual information
    const context = this.getSafetyContext(safetyData);
    
    return {
      ...safetyData,
      enhanced_severity: enhancedSeverity,
      context,
      alert_level: this.determineAlertLevel(enhancedSeverity),
      recommended_actions: this.getRecommendedActions(safetyData)
    };
  }
  
  checkAlertRules(message) {
    const alertRules = this.getAlertRules(message.type);
    
    alertRules.forEach(rule => {
      if (this.evaluateRule(rule, message)) {
        this.triggerAlert(rule, message);
      }
    });
  }
  
  async sendInitialData(socket) {
    const connection = this.connections.get(socket.id);
    
    try {
      // Send cached recent data
      const cachedData = await this.getCachedData(connection.organizationId);
      socket.emit('initial_data', cachedData);
      
      // Send current fleet status
      const fleetStatus = await this.bookovia.fleet.getOverview({
        organization_id: connection.organizationId
      });
      socket.emit('fleet_status', fleetStatus);
      
    } catch (error) {
      console.error('Failed to send initial data:', error);
      socket.emit('error', { message: 'Failed to load initial data' });
    }
  }
}

// Usage
const server = new RealtimeServer(httpServer, {
  allowedOrigins: ['http://localhost:3000', 'https://dashboard.company.com'],
  bookoviaApiKey: process.env.BOOKOVIA_API_KEY,
  redis: {
    host: 'localhost',
    port: 6379
  }
});

Client-Side WebSocket Management

// hooks/useRealtimeConnection.js
import { useState, useEffect, useRef, useCallback } from 'react';
import io from 'socket.io-client';

export const useRealtimeConnection = (config) => {
  const [connectionStatus, setConnectionStatus] = useState('disconnected');
  const [data, setData] = useState({});
  const [error, setError] = useState(null);
  const socketRef = useRef(null);
  const reconnectAttemptsRef = useRef(0);
  const subscriptionsRef = useRef(new Set());
  
  const connect = useCallback(() => {
    if (socketRef.current?.connected) return;
    
    setConnectionStatus('connecting');
    
    socketRef.current = io(config.url, {
      auth: {
        token: config.token
      },
      transports: ['websocket'],
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000,
      maxReconnectionAttempts: 10
    });
    
    const socket = socketRef.current;
    
    socket.on('connect', () => {
      console.log('Connected to real-time server');
      setConnectionStatus('connected');
      setError(null);
      reconnectAttemptsRef.current = 0;
      
      // Resubscribe to all channels
      resubscribeAll();
    });
    
    socket.on('disconnect', (reason) => {
      console.log('Disconnected from real-time server:', reason);
      setConnectionStatus('disconnected');
    });
    
    socket.on('connect_error', (err) => {
      console.error('Connection error:', err);
      setConnectionStatus('error');
      setError(err.message);
      reconnectAttemptsRef.current++;
    });
    
    // Handle different message types
    socket.on('location_update', (locationData) => {
      setData(prev => ({
        ...prev,
        locations: {
          ...prev.locations,
          [locationData.vehicle_id]: locationData
        }
      }));
    });
    
    socket.on('safety_event', (safetyEvent) => {
      setData(prev => ({
        ...prev,
        safetyEvents: [...(prev.safetyEvents || []), safetyEvent].slice(-100) // Keep last 100
      }));
    });
    
    socket.on('trip_update', (tripUpdate) => {
      setData(prev => ({
        ...prev,
        trips: {
          ...prev.trips,
          [tripUpdate.trip_id]: tripUpdate
        }
      }));
    });
    
    socket.on('fleet_status', (fleetStatus) => {
      setData(prev => ({
        ...prev,
        fleetStatus
      }));
    });
    
    socket.on('initial_data', (initialData) => {
      setData(initialData);
    });
    
    socket.on('error', (errorData) => {
      console.error('Server error:', errorData);
      setError(errorData.message);
    });
    
    socket.on('subscription_confirmed', (confirmation) => {
      console.log('Subscription confirmed:', confirmation);
    });
    
  }, [config.url, config.token]);
  
  const disconnect = useCallback(() => {
    if (socketRef.current) {
      socketRef.current.disconnect();
      socketRef.current = null;
      setConnectionStatus('disconnected');
      subscriptionsRef.current.clear();
    }
  }, []);
  
  const subscribe = useCallback((channels, filters = {}) => {
    if (!socketRef.current?.connected) {
      console.warn('Cannot subscribe: not connected');
      return;
    }
    
    const subscriptionKey = JSON.stringify({ channels, filters });
    subscriptionsRef.current.add(subscriptionKey);
    
    socketRef.current.emit('subscribe', { channels, filters });
  }, []);
  
  const unsubscribe = useCallback((channels, filters = {}) => {
    if (!socketRef.current?.connected) return;
    
    const subscriptionKey = JSON.stringify({ channels, filters });
    subscriptionsRef.current.delete(subscriptionKey);
    
    socketRef.current.emit('unsubscribe', { channels, filters });
  }, []);
  
  const resubscribeAll = useCallback(() => {
    subscriptionsRef.current.forEach(subscriptionKey => {
      const { channels, filters } = JSON.parse(subscriptionKey);
      socketRef.current.emit('subscribe', { channels, filters });
    });
  }, []);
  
  const sendHeartbeat = useCallback(() => {
    if (socketRef.current?.connected) {
      socketRef.current.emit('heartbeat', { timestamp: Date.now() });
    }
  }, []);
  
  // Auto-connect on mount
  useEffect(() => {
    connect();
    
    return () => {
      disconnect();
    };
  }, [connect, disconnect]);
  
  // Heartbeat interval
  useEffect(() => {
    if (connectionStatus === 'connected') {
      const heartbeatInterval = setInterval(sendHeartbeat, 30000); // 30 seconds
      return () => clearInterval(heartbeatInterval);
    }
  }, [connectionStatus, sendHeartbeat]);
  
  // Reconnect on token change
  useEffect(() => {
    if (socketRef.current && config.token) {
      disconnect();
      connect();
    }
  }, [config.token, connect, disconnect]);
  
  return {
    connectionStatus,
    data,
    error,
    connect,
    disconnect,
    subscribe,
    unsubscribe,
    isConnected: connectionStatus === 'connected'
  };
};

// Usage in component
const FleetDashboard = () => {
  const { data, connectionStatus, subscribe, isConnected } = useRealtimeConnection({
    url: 'ws://localhost:3000',
    token: localStorage.getItem('authToken')
  });
  
  useEffect(() => {
    if (isConnected) {
      subscribe(['locations', 'safety_events'], {
        organization_id: 'org_123'
      });
    }
  }, [isConnected, subscribe]);
  
  return (
    <div>
      <div>Status: {connectionStatus}</div>
      <div>Vehicles: {Object.keys(data.locations || {}).length}</div>
      <div>Safety Events: {(data.safetyEvents || []).length}</div>
    </div>
  );
};

Stream Processing

Real-time Data Pipeline

// stream-processor/kafka-processor.js
import { Kafka } from 'kafkajs';
import Bookovia from '@bookovia/javascript-sdk';

export class StreamProcessor {
  constructor(config) {
    this.kafka = Kafka({
      clientId: 'bookovia-stream-processor',
      brokers: config.kafka.brokers
    });
    
    this.consumer = this.kafka.consumer({
      groupId: 'bookovia-processors'
    });
    
    this.producer = this.kafka.producer();
    
    this.bookovia = new Bookovia({ apiKey: config.bookoviaApiKey });
    this.processors = new Map();
    
    this.setupProcessors();
  }
  
  setupProcessors() {
    // Location data processor
    this.processors.set('location_data', new LocationProcessor());
    
    // Safety event processor
    this.processors.set('safety_events', new SafetyEventProcessor());
    
    // Trip data processor
    this.processors.set('trip_data', new TripProcessor());
    
    // Geofence processor
    this.processors.set('geofence_events', new GeofenceProcessor());
  }
  
  async start() {
    await this.consumer.connect();
    await this.producer.connect();
    
    // Subscribe to Bookovia topics
    await this.consumer.subscribe({
      topics: [
        'bookovia.locations',
        'bookovia.safety-events',
        'bookovia.trips',
        'bookovia.geofences'
      ]
    });
    
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        await this.processMessage(topic, message);
      }
    });
  }
  
  async processMessage(topic, kafkaMessage) {
    try {
      const messageData = JSON.parse(kafkaMessage.value.toString());
      const processorKey = this.getProcessorKey(topic);
      
      if (this.processors.has(processorKey)) {
        const processor = this.processors.get(processorKey);
        const processedData = await processor.process(messageData);
        
        // Publish processed data to output topics
        await this.publishProcessedData(processedData);
        
        // Update real-time analytics
        await this.updateAnalytics(processedData);
        
        // Check for alerts
        await this.checkAlerts(processedData);
      }
    } catch (error) {
      console.error('Message processing error:', error);
    }
  }
  
  async publishProcessedData(processedData) {
    const messages = [];
    
    // Publish to different topics based on data type
    if (processedData.type === 'location_update') {
      messages.push({
        topic: 'processed.locations',
        value: JSON.stringify(processedData)
      });
    }
    
    if (processedData.type === 'safety_event') {
      messages.push({
        topic: 'processed.safety-events',
        value: JSON.stringify(processedData)
      });
      
      // Also publish to alerts topic if critical
      if (processedData.severity === 'critical') {
        messages.push({
          topic: 'alerts.critical',
          value: JSON.stringify(processedData)
        });
      }
    }
    
    if (messages.length > 0) {
      await this.producer.sendBatch({ topicMessages: messages });
    }
  }
  
  async updateAnalytics(processedData) {
    // Update real-time analytics aggregations
    switch (processedData.type) {
      case 'location_update':
        await this.updateLocationAnalytics(processedData);
        break;
      case 'safety_event':
        await this.updateSafetyAnalytics(processedData);
        break;
      case 'trip_update':
        await this.updateTripAnalytics(processedData);
        break;
    }
  }
  
  async updateLocationAnalytics(locationData) {
    const analytics = {
      vehicle_id: locationData.vehicle_id,
      timestamp: locationData.timestamp,
      speed: locationData.speed,
      distance_increment: locationData.distance_since_last,
      fuel_consumption_rate: locationData.fuel_rate
    };
    
    // Update time-series database
    await this.writeToTimeSeries('vehicle_analytics', analytics);
    
    // Update Redis cache for real-time access
    await this.updateRedisCache('vehicle_status', locationData.vehicle_id, {
      last_location: locationData.location,
      last_update: locationData.timestamp,
      current_speed: locationData.speed
    });
  }
}

// Specialized processors
class LocationProcessor {
  async process(locationData) {
    // Validate location data
    if (!this.isValidLocation(locationData)) {
      throw new Error('Invalid location data');
    }
    
    // Calculate derived metrics
    const enrichedData = {
      ...locationData,
      distance_since_last: this.calculateDistance(locationData),
      speed_change: this.calculateSpeedChange(locationData),
      direction_change: this.calculateDirectionChange(locationData),
      estimated_fuel_rate: this.estimateFuelConsumption(locationData)
    };
    
    // Detect anomalies
    const anomalies = this.detectAnomalies(enrichedData);
    if (anomalies.length > 0) {
      enrichedData.anomalies = anomalies;
    }
    
    return {
      type: 'location_update',
      data: enrichedData,
      processed_at: new Date().toISOString()
    };
  }
  
  isValidLocation(data) {
    return (
      data.latitude >= -90 && data.latitude <= 90 &&
      data.longitude >= -180 && data.longitude <= 180 &&
      data.speed >= 0 && data.speed <= 300 // reasonable speed limit
    );
  }
  
  detectAnomalies(locationData) {
    const anomalies = [];
    
    // Speed anomaly detection
    if (locationData.speed > 120) { // > 120 km/h
      anomalies.push({
        type: 'high_speed',
        severity: 'medium',
        value: locationData.speed
      });
    }
    
    // Sudden speed changes
    if (Math.abs(locationData.speed_change) > 30) {
      anomalies.push({
        type: 'sudden_speed_change',
        severity: 'high',
        value: locationData.speed_change
      });
    }
    
    return anomalies;
  }
}

class SafetyEventProcessor {
  async process(safetyData) {
    // Enhance safety event with context
    const enhancedEvent = {
      ...safetyData,
      risk_score: this.calculateRiskScore(safetyData),
      historical_context: await this.getHistoricalContext(safetyData),
      weather_conditions: await this.getWeatherConditions(safetyData),
      traffic_conditions: await this.getTrafficConditions(safetyData)
    };
    
    // Generate recommendations
    enhancedEvent.recommendations = this.generateRecommendations(enhancedEvent);
    
    return {
      type: 'safety_event',
      data: enhancedEvent,
      processed_at: new Date().toISOString()
    };
  }
  
  calculateRiskScore(safetyData) {
    let riskScore = 0;
    
    // Base risk by event type
    const baseRiskMap = {
      harsh_braking: 0.6,
      harsh_acceleration: 0.5,
      harsh_cornering: 0.7,
      speeding: 0.4,
      phone_usage: 0.8
    };
    
    riskScore += baseRiskMap[safetyData.type] || 0.3;
    
    // Adjust for severity
    const severityMultiplier = {
      low: 0.5,
      medium: 1.0,
      high: 1.5,
      critical: 2.0
    };
    
    riskScore *= severityMultiplier[safetyData.severity] || 1.0;
    
    // Adjust for time of day (higher risk at night)
    const eventHour = new Date(safetyData.timestamp).getHours();
    if (eventHour >= 22 || eventHour <= 6) {
      riskScore *= 1.3;
    }
    
    return Math.min(riskScore, 1.0); // Cap at 1.0
  }
}

Real-time Analytics

Live Aggregations and Metrics

// analytics/real-time-analytics.js
import { InfluxDB, Point } from '@influxdata/influxdb-client';
import { WebSocket } from 'ws';

export class RealTimeAnalytics {
  constructor(config) {
    this.influxDB = new InfluxDB({
      url: config.influxdb.url,
      token: config.influxdb.token
    });
    
    this.writeApi = this.influxDB.getWriteApi(
      config.influxdb.org,
      config.influxdb.bucket
    );
    
    this.queryApi = this.influxDB.getQueryApi(config.influxdb.org);
    
    this.aggregators = new Map();
    this.setupAggregators();
    
    this.clientConnections = new Set();
  }
  
  setupAggregators() {
    // Fleet-wide aggregators
    this.aggregators.set('fleet_summary', new FleetSummaryAggregator());
    this.aggregators.set('safety_metrics', new SafetyMetricsAggregator());
    this.aggregators.set('efficiency_metrics', new EfficiencyAggregator());
    
    // Vehicle-specific aggregators
    this.aggregators.set('vehicle_performance', new VehiclePerformanceAggregator());
    this.aggregators.set('driver_behavior', new DriverBehaviorAggregator());
  }
  
  async processLocationUpdate(locationData) {
    // Write raw data to InfluxDB
    const point = new Point('vehicle_location')
      .tag('vehicle_id', locationData.vehicle_id)
      .tag('driver_id', locationData.driver_id)
      .floatField('latitude', locationData.latitude)
      .floatField('longitude', locationData.longitude)
      .floatField('speed', locationData.speed)
      .floatField('heading', locationData.heading)
      .timestamp(new Date(locationData.timestamp));
    
    this.writeApi.writePoint(point);
    
    // Update real-time aggregations
    await this.updateAggregations('location', locationData);
    
    // Broadcast to connected clients
    await this.broadcastUpdate('location_metrics', await this.getCurrentMetrics());
  }
  
  async processSafetyEvent(safetyData) {
    // Write safety event to InfluxDB
    const point = new Point('safety_event')
      .tag('vehicle_id', safetyData.vehicle_id)
      .tag('driver_id', safetyData.driver_id)
      .tag('event_type', safetyData.type)
      .tag('severity', safetyData.severity)
      .floatField('magnitude', safetyData.magnitude)
      .floatField('risk_score', safetyData.risk_score)
      .timestamp(new Date(safetyData.timestamp));
    
    this.writeApi.writePoint(point);
    
    // Update safety metrics
    await this.updateAggregations('safety', safetyData);
    
    // Broadcast safety update
    await this.broadcastUpdate('safety_metrics', await this.getSafetyMetrics());
  }
  
  async updateAggregations(dataType, data) {
    const updatePromises = [];
    
    for (const [name, aggregator] of this.aggregators) {
      if (aggregator.shouldProcess(dataType)) {
        updatePromises.push(aggregator.process(data));
      }
    }
    
    await Promise.all(updatePromises);
  }
  
  async getCurrentMetrics() {
    const fleetSummary = this.aggregators.get('fleet_summary');
    const safetyMetrics = this.aggregators.get('safety_metrics');
    const efficiencyMetrics = this.aggregators.get('efficiency_metrics');
    
    return {
      fleet: await fleetSummary.getCurrentSummary(),
      safety: await safetyMetrics.getCurrentMetrics(),
      efficiency: await efficiencyMetrics.getCurrentMetrics(),
      timestamp: new Date().toISOString()
    };
  }
  
  async getSafetyMetrics() {
    const safetyAggregator = this.aggregators.get('safety_metrics');
    return await safetyAggregator.getCurrentMetrics();
  }
  
  async getVehicleMetrics(vehicleId, timeRange = '1h') {
    const query = `
      from(bucket: "bookovia")
        |> range(start: -${timeRange})
        |> filter(fn: (r) => r._measurement == "vehicle_location")
        |> filter(fn: (r) => r.vehicle_id == "${vehicleId}")
        |> aggregateWindow(every: 1m, fn: mean)
    `;
    
    const results = [];
    const queryResult = this.queryApi.iterateRows(query);
    
    for await (const { values, tableMeta } of queryResult) {
      const row = tableMeta.toObject(values);
      results.push(row);
    }
    
    return this.transformVehicleMetrics(results);
  }
  
  transformVehicleMetrics(rawData) {
    const metrics = {
      distance_traveled: 0,
      average_speed: 0,
      max_speed: 0,
      idle_time: 0,
      efficiency_score: 0
    };
    
    if (rawData.length === 0) return metrics;
    
    // Calculate metrics from time-series data
    let totalSpeed = 0;
    let maxSpeed = 0;
    let idleCount = 0;
    
    for (let i = 0; i < rawData.length; i++) {
      const point = rawData[i];
      const speed = point.speed || 0;
      
      totalSpeed += speed;
      maxSpeed = Math.max(maxSpeed, speed);
      
      if (speed < 5) idleCount++; // Consider < 5 km/h as idle
      
      // Calculate distance between consecutive points
      if (i > 0) {
        const prevPoint = rawData[i - 1];
        const distance = this.calculateDistance(
          prevPoint.latitude, prevPoint.longitude,
          point.latitude, point.longitude
        );
        metrics.distance_traveled += distance;
      }
    }
    
    metrics.average_speed = totalSpeed / rawData.length;
    metrics.max_speed = maxSpeed;
    metrics.idle_time = (idleCount / rawData.length) * 100; // Percentage
    metrics.efficiency_score = this.calculateEfficiencyScore(metrics);
    
    return metrics;
  }
  
  async startRealtimeQueries() {
    // Set up continuous queries for real-time dashboards
    setInterval(async () => {
      const currentMetrics = await this.getCurrentMetrics();
      await this.broadcastUpdate('dashboard_update', currentMetrics);
    }, 5000); // Update every 5 seconds
    
    // Set up sliding window calculations
    setInterval(async () => {
      const slidingMetrics = await this.calculateSlidingWindowMetrics();
      await this.broadcastUpdate('sliding_metrics', slidingMetrics);
    }, 10000); // Update every 10 seconds
  }
  
  async calculateSlidingWindowMetrics() {
    const timeWindows = ['5m', '15m', '1h'];
    const metrics = {};
    
    for (const window of timeWindows) {
      metrics[window] = {
        fleet_average_speed: await this.getFleetAverageSpeed(window),
        total_distance: await this.getTotalDistance(window),
        safety_events: await this.getSafetyEventCount(window),
        active_vehicles: await this.getActiveVehicleCount(window)
      };
    }
    
    return metrics;
  }
  
  async broadcastUpdate(type, data) {
    const message = JSON.stringify({ type, data, timestamp: Date.now() });
    
    this.clientConnections.forEach(ws => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }
  
  addClient(ws) {
    this.clientConnections.add(ws);
    
    ws.on('close', () => {
      this.clientConnections.delete(ws);
    });
  }
}

// Specialized aggregators
class FleetSummaryAggregator {
  constructor() {
    this.summary = {
      total_vehicles: 0,
      active_vehicles: 0,
      total_distance_today: 0,
      average_speed: 0,
      fuel_consumption: 0
    };
    
    this.vehicleStates = new Map();
    this.lastUpdate = Date.now();
  }
  
  shouldProcess(dataType) {
    return dataType === 'location' || dataType === 'trip';
  }
  
  async process(data) {
    if (data.vehicle_id) {
      // Update vehicle state
      this.vehicleStates.set(data.vehicle_id, {
        ...data,
        last_update: Date.now()
      });
      
      // Recalculate summary
      await this.recalculateSummary();
    }
  }
  
  async recalculateSummary() {
    const now = Date.now();
    const activeThreshold = 5 * 60 * 1000; // 5 minutes
    
    let activeCount = 0;
    let totalSpeed = 0;
    let speedCount = 0;
    
    for (const [vehicleId, state] of this.vehicleStates) {
      if (now - state.last_update < activeThreshold) {
        activeCount++;
        
        if (state.speed > 0) {
          totalSpeed += state.speed;
          speedCount++;
        }
      }
    }
    
    this.summary.total_vehicles = this.vehicleStates.size;
    this.summary.active_vehicles = activeCount;
    this.summary.average_speed = speedCount > 0 ? totalSpeed / speedCount : 0;
  }
  
  async getCurrentSummary() {
    return { ...this.summary };
  }
}

class SafetyMetricsAggregator {
  constructor() {
    this.metrics = {
      events_last_hour: 0,
      events_today: 0,
      average_safety_score: 85,
      critical_events: 0,
      trend: 'stable'
    };
    
    this.recentEvents = [];
    this.hourlyBuckets = new Map();
  }
  
  shouldProcess(dataType) {
    return dataType === 'safety';
  }
  
  async process(safetyData) {
    const now = new Date();
    const hour = now.getHours();
    
    // Add to recent events
    this.recentEvents.push({
      ...safetyData,
      processed_at: now
    });
    
    // Keep only last 24 hours
    const oneDayAgo = now.getTime() - 24 * 60 * 60 * 1000;
    this.recentEvents = this.recentEvents.filter(
      event => new Date(event.processed_at).getTime() > oneDayAgo
    );
    
    // Update hourly buckets
    const hourKey = `${now.getDate()}-${hour}`;
    if (!this.hourlyBuckets.has(hourKey)) {
      this.hourlyBuckets.set(hourKey, []);
    }
    this.hourlyBuckets.get(hourKey).push(safetyData);
    
    // Update metrics
    await this.recalculateMetrics();
  }
  
  async recalculateMetrics() {
    const now = new Date();
    const oneHourAgo = now.getTime() - 60 * 60 * 1000;
    
    // Events in last hour
    this.metrics.events_last_hour = this.recentEvents.filter(
      event => new Date(event.processed_at).getTime() > oneHourAgo
    ).length;
    
    // Events today
    this.metrics.events_today = this.recentEvents.length;
    
    // Critical events
    this.metrics.critical_events = this.recentEvents.filter(
      event => event.severity === 'critical'
    ).length;
    
    // Calculate trend
    this.metrics.trend = this.calculateTrend();
  }
  
  calculateTrend() {
    if (this.recentEvents.length < 10) return 'insufficient_data';
    
    const recent = this.recentEvents.slice(-5);
    const previous = this.recentEvents.slice(-10, -5);
    
    const recentAvg = recent.reduce((sum, e) => sum + (e.risk_score || 0.5), 0) / recent.length;
    const previousAvg = previous.reduce((sum, e) => sum + (e.risk_score || 0.5), 0) / previous.length;
    
    const change = recentAvg - previousAvg;
    
    if (change > 0.1) return 'deteriorating';
    if (change < -0.1) return 'improving';
    return 'stable';
  }
  
  async getCurrentMetrics() {
    return { ...this.metrics };
  }
}

Alert and Notification System

Multi-Channel Alert Management

// alerts/alert-engine.js
import nodemailer from 'nodemailer';
import twilio from 'twilio';
import { WebClient } from '@slack/web-api';

export class AlertEngine {
  constructor(config) {
    this.config = config;
    this.rules = new Map();
    this.alertHistory = new Map();
    this.suppressionRules = new Map();
    
    // Initialize notification channels
    this.emailTransporter = nodemailer.createTransporter(config.email);
    this.smsClient = twilio(config.twilio.accountSid, config.twilio.authToken);
    this.slackClient = new WebClient(config.slack.token);
    
    this.setupDefaultRules();
  }
  
  setupDefaultRules() {
    // Critical safety events
    this.addRule({
      id: 'critical_safety_event',
      name: 'Critical Safety Event',
      condition: (data) => data.type === 'safety_event' && data.severity === 'critical',
      channels: ['sms', 'email', 'slack', 'push'],
      priority: 'urgent',
      suppressionWindow: 300 // 5 minutes
    });
    
    // Vehicle breakdown
    this.addRule({
      id: 'vehicle_breakdown',
      name: 'Vehicle Breakdown Detected',
      condition: (data) => data.type === 'breakdown' || 
        (data.type === 'location_update' && data.speed === 0 && data.engine_running === false),
      channels: ['sms', 'email'],
      priority: 'high',
      suppressionWindow: 600 // 10 minutes
    });
    
    // Geofence violations
    this.addRule({
      id: 'geofence_violation',
      name: 'Geofence Boundary Violation',
      condition: (data) => data.type === 'geofence_event' && data.event === 'exit' && data.authorized === false,
      channels: ['sms', 'push'],
      priority: 'medium',
      suppressionWindow: 180 // 3 minutes
    });
    
    // Multiple safety events
    this.addRule({
      id: 'multiple_safety_events',
      name: 'Multiple Safety Events',
      condition: (data) => this.checkMultipleEvents(data),
      channels: ['email', 'slack'],
      priority: 'medium',
      suppressionWindow: 900 // 15 minutes
    });
  }
  
  addRule(rule) {
    this.rules.set(rule.id, {
      ...rule,
      createdAt: new Date(),
      triggerCount: 0,
      lastTriggered: null
    });
  }
  
  async processEvent(eventData) {
    const triggeredRules = [];
    
    for (const [ruleId, rule] of this.rules) {
      try {
        if (await this.evaluateRule(rule, eventData)) {
          if (!this.isSupressed(ruleId, eventData)) {
            triggeredRules.push(rule);
            await this.triggerAlert(rule, eventData);
          }
        }
      } catch (error) {
        console.error(`Error evaluating rule ${ruleId}:`, error);
      }
    }
    
    return triggeredRules;
  }
  
  async evaluateRule(rule, eventData) {
    try {
      return await rule.condition(eventData);
    } catch (error) {
      console.error(`Rule evaluation error for ${rule.id}:`, error);
      return false;
    }
  }
  
  isSupressed(ruleId, eventData) {
    const rule = this.rules.get(ruleId);
    if (!rule.lastTriggered) return false;
    
    const suppressionKey = this.getSuppressionKey(ruleId, eventData);
    const lastAlert = this.alertHistory.get(suppressionKey);
    
    if (!lastAlert) return false;
    
    const timeSinceLastAlert = Date.now() - lastAlert.timestamp;
    return timeSinceLastAlert < (rule.suppressionWindow * 1000);
  }
  
  getSuppressionKey(ruleId, eventData) {
    // Create a key that groups similar events for suppression
    let key = ruleId;
    
    if (eventData.vehicle_id) key += `:${eventData.vehicle_id}`;
    if (eventData.driver_id) key += `:${eventData.driver_id}`;
    if (eventData.location) key += `:${Math.floor(eventData.location.latitude * 100)}:${Math.floor(eventData.location.longitude * 100)}`;
    
    return key;
  }
  
  async triggerAlert(rule, eventData) {
    const alert = {
      id: this.generateAlertId(),
      ruleId: rule.id,
      ruleName: rule.name,
      priority: rule.priority,
      eventData,
      timestamp: Date.now(),
      status: 'active',
      channels: rule.channels,
      attempts: {}
    };
    
    // Update rule statistics
    rule.triggerCount++;
    rule.lastTriggered = new Date();
    
    // Store alert history for suppression
    const suppressionKey = this.getSuppressionKey(rule.id, eventData);
    this.alertHistory.set(suppressionKey, alert);
    
    // Send notifications through all specified channels
    for (const channel of rule.channels) {
      try {
        await this.sendNotification(channel, alert);
        alert.attempts[channel] = { status: 'success', timestamp: Date.now() };
      } catch (error) {
        console.error(`Failed to send ${channel} notification:`, error);
        alert.attempts[channel] = { status: 'failed', error: error.message, timestamp: Date.now() };
      }
    }
    
    return alert;
  }
  
  async sendNotification(channel, alert) {
    switch (channel) {
      case 'email':
        return await this.sendEmailAlert(alert);
      case 'sms':
        return await this.sendSMSAlert(alert);
      case 'slack':
        return await this.sendSlackAlert(alert);
      case 'push':
        return await this.sendPushNotification(alert);
      case 'webhook':
        return await this.sendWebhookAlert(alert);
      default:
        throw new Error(`Unknown notification channel: ${channel}`);
    }
  }
  
  async sendEmailAlert(alert) {
    const { eventData, ruleName, priority } = alert;
    
    const emailContent = this.generateEmailContent(alert);
    
    const mailOptions = {
      from: this.config.email.from,
      to: this.getEmailRecipients(alert),
      subject: `[${priority.toUpperCase()}] ${ruleName}`,
      html: emailContent.html,
      text: emailContent.text
    };
    
    return await this.emailTransporter.sendMail(mailOptions);
  }
  
  generateEmailContent(alert) {
    const { eventData, ruleName, priority } = alert;
    
    const html = `
      <div style="font-family: Arial, sans-serif; max-width: 600px;">
        <div style="background: ${this.getPriorityColor(priority)}; color: white; padding: 20px; border-radius: 5px 5px 0 0;">
          <h2 style="margin: 0;">${ruleName}</h2>
          <p style="margin: 5px 0 0 0;">Priority: ${priority.toUpperCase()}</p>
        </div>
        
        <div style="padding: 20px; background: #f9f9f9; border: 1px solid #ddd; border-radius: 0 0 5px 5px;">
          <h3>Event Details:</h3>
          <ul>
            ${eventData.vehicle_id ? `<li><strong>Vehicle ID:</strong> ${eventData.vehicle_id}</li>` : ''}
            ${eventData.driver_id ? `<li><strong>Driver ID:</strong> ${eventData.driver_id}</li>` : ''}
            ${eventData.location ? `<li><strong>Location:</strong> ${eventData.location.latitude}, ${eventData.location.longitude}</li>` : ''}
            ${eventData.timestamp ? `<li><strong>Time:</strong> ${new Date(eventData.timestamp).toLocaleString()}</li>` : ''}
            ${eventData.type ? `<li><strong>Event Type:</strong> ${eventData.type}</li>` : ''}
            ${eventData.severity ? `<li><strong>Severity:</strong> ${eventData.severity}</li>` : ''}
          </ul>
          
          ${eventData.description ? `<p><strong>Description:</strong> ${eventData.description}</p>` : ''}
          
          <p style="margin-top: 20px;">
            <a href="${this.config.dashboardUrl}/alerts/${alert.id}" 
               style="background: #007cba; color: white; padding: 10px 15px; text-decoration: none; border-radius: 3px;">
              View in Dashboard
            </a>
          </p>
        </div>
      </div>
    `;
    
    const text = `
${ruleName} - ${priority.toUpperCase()}

Event Details:
${eventData.vehicle_id ? `Vehicle ID: ${eventData.vehicle_id}` : ''}
${eventData.driver_id ? `Driver ID: ${eventData.driver_id}` : ''}
${eventData.location ? `Location: ${eventData.location.latitude}, ${eventData.location.longitude}` : ''}
${eventData.timestamp ? `Time: ${new Date(eventData.timestamp).toLocaleString()}` : ''}

View details: ${this.config.dashboardUrl}/alerts/${alert.id}
    `;
    
    return { html, text };
  }
  
  async sendSMSAlert(alert) {
    const { eventData, ruleName, priority } = alert;
    
    let message = `[${priority.toUpperCase()}] ${ruleName}`;
    
    if (eventData.vehicle_id) message += `\nVehicle: ${eventData.vehicle_id}`;
    if (eventData.driver_id) message += `\nDriver: ${eventData.driver_id}`;
    if (eventData.location) {
      message += `\nLocation: ${eventData.location.latitude}, ${eventData.location.longitude}`;
    }
    
    message += `\nView: ${this.config.dashboardUrl}/alerts/${alert.id}`;
    
    const recipients = this.getSMSRecipients(alert);
    const promises = recipients.map(number => 
      this.smsClient.messages.create({
        body: message,
        from: this.config.twilio.fromNumber,
        to: number
      })
    );
    
    return await Promise.all(promises);
  }
  
  async sendSlackAlert(alert) {
    const { eventData, ruleName, priority } = alert;
    
    const color = this.getPriorityColor(priority);
    
    const attachment = {
      color: color,
      title: ruleName,
      title_link: `${this.config.dashboardUrl}/alerts/${alert.id}`,
      fields: [
        {
          title: 'Priority',
          value: priority.toUpperCase(),
          short: true
        }
      ],
      footer: 'Bookovia Fleet Management',
      ts: Math.floor(Date.now() / 1000)
    };
    
    if (eventData.vehicle_id) {
      attachment.fields.push({
        title: 'Vehicle ID',
        value: eventData.vehicle_id,
        short: true
      });
    }
    
    if (eventData.driver_id) {
      attachment.fields.push({
        title: 'Driver ID',
        value: eventData.driver_id,
        short: true
      });
    }
    
    if (eventData.location) {
      attachment.fields.push({
        title: 'Location',
        value: `${eventData.location.latitude}, ${eventData.location.longitude}`,
        short: true
      });
    }
    
    return await this.slackClient.chat.postMessage({
      channel: this.config.slack.channel,
      text: `Fleet Alert: ${ruleName}`,
      attachments: [attachment]
    });
  }
  
  async sendPushNotification(alert) {
    // Implementation would integrate with push notification service
    // (Firebase, Apple Push, etc.)
    const pushData = {
      title: alert.ruleName,
      body: this.generatePushMessage(alert),
      data: {
        alertId: alert.id,
        priority: alert.priority,
        type: 'fleet_alert'
      }
    };
    
    // Send to registered devices
    // return await this.pushService.sendToAll(pushData);
  }
  
  checkMultipleEvents(eventData) {
    if (eventData.type !== 'safety_event') return false;
    
    const vehicleId = eventData.vehicle_id;
    const driverId = eventData.driver_id;
    const timeWindow = 10 * 60 * 1000; // 10 minutes
    const now = Date.now();
    
    // Check recent events for same vehicle/driver
    let eventCount = 0;
    for (const [key, alert] of this.alertHistory) {
      if (alert.timestamp > now - timeWindow) {
        const alertEventData = alert.eventData;
        if (alertEventData.type === 'safety_event' && 
            (alertEventData.vehicle_id === vehicleId || alertEventData.driver_id === driverId)) {
          eventCount++;
        }
      }
    }
    
    return eventCount >= 2; // 3 or more events (including current)
  }
  
  getPriorityColor(priority) {
    const colors = {
      urgent: '#ff0000',
      high: '#ff6600',
      medium: '#ffaa00',
      low: '#00aa00'
    };
    return colors[priority] || '#666666';
  }
  
  getEmailRecipients(alert) {
    // Get recipients based on alert priority and type
    let recipients = this.config.notifications.email.default;
    
    if (alert.priority === 'urgent') {
      recipients = [...recipients, ...this.config.notifications.email.urgent];
    }
    
    return [...new Set(recipients)]; // Remove duplicates
  }
  
  getSMSRecipients(alert) {
    let recipients = this.config.notifications.sms.default;
    
    if (alert.priority === 'urgent') {
      recipients = [...recipients, ...this.config.notifications.sms.urgent];
    }
    
    return [...new Set(recipients)];
  }
  
  generateAlertId() {
    return `alert_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }
}

Scalability and Performance

High-Performance Architecture

# nginx.conf for WebSocket load balancing
upstream websocket_backend {
    # Enable sticky sessions for WebSocket connections
    ip_hash;
    
    server 10.0.1.10:3000 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:3000 max_fails=3 fail_timeout=30s;
    server 10.0.1.12:3000 max_fails=3 fail_timeout=30s;
    server 10.0.1.13:3000 max_fails=3 fail_timeout=30s;
    
    # Health checks
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name realtime.bookovia.com;
    
    # SSL configuration
    ssl_certificate /etc/ssl/certs/realtime.bookovia.com.crt;
    ssl_certificate_key /etc/ssl/private/realtime.bookovia.com.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512;
    
    # WebSocket upgrade headers
    location /ws {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # WebSocket specific timeouts
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
        
        # Disable proxy buffering for real-time streaming
        proxy_buffering off;
    }
    
    # REST API endpoints
    location /api {
        proxy_pass http://websocket_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Enable compression
        gzip on;
        gzip_types application/json text/plain text/css application/javascript;
    }
    
    # Health check endpoint
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
}

# Rate limiting
limit_req_zone $binary_remote_addr zone=api:10m rate=100r/s;
limit_req_zone $binary_remote_addr zone=websocket:10m rate=10r/s;

server {
    listen 80;
    server_name realtime.bookovia.com;
    return 301 https://$server_name$request_uri;
}

Best Practices

Performance Optimization

  • Connection Pooling - Limit concurrent connections per client
  • Message Batching - Batch multiple small updates into single messages
  • Compression - Use WebSocket per-message deflate extension
  • Heartbeat Management - Implement proper ping/pong for connection health
  • Graceful Degradation - Fall back to polling when WebSocket fails
  • Horizontal Scaling - Use message queues for parallel processing
  • Backpressure Handling - Implement flow control for high-volume streams
  • State Management - Use distributed state stores for multi-instance deployments
  • Error Recovery - Implement retry logic with exponential backoff
  • Data Partitioning - Partition streams by vehicle/organization for scaling
  • Time-Series Optimization - Use appropriate retention policies
  • Indexing Strategy - Index frequently queried fields
  • Aggregation Pipeline - Pre-compute common aggregations
  • Caching Layer - Cache frequently accessed data in Redis
  • Archive Strategy - Move old data to cheaper storage

Security Considerations

  • Token Validation - Validate JWT tokens on every WebSocket message
  • Permission Checking - Verify user permissions for each subscription
  • Rate Limiting - Implement per-user rate limiting
  • IP Allowlisting - Restrict connections to known IP ranges
  • Audit Logging - Log all authentication attempts and failures
  • Message Encryption - Encrypt sensitive data in transit
  • Field-Level Security - Redact sensitive fields based on permissions
  • Data Masking - Mask PII data in logs and metrics
  • Secure Headers - Implement proper security headers
  • Input Validation - Validate all incoming data strictly

Next Steps

Mobile Integration

Integrate real-time streaming in mobile applications

Web Dashboard

Build real-time dashboards with live data updates

Fleet Management

Implement comprehensive fleet management with real-time features

API Reference

Explore the complete streaming API documentation

Ready to implement real-time streaming? Start with our quickstart guide to set up your development environment and establish your first WebSocket connection.