Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.bookovia.com/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The Real-Time Event Streaming API delivers instant notifications for all fleet events as they occur. Perfect for building responsive applications, automated workflows, alert systems, and real-time operational dashboards that need immediate awareness of fleet activities.

Authentication

All streaming connections require authentication using your API key:
Authorization: Bearer YOUR_API_KEY

Endpoints

Start Event Stream

POST /api/v1/streaming/events/start

Stop Event Stream

POST /api/v1/streaming/events/stop

Get Stream Status

GET /api/v1/streaming/events/status/{stream_id}

Configure Event Filters

PUT /api/v1/streaming/events/configure/{stream_id}

Event Categories

CategoryDescriptionTrigger Conditions
trip_lifecycleTrip start, stop, pause, resume eventsTrip state changes
safety_eventsHarsh driving, collisions, safety alertsSafety threshold violations
maintenance_eventsService due, breakdowns, diagnosticsMaintenance conditions
operational_eventsRoute deviations, delays, arrivalsOperational milestones
driver_eventsLogin, logout, break start/endDriver activity changes
vehicle_eventsEngine on/off, door open/closeVehicle state changes
geofence_eventsEnter/exit defined areasGeographic boundaries
emergency_eventsPanic button, crash detectionEmergency conditions
system_eventsDevice connectivity, data qualitySystem status changes

Event Stream Configuration

Start Event Stream Request

{
  "fleet_id": "fleet_001",
  "event_filters": {
    "categories": [
      "trip_lifecycle",
      "safety_events",
      "emergency_events"
    ],
    "severity_levels": ["high", "critical"],
    "vehicle_ids": ["vehicle_001", "vehicle_002"],
    "driver_ids": ["driver_456"],
    "geofences": ["depot_001", "customer_zone_A"]
  },
  "stream_config": {
    "delivery_method": "websocket",
    "reliability": "at_least_once",
    "batch_events": false,
    "include_metadata": true,
    "encryption": true
  },
  "processing_rules": {
    "deduplication": true,
    "event_ordering": true,
    "correlation_enabled": true,
    "enrichment": ["location_names", "driver_info", "vehicle_details"]
  }
}

Event Filter Options

{
  "categories": ["trip_lifecycle", "safety_events"],
  "severity_levels": ["low", "medium", "high", "critical"],
  "priority_levels": ["normal", "urgent", "emergency"],
  "vehicle_ids": ["vehicle_001", "vehicle_002"],
  "driver_ids": ["driver_456", "driver_789"],
  "geofences": ["depot_001", "customer_zone_A"],
  "time_filters": {
    "business_hours_only": false,
    "exclude_weekends": false,
    "timezone": "America/New_York"
  },
  "custom_conditions": [
    {
      "field": "speed",
      "operator": ">",
      "value": 55
    }
  ]
}

Event Data Formats

Trip Lifecycle Events

{
  "event_id": "evt_12345",
  "event_type": "trip_started",
  "category": "trip_lifecycle",
  "timestamp": "2024-03-15T14:30:00Z",
  "severity": "low",
  "priority": "normal",
  "source": {
    "vehicle_id": "vehicle_001",
    "driver_id": "driver_456",
    "trip_id": "trip_789"
  },
  "location": {
    "latitude": 40.7128,
    "longitude": -74.0060,
    "address": "123 Depot St, New York, NY",
    "geofence": "depot_001"
  },
  "event_data": {
    "trip_type": "delivery",
    "planned_destinations": 8,
    "estimated_duration": 480,
    "estimated_distance": 125.5,
    "route_id": "route_456",
    "dispatch_time": "2024-03-15T14:15:00Z"
  },
  "metadata": {
    "correlation_id": "corr_abc123",
    "sequence_number": 1,
    "processing_time": "2024-03-15T14:30:00.150Z",
    "source_system": "telematics_device"
  }
}

Safety Events

{
  "event_id": "evt_54321",
  "event_type": "harsh_braking",
  "category": "safety_events",
  "timestamp": "2024-03-15T15:45:30Z",
  "severity": "high",
  "priority": "urgent",
  "source": {
    "vehicle_id": "vehicle_001",
    "driver_id": "driver_456",
    "trip_id": "trip_789"
  },
  "location": {
    "latitude": 40.7489,
    "longitude": -73.9857,
    "address": "Broadway & 42nd St, New York, NY",
    "speed_limit": 35,
    "road_type": "urban"
  },
  "event_data": {
    "deceleration_g": -0.8,
    "initial_speed": 42.5,
    "final_speed": 8.2,
    "braking_duration": 3.2,
    "weather_condition": "clear",
    "traffic_condition": "moderate",
    "preceding_events": [
      {
        "event": "speed_increase",
        "timestamp": "2024-03-15T15:45:15Z"
      }
    ]
  },
  "risk_assessment": {
    "collision_risk": "medium",
    "passenger_impact": "low",
    "recommended_actions": [
      "driver_coaching",
      "route_review"
    ]
  },
  "automatic_actions": {
    "alert_sent": true,
    "incident_logged": true,
    "supervisor_notified": false
  }
}

Emergency Events

{
  "event_id": "evt_99999",
  "event_type": "panic_button_pressed",
  "category": "emergency_events",
  "timestamp": "2024-03-15T16:22:45Z",
  "severity": "critical",
  "priority": "emergency",
  "source": {
    "vehicle_id": "vehicle_001",
    "driver_id": "driver_456",
    "trip_id": "trip_789"
  },
  "location": {
    "latitude": 40.7282,
    "longitude": -73.9942,
    "address": "Unknown Location - GPS Coordinates",
    "accuracy": 5.2,
    "emergency_services_distance": 2.1
  },
  "event_data": {
    "button_held_duration": 3.5,
    "vehicle_speed": 0,
    "engine_status": "running",
    "doors_locked": false,
    "hazard_lights": "activated"
  },
  "emergency_response": {
    "response_initiated": true,
    "emergency_contacts_notified": true,
    "authorities_contacted": false,
    "estimated_response_time": 12
  },
  "escalation": {
    "level": "immediate",
    "next_escalation": "2024-03-15T16:25:00Z",
    "escalation_contacts": [
      "supervisor_001",
      "emergency_coordinator"
    ]
  }
}

Maintenance Events

{
  "event_id": "evt_67890",
  "event_type": "maintenance_due",
  "category": "maintenance_events",
  "timestamp": "2024-03-15T17:00:00Z",
  "severity": "medium",
  "priority": "normal",
  "source": {
    "vehicle_id": "vehicle_001",
    "current_mileage": 48250.0
  },
  "event_data": {
    "maintenance_type": "scheduled_service",
    "service_category": "oil_change",
    "due_mileage": 48000.0,
    "overdue_miles": 250.0,
    "last_service_date": "2023-12-15T10:00:00Z",
    "last_service_mileage": 43000.0,
    "estimated_cost": 150.00
  },
  "recommendations": {
    "urgency": "schedule_within_week",
    "preferred_service_center": "service_center_A",
    "estimated_downtime": "2 hours",
    "alternative_vehicles": ["vehicle_003", "vehicle_007"]
  },
  "predictive_indicators": {
    "oil_life_remaining": "5%",
    "filter_condition": "poor",
    "next_predicted_failure": "transmission_service_due_soon"
  }
}

Geofence Events

{
  "event_id": "evt_13579",
  "event_type": "geofence_exit",
  "category": "geofence_events",
  "timestamp": "2024-03-15T18:30:15Z",
  "severity": "low",
  "priority": "normal",
  "source": {
    "vehicle_id": "vehicle_001",
    "driver_id": "driver_456",
    "trip_id": "trip_789"
  },
  "location": {
    "latitude": 40.7580,
    "longitude": -73.9855,
    "address": "Customer Site A, New York, NY"
  },
  "event_data": {
    "geofence_id": "customer_zone_A",
    "geofence_name": "Customer Site A",
    "entry_time": "2024-03-15T18:15:30Z",
    "duration_inside": 14.75,
    "planned_duration": 15,
    "service_completed": true,
    "next_destination": "customer_zone_B"
  },
  "performance_metrics": {
    "on_time_departure": true,
    "service_efficiency": 98.3,
    "customer_satisfaction": "pending"
  }
}

SDK Examples

import { BookoviaSDK } from '@bookovia/sdk';

const sdk = new BookoviaSDK({ apiKey: 'your-api-key' });

class EventStreamProcessor {
  constructor() {
    this.activeStreams = new Map();
    this.eventHandlers = new Map();
    this.eventBuffer = [];
    this.correlationEngine = new EventCorrelationEngine();
  }

  async startEventStream(fleetId, config = {}) {
    try {
      const streamConfig = {
        fleet_id: fleetId,
        event_filters: {
          categories: config.categories || [
            'trip_lifecycle',
            'safety_events',
            'emergency_events',
            'maintenance_events'
          ],
          severity_levels: config.severityLevels || ['medium', 'high', 'critical'],
          vehicle_ids: config.vehicleIds,
          driver_ids: config.driverIds
        },
        stream_config: {
          delivery_method: 'websocket',
          reliability: 'at_least_once',
          batch_events: false,
          include_metadata: true
        },
        processing_rules: {
          deduplication: true,
          event_ordering: true,
          correlation_enabled: true,
          enrichment: ['location_names', 'driver_info', 'vehicle_details']
        }
      };

      const stream = await sdk.streaming.events.start(streamConfig);
      
      this.activeStreams.set(stream.stream_id, {
        config: streamConfig,
        status: 'active',
        startTime: new Date(),
        eventCount: 0
      });

      // Set up event handler
      stream.onEvent = (event) => {
        this.processEvent(event);
      };

      // Set up error handler
      stream.onError = (error) => {
        console.error('Event stream error:', error);
        this.handleStreamError(stream.stream_id, error);
      };

      console.log(`Event stream started: ${stream.stream_id}`);
      return stream;

    } catch (error) {
      console.error('Failed to start event stream:', error);
      throw error;
    }
  }

  processEvent(event) {
    // Increment event counter
    const streamInfo = this.activeStreams.get(event.stream_id);
    if (streamInfo) {
      streamInfo.eventCount++;
    }

    // Add to correlation engine
    this.correlationEngine.addEvent(event);

    // Route event based on category
    this.routeEventByCategory(event);

    // Check for event patterns
    this.checkEventPatterns(event);

    // Store in buffer for batch processing
    this.bufferEvent(event);

    // Emit processed event
    this.emitProcessedEvent(event);
  }

  routeEventByCategory(event) {
    const { category, event_type } = event;

    switch (category) {
      case 'trip_lifecycle':
        this.handleTripLifecycleEvent(event);
        break;
      case 'safety_events':
        this.handleSafetyEvent(event);
        break;
      case 'emergency_events':
        this.handleEmergencyEvent(event);
        break;
      case 'maintenance_events':
        this.handleMaintenanceEvent(event);
        break;
      case 'geofence_events':
        this.handleGeofenceEvent(event);
        break;
      default:
        this.handleGenericEvent(event);
    }
  }

  handleTripLifecycleEvent(event) {
    const { event_type, source, event_data } = event;

    switch (event_type) {
      case 'trip_started':
        console.log(`🚗 Trip started: ${source.trip_id} by ${source.driver_id}`);
        this.updateTripStatus(source.trip_id, 'active');
        this.notifyDispatch('trip_started', event);
        break;

      case 'trip_completed':
        console.log(`✅ Trip completed: ${source.trip_id}`);
        this.updateTripStatus(source.trip_id, 'completed');
        this.generateTripSummary(source.trip_id);
        break;

      case 'waypoint_reached':
        console.log(`📍 Waypoint reached: ${event_data.waypoint_name}`);
        this.updateRouteProgress(source.trip_id, event_data);
        this.checkDeliverySchedule(source.trip_id, event_data);
        break;

      case 'trip_paused':
        console.log(`⏸️ Trip paused: ${source.trip_id}`);
        this.handleTripPause(event);
        break;
    }
  }

  handleSafetyEvent(event) {
    const { event_type, severity, source, event_data } = event;

    console.log(`🚨 Safety event: ${event_type} - Severity: ${severity}`);

    // Log safety incident
    this.logSafetyIncident(event);

    // Update driver safety score
    this.updateDriverSafetyScore(source.driver_id, event_type, severity);

    // Check if coaching is needed
    if (severity === 'high' || severity === 'critical') {
      this.scheduleDriverCoaching(source.driver_id, event_type);
    }

    // Send real-time alerts
    this.sendSafetyAlert(event);

    // Check for patterns
    this.checkSafetyPatterns(source.vehicle_id, source.driver_id, event_type);
  }

  handleEmergencyEvent(event) {
    const { event_type, source, event_data, emergency_response } = event;

    console.log(`🆘 EMERGENCY: ${event_type} - Vehicle: ${source.vehicle_id}`);

    // Immediate response protocol
    this.activateEmergencyProtocol(event);

    // Notify emergency contacts
    this.notifyEmergencyContacts(event);

    // Coordinate emergency response
    if (emergency_response && emergency_response.response_initiated) {
      this.trackEmergencyResponse(event);
    }

    // Log emergency incident
    this.logEmergencyIncident(event);

    // Update vehicle status
    this.updateVehicleStatus(source.vehicle_id, 'emergency');
  }

  handleMaintenanceEvent(event) {
    const { event_type, source, event_data, recommendations } = event;

    console.log(`🔧 Maintenance: ${event_type} - Vehicle: ${source.vehicle_id}`);

    switch (event_type) {
      case 'maintenance_due':
        this.scheduleMaintenanceService(source.vehicle_id, event_data);
        break;

      case 'breakdown_detected':
        this.handleVehicleBreakdown(source.vehicle_id, event);
        break;

      case 'diagnostic_alert':
        this.processDiagnosticAlert(source.vehicle_id, event_data);
        break;
    }

    // Update maintenance tracking
    this.updateMaintenanceTracking(source.vehicle_id, event);

    // Check fleet maintenance status
    this.checkFleetMaintenanceStatus();
  }

  handleGeofenceEvent(event) {
    const { event_type, source, event_data } = event;

    switch (event_type) {
      case 'geofence_entry':
        console.log(`📍 Entered: ${event_data.geofence_name}`);
        this.handleGeofenceEntry(event);
        break;

      case 'geofence_exit':
        console.log(`📍 Exited: ${event_data.geofence_name}`);
        this.handleGeofenceExit(event);
        break;
    }

    // Update location tracking
    this.updateLocationTracking(source.vehicle_id, event);

    // Check service completion
    if (event_data.service_completed !== undefined) {
      this.updateServiceCompletion(source.trip_id, event_data);
    }
  }

  checkEventPatterns(event) {
    // Detect event patterns that might indicate issues
    const patterns = this.correlationEngine.analyzePatterns(event);

    patterns.forEach(pattern => {
      switch (pattern.type) {
        case 'frequent_harsh_braking':
          this.handleFrequentHarshBraking(pattern);
          break;

        case 'recurring_maintenance_issues':
          this.handleRecurringMaintenanceIssues(pattern);
          break;

        case 'route_deviation_pattern':
          this.handleRouteDeviationPattern(pattern);
          break;

        case 'driver_fatigue_indicators':
          this.handleDriverFatiguePattern(pattern);
          break;
      }
    });
  }

  // Event correlation and pattern detection
  analyzeEventCorrelations(timeWindow = 300000) { // 5 minutes
    const recentEvents = this.eventBuffer.filter(
      event => Date.now() - new Date(event.timestamp).getTime() < timeWindow
    );

    // Group events by vehicle and analyze
    const eventsByVehicle = recentEvents.reduce((acc, event) => {
      const vehicleId = event.source.vehicle_id;
      if (!acc[vehicleId]) acc[vehicleId] = [];
      acc[vehicleId].push(event);
      return acc;
    }, {});

    Object.entries(eventsByVehicle).forEach(([vehicleId, events]) => {
      this.analyzeVehicleEventSequence(vehicleId, events);
    });
  }

  analyzeVehicleEventSequence(vehicleId, events) {
    // Look for concerning event sequences
    const safetyEvents = events.filter(e => e.category === 'safety_events');
    
    if (safetyEvents.length >= 3) {
      this.raisePattern({
        type: 'multiple_safety_events',
        vehicle_id: vehicleId,
        event_count: safetyEvents.length,
        severity: 'high',
        recommendation: 'immediate_driver_intervention'
      });
    }

    // Check for maintenance-safety correlation
    const maintenanceEvents = events.filter(e => e.category === 'maintenance_events');
    if (maintenanceEvents.length > 0 && safetyEvents.length > 0) {
      this.raisePattern({
        type: 'maintenance_safety_correlation',
        vehicle_id: vehicleId,
        severity: 'medium',
        recommendation: 'expedite_maintenance'
      });
    }
  }

  // Utility methods
  updateTripStatus(tripId, status) {
    // Implementation to update trip status in database
  }

  notifyDispatch(eventType, event) {
    // Implementation to notify dispatch system
  }

  generateTripSummary(tripId) {
    // Implementation to generate trip summary
  }

  logSafetyIncident(event) {
    // Implementation to log safety incident
  }

  updateDriverSafetyScore(driverId, eventType, severity) {
    // Implementation to update driver safety metrics
  }

  scheduleDriverCoaching(driverId, eventType) {
    // Implementation to schedule driver coaching
  }

  sendSafetyAlert(event) {
    // Implementation to send safety alerts
  }

  activateEmergencyProtocol(event) {
    // Implementation for emergency response protocol
  }

  bufferEvent(event) {
    this.eventBuffer.push(event);
    
    // Keep buffer size manageable
    if (this.eventBuffer.length > 1000) {
      this.eventBuffer = this.eventBuffer.slice(-500);
    }
  }

  emitProcessedEvent(event) {
    // Emit event to subscribers
    const handlers = this.eventHandlers.get(event.category) || [];
    handlers.forEach(handler => {
      try {
        handler(event);
      } catch (error) {
        console.error('Event handler error:', error);
      }
    });
  }

  // Public API for subscribing to events
  onEvent(category, handler) {
    if (!this.eventHandlers.has(category)) {
      this.eventHandlers.set(category, []);
    }
    this.eventHandlers.get(category).push(handler);
  }

  offEvent(category, handler) {
    const handlers = this.eventHandlers.get(category) || [];
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }
}

// Event correlation engine for pattern detection
class EventCorrelationEngine {
  constructor() {
    this.eventHistory = [];
    this.patterns = new Map();
    this.correlationRules = this.initializeCorrelationRules();
  }

  initializeCorrelationRules() {
    return [
      {
        name: 'harsh_braking_sequence',
        events: ['harsh_braking'],
        timeWindow: 600000, // 10 minutes
        threshold: 3,
        severity: 'high'
      },
      {
        name: 'maintenance_breakdown_correlation',
        events: ['maintenance_due', 'breakdown_detected'],
        timeWindow: 86400000, // 24 hours
        threshold: 1,
        severity: 'medium'
      }
    ];
  }

  addEvent(event) {
    this.eventHistory.push({
      ...event,
      processed_at: Date.now()
    });

    // Keep history manageable
    const cutoff = Date.now() - 86400000; // 24 hours
    this.eventHistory = this.eventHistory.filter(
      e => e.processed_at > cutoff
    );

    return this.checkCorrelationRules(event);
  }

  analyzePatterns(event) {
    const patterns = [];

    // Check each correlation rule
    this.correlationRules.forEach(rule => {
      const matchingEvents = this.findMatchingEvents(rule, event);
      if (matchingEvents.length >= rule.threshold) {
        patterns.push({
          type: rule.name,
          events: matchingEvents,
          severity: rule.severity,
          vehicle_id: event.source.vehicle_id
        });
      }
    });

    return patterns;
  }

  findMatchingEvents(rule, currentEvent) {
    const timeWindow = Date.now() - rule.timeWindow;
    
    return this.eventHistory.filter(event => 
      event.source.vehicle_id === currentEvent.source.vehicle_id &&
      event.processed_at > timeWindow &&
      rule.events.includes(event.event_type)
    );
  }

  checkCorrelationRules(event) {
    // Implementation for real-time correlation checking
    return this.analyzePatterns(event);
  }
}

// Usage example
const eventProcessor = new EventStreamProcessor();

// Start event streaming
eventProcessor.startEventStream('fleet_001', {
  categories: ['safety_events', 'emergency_events', 'trip_lifecycle'],
  severityLevels: ['high', 'critical']
});

// Subscribe to specific event types
eventProcessor.onEvent('safety_events', (event) => {
  console.log('Safety event received:', event.event_type);
  // Handle safety event
});

eventProcessor.onEvent('emergency_events', (event) => {
  console.log('EMERGENCY EVENT:', event.event_type);
  // Immediate emergency response
});

Use Cases

Emergency Response System

Implement automated emergency response with event-driven workflows.
class EmergencyResponseSystem {
  constructor(eventProcessor) {
    this.eventProcessor = eventProcessor;
    this.activeEmergencies = new Map();
    this.responseTeams = new Map();
    this.escalationLevels = this.initializeEscalationLevels();
    
    this.setupEventSubscriptions();
  }

  setupEventSubscriptions() {
    this.eventProcessor.onEvent('emergency_events', (event) => {
      this.handleEmergencyEvent(event);
    });

    this.eventProcessor.onEvent('safety_events', (event) => {
      if (event.severity === 'critical') {
        this.handleCriticalSafetyEvent(event);
      }
    });
  }

  initializeEscalationLevels() {
    return {
      level_1: {
        timeout: 120000, // 2 minutes
        contacts: ['supervisor', 'dispatcher'],
        actions: ['notify_contacts', 'log_incident']
      },
      level_2: {
        timeout: 300000, // 5 minutes
        contacts: ['manager', 'safety_coordinator'],
        actions: ['escalate_to_management', 'prepare_response_team']
      },
      level_3: {
        timeout: 600000, // 10 minutes
        contacts: ['emergency_services', 'executive_team'],
        actions: ['contact_authorities', 'media_preparation']
      }
    };
  }

  async handleEmergencyEvent(event) {
    const emergencyId = this.generateEmergencyId();
    const emergency = {
      id: emergencyId,
      event: event,
      status: 'active',
      level: 1,
      startTime: new Date(),
      responses: [],
      escalations: []
    };

    this.activeEmergencies.set(emergencyId, emergency);

    // Immediate response
    await this.initiateEmergencyResponse(emergency);

    // Set up escalation timers
    this.setupEscalationTimers(emergency);

    // Start tracking
    this.startEmergencyTracking(emergency);
  }

  async initiateEmergencyResponse(emergency) {
    const { event } = emergency;
    const vehicleId = event.source.vehicle_id;
    const location = event.location;

    console.log(`🚨 EMERGENCY RESPONSE INITIATED: ${event.event_type}`);
    console.log(`Vehicle: ${vehicleId} at ${location.address}`);

    // Immediate actions based on event type
    switch (event.event_type) {
      case 'panic_button_pressed':
        await this.handlePanicButton(emergency);
        break;

      case 'collision_detected':
        await this.handleCollisionDetection(emergency);
        break;

      case 'medical_emergency':
        await this.handleMedicalEmergency(emergency);
        break;

      case 'vehicle_fire_detected':
        await this.handleVehicleFire(emergency);
        break;

      default:
        await this.handleGenericEmergency(emergency);
    }

    // Log response initiation
    emergency.responses.push({
      type: 'response_initiated',
      timestamp: new Date(),
      actions_taken: ['emergency_protocol_activated']
    });
  }

  async handlePanicButton(emergency) {
    const { event } = emergency;
    const vehicleId = event.source.vehicle_id;

    // 1. Immediate location tracking
    await this.activateEmergencyTracking(vehicleId);

    // 2. Contact driver first
    const driverResponse = await this.attemptDriverContact(event.source.driver_id);
    
    if (!driverResponse.successful) {
      // 3. If no driver response, escalate immediately
      await this.escalateToLevel2(emergency);
    }

    // 4. Notify immediate contacts
    await this.notifyEmergencyContacts(emergency, 'level_1');

    // 5. Prepare response team standby
    await this.prepareResponseTeam(emergency);
  }

  async handleCollisionDetection(emergency) {
    const { event } = emergency;

    // High-priority collision response
    emergency.level = 2; // Start at level 2 for collisions

    // 1. Immediate emergency services alert
    await this.contactEmergencyServices(emergency);

    // 2. Notify all relevant parties
    await this.notifyEmergencyContacts(emergency, 'level_2');

    // 3. Deploy response team immediately
    await this.deployResponseTeam(emergency);

    // 4. Coordinate with authorities
    await this.coordinateWithAuthorities(emergency);
  }

  async handleMedicalEmergency(emergency) {
    const { event } = emergency;

    // Medical emergency - highest priority
    emergency.level = 3;

    // 1. Immediate medical services
    await this.contactMedicalServices(emergency);

    // 2. Driver assistance protocol
    await this.activateDriverAssistance(emergency);

    // 3. Route to nearest medical facility
    await this.routeToMedicalFacility(emergency);
  }

  setupEscalationTimers(emergency) {
    const levels = ['level_1', 'level_2', 'level_3'];
    
    levels.forEach((level, index) => {
      const escalation = this.escalationLevels[level];
      
      setTimeout(async () => {
        if (emergency.status === 'active' && emergency.level <= index + 1) {
          await this.escalateEmergency(emergency, index + 2);
        }
      }, escalation.timeout);
    });
  }

  async escalateEmergency(emergency, newLevel) {
    if (newLevel > 3) return; // Max level reached

    console.log(`⬆️ Escalating emergency ${emergency.id} to level ${newLevel}`);

    emergency.level = newLevel;
    emergency.escalations.push({
      level: newLevel,
      timestamp: new Date(),
      reason: 'timeout_escalation'
    });

    const levelKey = `level_${newLevel}`;
    const escalationConfig = this.escalationLevels[levelKey];

    // Execute escalation actions
    for (const action of escalationConfig.actions) {
      await this.executeEscalationAction(emergency, action);
    }

    // Notify escalation contacts
    await this.notifyEmergencyContacts(emergency, levelKey);
  }

  async executeEscalationAction(emergency, action) {
    switch (action) {
      case 'escalate_to_management':
        await this.notifyManagement(emergency);
        break;

      case 'prepare_response_team':
        await this.prepareResponseTeam(emergency);
        break;

      case 'contact_authorities':
        await this.contactAuthorities(emergency);
        break;

      case 'media_preparation':
        await this.prepareMediaResponse(emergency);
        break;
    }
  }

  async resolveEmergency(emergencyId, resolution) {
    const emergency = this.activeEmergencies.get(emergencyId);
    if (!emergency) return;

    emergency.status = 'resolved';
    emergency.endTime = new Date();
    emergency.resolution = resolution;

    console.log(`✅ Emergency resolved: ${emergencyId}`);

    // Notify all parties of resolution
    await this.notifyEmergencyResolution(emergency);

    // Generate incident report
    await this.generateIncidentReport(emergency);

    // Archive emergency
    this.activeEmergencies.delete(emergencyId);
  }

  async generateIncidentReport(emergency) {
    const report = {
      emergency_id: emergency.id,
      event_type: emergency.event.event_type,
      start_time: emergency.startTime,
      end_time: emergency.endTime,
      duration: emergency.endTime - emergency.startTime,
      max_escalation_level: emergency.level,
      responses: emergency.responses,
      escalations: emergency.escalations,
      resolution: emergency.resolution,
      lessons_learned: await this.analyzeLessonsLearned(emergency)
    };

    // Store report and send to relevant parties
    await this.storeIncidentReport(report);
    await this.distributeIncidentReport(report);

    return report;
  }

  // Utility methods
  generateEmergencyId() {
    return `emrg_${Date.now()}_${Math.random().toString(36).substr(2, 5)}`;
  }

  async activateEmergencyTracking(vehicleId) {
    // Implementation for enhanced GPS tracking
  }

  async attemptDriverContact(driverId) {
    // Implementation for contacting driver
    return { successful: false, response: null };
  }

  async notifyEmergencyContacts(emergency, level) {
    // Implementation for notifying emergency contacts
  }

  async contactEmergencyServices(emergency) {
    // Implementation for contacting emergency services
  }

  async prepareResponseTeam(emergency) {
    // Implementation for preparing response team
  }
}

Operational Intelligence Dashboard

Build real-time operational intelligence using event streams.
class OperationalIntelligenceDashboard:
    def __init__(self, event_processor: EventStreamProcessor):
        self.event_processor = event_processor
        self.metrics = defaultdict(lambda: defaultdict(int))
        self.kpi_calculator = KPICalculator()
        self.alert_manager = AlertManager()
        self.trend_analyzer = TrendAnalyzer()
        
        self.setup_event_handlers()
    
    def setup_event_handlers(self):
        """Set up handlers for different event categories"""
        self.event_processor.on_event('trip_lifecycle', self.handle_trip_metrics)
        self.event_processor.on_event('safety_events', self.handle_safety_metrics)
        self.event_processor.on_event('maintenance_events', self.handle_maintenance_metrics)
        self.event_processor.on_event('geofence_events', self.handle_operational_metrics)
    
    async def handle_trip_metrics(self, event: dict):
        """Process trip events for operational metrics"""
        event_type = event['event_type']
        timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
        
        # Update metrics based on event type
        if event_type == 'trip_started':
            self.metrics['trips']['started_today'] += 1
            await self.update_active_trips_count(1)
            
        elif event_type == 'trip_completed':
            self.metrics['trips']['completed_today'] += 1
            await self.update_active_trips_count(-1)
            await self.calculate_trip_efficiency(event)
            
        elif event_type == 'waypoint_reached':
            self.metrics['deliveries']['completed_today'] += 1
            await self.update_delivery_performance(event)
    
    async def handle_safety_metrics(self, event: dict):
        """Process safety events for safety KPIs"""
        event_type = event['event_type']
        severity = event['severity']
        
        # Update safety metrics
        self.metrics['safety']['total_events_today'] += 1
        self.metrics['safety'][f'{severity}_events_today'] += 1
        self.metrics['safety'][f'{event_type}_today'] += 1
        
        # Calculate safety scores
        await self.update_fleet_safety_score(event)
        
        # Check for safety trends
        safety_trend = await self.trend_analyzer.analyze_safety_trend(event)
        if safety_trend['concerning']:
            await self.alert_manager.raise_trend_alert('safety_deterioration', safety_trend)
    
    async def handle_maintenance_metrics(self, event: dict):
        """Process maintenance events for fleet health metrics"""
        event_type = event['event_type']
        vehicle_id = event['source']['vehicle_id']
        
        if event_type == 'maintenance_due':
            self.metrics['maintenance']['vehicles_due'] += 1
            
        elif event_type == 'breakdown_detected':
            self.metrics['maintenance']['breakdowns_today'] += 1
            await self.update_vehicle_availability(vehicle_id, False)
            
        elif event_type == 'maintenance_completed':
            self.metrics['maintenance']['services_completed'] += 1
            await self.update_vehicle_availability(vehicle_id, True)
        
        # Update fleet health score
        await self.update_fleet_health_metrics()
    
    async def handle_operational_metrics(self, event: dict):
        """Process operational events for efficiency metrics"""
        event_type = event['event_type']
        event_data = event.get('event_data', {})
        
        if event_type == 'geofence_entry':
            geofence_type = event_data.get('geofence_type', 'customer')
            if geofence_type == 'customer':
                await self.track_arrival_time(event)
                
        elif event_type == 'geofence_exit':
            service_completed = event_data.get('service_completed', False)
            if service_completed:
                await self.track_service_completion(event)
                await self.calculate_service_efficiency(event)
    
    async def calculate_trip_efficiency(self, event: dict):
        """Calculate trip efficiency metrics"""
        trip_id = event['source']['trip_id']
        event_data = event.get('event_data', {})
        
        # Get trip details
        planned_duration = event_data.get('planned_duration', 0)
        actual_duration = event_data.get('actual_duration', 0)
        planned_distance = event_data.get('planned_distance', 0)
        actual_distance = event_data.get('actual_distance', 0)
        
        if planned_duration > 0:
            time_efficiency = min(100, (planned_duration / actual_duration) * 100)
            self.metrics['efficiency']['average_time_efficiency'] = (
                (self.metrics['efficiency']['average_time_efficiency'] + time_efficiency) / 2
            )
        
        if planned_distance > 0:
            distance_efficiency = min(100, (planned_distance / actual_distance) * 100)
            self.metrics['efficiency']['average_distance_efficiency'] = (
                (self.metrics['efficiency']['average_distance_efficiency'] + distance_efficiency) / 2
            )
    
    async def update_delivery_performance(self, event: dict):
        """Update delivery performance metrics"""
        event_data = event.get('event_data', {})
        
        # Check if delivery was on time
        planned_time = event_data.get('planned_arrival')
        actual_time = event['timestamp']
        
        if planned_time:
            planned_dt = datetime.fromisoformat(planned_time.replace('Z', '+00:00'))
            actual_dt = datetime.fromisoformat(actual_time.replace('Z', '+00:00'))
            
            delay_minutes = (actual_dt - planned_dt).total_seconds() / 60
            
            if delay_minutes <= 5:  # 5-minute tolerance
                self.metrics['deliveries']['on_time_today'] += 1
            else:
                self.metrics['deliveries']['late_today'] += 1
            
            # Update on-time performance percentage
            total_deliveries = (self.metrics['deliveries']['on_time_today'] + 
                              self.metrics['deliveries']['late_today'])
            
            if total_deliveries > 0:
                on_time_percentage = (self.metrics['deliveries']['on_time_today'] / 
                                    total_deliveries) * 100
                self.metrics['performance']['on_time_percentage'] = on_time_percentage
    
    async def update_fleet_safety_score(self, event: dict):
        """Update fleet-wide safety score"""
        severity_weights = {
            'low': 1,
            'medium': 3,
            'high': 8,
            'critical': 15
        }
        
        severity = event['severity']
        weight = severity_weights.get(severity, 1)
        
        # Decrease safety score based on severity
        current_score = self.metrics['safety'].get('fleet_safety_score', 100)
        score_decrease = weight * 0.1  # Adjust multiplier as needed
        
        new_score = max(0, current_score - score_decrease)
        self.metrics['safety']['fleet_safety_score'] = new_score
        
        # Check if safety score is critically low
        if new_score < 70:
            await self.alert_manager.raise_alert('low_fleet_safety_score', {
                'current_score': new_score,
                'threshold': 70,
                'recent_event': event
            })
    
    def get_real_time_dashboard_data(self) -> dict:
        """Get current dashboard data"""
        return {
            'timestamp': datetime.now().isoformat(),
            'fleet_overview': {
                'active_trips': self.metrics['trips']['started_today'] - 
                               self.metrics['trips']['completed_today'],
                'completed_trips': self.metrics['trips']['completed_today'],
                'total_deliveries': self.metrics['deliveries']['completed_today'],
                'on_time_percentage': self.metrics['performance'].get('on_time_percentage', 100)
            },
            'safety_metrics': {
                'safety_score': self.metrics['safety'].get('fleet_safety_score', 100),
                'safety_events_today': self.metrics['safety']['total_events_today'],
                'critical_events': self.metrics['safety']['critical_events_today'],
                'high_events': self.metrics['safety']['high_events_today']
            },
            'maintenance_status': {
                'vehicles_due_maintenance': self.metrics['maintenance']['vehicles_due'],
                'breakdowns_today': self.metrics['maintenance']['breakdowns_today'],
                'services_completed': self.metrics['maintenance']['services_completed']
            },
            'efficiency_metrics': {
                'time_efficiency': self.metrics['efficiency'].get('average_time_efficiency', 100),
                'distance_efficiency': self.metrics['efficiency'].get('average_distance_efficiency', 100),
                'fuel_efficiency': await self.calculate_fuel_efficiency(),
                'utilization_rate': await self.calculate_utilization_rate()
            },
            'alerts': await self.get_active_alerts(),
            'trends': await self.get_performance_trends()
        }
    
    async def get_active_alerts(self) -> List[dict]:
        """Get list of active alerts"""
        return await self.alert_manager.get_active_alerts()
    
    async def get_performance_trends(self) -> dict:
        """Get performance trend analysis"""
        return await self.trend_analyzer.get_current_trends()
    
    async def calculate_fuel_efficiency(self) -> float:
        """Calculate fleet fuel efficiency"""
        # Implementation would aggregate fuel data from telemetry
        return 12.5  # Placeholder MPG
    
    async def calculate_utilization_rate(self) -> float:
        """Calculate fleet utilization rate"""
        # Implementation would calculate based on active vs. total vehicles
        return 85.2  # Placeholder percentage


class KPICalculator:
    """Calculate key performance indicators"""
    
    def __init__(self):
        self.historical_data = defaultdict(list)
    
    async def calculate_operational_kpis(self, metrics: dict) -> dict:
        """Calculate operational KPIs"""
        return {
            'delivery_performance': await self.calculate_delivery_kpi(metrics),
            'safety_performance': await self.calculate_safety_kpi(metrics),
            'efficiency_performance': await self.calculate_efficiency_kpi(metrics),
            'maintenance_performance': await self.calculate_maintenance_kpi(metrics)
        }
    
    async def calculate_delivery_kpi(self, metrics: dict) -> dict:
        """Calculate delivery-related KPIs"""
        deliveries = metrics.get('deliveries', {})
        
        total_deliveries = deliveries.get('completed_today', 0)
        on_time_deliveries = deliveries.get('on_time_today', 0)
        
        on_time_rate = (on_time_deliveries / total_deliveries * 100) if total_deliveries > 0 else 100
        
        return {
            'on_time_delivery_rate': on_time_rate,
            'total_deliveries': total_deliveries,
            'performance_grade': self.get_performance_grade(on_time_rate)
        }
    
    def get_performance_grade(self, percentage: float) -> str:
        """Convert percentage to performance grade"""
        if percentage >= 95:
            return 'A'
        elif percentage >= 90:
            return 'B'
        elif percentage >= 80:
            return 'C'
        elif percentage >= 70:
            return 'D'
        else:
            return 'F'

Best Practices

Event Processing

  • Implement idempotent event handlers to handle duplicate events
  • Use event ordering and correlation for complex workflows
  • Design events with sufficient context for downstream processing
  • Implement event replay capabilities for recovery scenarios

Alert Management

  • Set appropriate severity levels to avoid alert fatigue
  • Implement escalation rules based on event criticality
  • Use suppression rules to prevent duplicate alerts
  • Maintain alert acknowledgment and resolution workflows

Performance Optimization

  • Use event filtering to reduce unnecessary processing
  • Implement batch processing for high-volume event streams
  • Cache frequently accessed data to reduce database load
  • Use asynchronous processing for non-critical events

System Reliability

  • Implement circuit breakers for external service calls
  • Use dead letter queues for failed event processing
  • Monitor event processing latency and throughput
  • Implement graceful degradation during system overload

Related Endpoints

Explore other real-time capabilities: