Overview
The Real-Time Event Streaming API delivers notifications for fleet events as they occur. It is suited to responsive applications, automated workflows, alert systems, and real-time operational dashboards that need immediate awareness of fleet activity.
Authentication
All streaming connections require authentication with your API key, sent as a bearer token:
Authorization: Bearer YOUR_API_KEY
Endpoints
Start Event Stream
POST /api/v1/streaming/events/start
Stop Event Stream
POST /api/v1/streaming/events/stop
Get Stream Status
GET /api/v1/streaming/events/status/{stream_id}
Configure Event Filters
PUT /api/v1/streaming/events/configure/{stream_id}
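The four endpoints above can be exercised with any HTTP client. The following is a minimal sketch using only the Python standard library; it builds the requests without sending them. The base URL, the helper names, and the shape of the stop and configure request bodies are assumptions made for illustration and are not confirmed by this page.

```python
import json
import urllib.request

# Assumption: hypothetical host; substitute the base URL for your account.
BASE_URL = "https://api.example.com"


def build_request(method, path, api_key, body=None):
    """Build (but do not send) an authenticated request for a streaming endpoint."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Authorization": f"Bearer {api_key}"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


def start_stream(api_key, config):
    return build_request("POST", "/api/v1/streaming/events/start", api_key, config)


def stop_stream(api_key, stream_id):
    # Assumption: the stop request identifies the stream with a "stream_id" field.
    return build_request(
        "POST", "/api/v1/streaming/events/stop", api_key, {"stream_id": stream_id}
    )


def stream_status(api_key, stream_id):
    return build_request(
        "GET", f"/api/v1/streaming/events/status/{stream_id}", api_key
    )


def configure_filters(api_key, stream_id, event_filters):
    # Assumption: the filter object is sent directly as the request body.
    return build_request(
        "PUT", f"/api/v1/streaming/events/configure/{stream_id}", api_key, event_filters
    )
```

Passing one of these request objects to `urllib.request.urlopen` would send it; keeping construction separate from sending makes the method, path, and headers easy to inspect first.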
Event Categories
| Category | Description | Trigger Conditions |
|---|---|---|
| trip_lifecycle | Trip start, stop, pause, resume events | Trip state changes |
| safety_events | Harsh driving, collisions, safety alerts | Safety threshold violations |
| maintenance_events | Service due, breakdowns, diagnostics | Maintenance conditions |
| operational_events | Route deviations, delays, arrivals | Operational milestones |
| driver_events | Login, logout, break start/end | Driver activity changes |
| vehicle_events | Engine on/off, door open/close | Vehicle state changes |
| geofence_events | Enter/exit defined areas | Geographic boundaries |
| emergency_events | Panic button, crash detection | Emergency conditions |
| system_events | Device connectivity, data quality | System status changes |
Event Stream Configuration
Start Event Stream Request
{
"fleet_id": "fleet_001",
"event_filters": {
"categories": [
"trip_lifecycle",
"safety_events",
"emergency_events"
],
"severity_levels": ["high", "critical"],
"vehicle_ids": ["vehicle_001", "vehicle_002"],
"driver_ids": ["driver_456"],
"geofences": ["depot_001", "customer_zone_A"]
},
"stream_config": {
"delivery_method": "websocket",
"reliability": "at_least_once",
"batch_events": false,
"include_metadata": true,
"encryption": true
},
"processing_rules": {
"deduplication": true,
"event_ordering": true,
"correlation_enabled": true,
"enrichment": ["location_names", "driver_info", "vehicle_details"]
}
}
Event Filter Options
{
"categories": ["trip_lifecycle", "safety_events"],
"severity_levels": ["low", "medium", "high", "critical"],
"priority_levels": ["normal", "urgent", "emergency"],
"vehicle_ids": ["vehicle_001", "vehicle_002"],
"driver_ids": ["driver_456", "driver_789"],
"geofences": ["depot_001", "customer_zone_A"],
"time_filters": {
"business_hours_only": false,
"exclude_weekends": false,
"timezone": "America/New_York"
},
"custom_conditions": [
{
"field": "speed",
"operator": ">",
"value": 55
}
]
}
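To make the `custom_conditions` entries concrete, here is a hedged sketch of how a client could apply the same conditions locally, for example to test a filter before sending it. Only the `>` operator appears in the example above; the other operators, and the choice to look fields up in `event_data`, are assumptions.

```python
import operator

# Assumption: only ">" is shown in the filter example; the rest are illustrative.
OPERATORS = {
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
}


def matches_conditions(event, conditions):
    """Return True only if every condition holds for the event's event_data."""
    data = event.get("event_data", {})
    for cond in conditions:
        compare = OPERATORS.get(cond["operator"])
        if compare is None:
            raise ValueError(f"unsupported operator: {cond['operator']}")
        value = data.get(cond["field"])
        # A missing field never satisfies a condition.
        if value is None or not compare(value, cond["value"]):
            return False
    return True


conditions = [{"field": "speed", "operator": ">", "value": 55}]
print(matches_conditions({"event_data": {"speed": 60}}, conditions))
```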
Event Data Formats
Trip Lifecycle Events
{
"event_id": "evt_12345",
"event_type": "trip_started",
"category": "trip_lifecycle",
"timestamp": "2024-03-15T14:30:00Z",
"severity": "low",
"priority": "normal",
"source": {
"vehicle_id": "vehicle_001",
"driver_id": "driver_456",
"trip_id": "trip_789"
},
"location": {
"latitude": 40.7128,
"longitude": -74.0060,
"address": "123 Depot St, New York, NY",
"geofence": "depot_001"
},
"event_data": {
"trip_type": "delivery",
"planned_destinations": 8,
"estimated_duration": 480,
"estimated_distance": 125.5,
"route_id": "route_456",
"dispatch_time": "2024-03-15T14:15:00Z"
},
"metadata": {
"correlation_id": "corr_abc123",
"sequence_number": 1,
"processing_time": "2024-03-15T14:30:00.150Z",
"source_system": "telematics_device"
}
}
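The `timestamp` and `metadata.processing_time` fields can be compared to estimate how long an event took to reach the processing pipeline. This sketch assumes that `processing_time` records when the platform processed the event, which the example payload suggests but does not state.

```python
from datetime import datetime, timedelta


def parse_utc(ts):
    """Parse an ISO 8601 timestamp with a trailing 'Z' (UTC)."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def processing_latency_ms(event):
    """Milliseconds between the event timestamp and metadata.processing_time."""
    occurred = parse_utc(event["timestamp"])
    processed = parse_utc(event["metadata"]["processing_time"])
    return (processed - occurred) / timedelta(milliseconds=1)


# Values copied from the trip_started example above.
event = {
    "timestamp": "2024-03-15T14:30:00Z",
    "metadata": {"processing_time": "2024-03-15T14:30:00.150Z"},
}
print(processing_latency_ms(event))  # 150.0
```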
Safety Events
{
"event_id": "evt_54321",
"event_type": "harsh_braking",
"category": "safety_events",
"timestamp": "2024-03-15T15:45:30Z",
"severity": "high",
"priority": "urgent",
"source": {
"vehicle_id": "vehicle_001",
"driver_id": "driver_456",
"trip_id": "trip_789"
},
"location": {
"latitude": 40.7489,
"longitude": -73.9857,
"address": "Broadway & 42nd St, New York, NY",
"speed_limit": 35,
"road_type": "urban"
},
"event_data": {
"deceleration_g": -0.8,
"initial_speed": 42.5,
"final_speed": 8.2,
"braking_duration": 3.2,
"weather_condition": "clear",
"traffic_condition": "moderate",
"preceding_events": [
{
"event": "speed_increase",
"timestamp": "2024-03-15T15:45:15Z"
}
]
},
"risk_assessment": {
"collision_risk": "medium",
"passenger_impact": "low",
"recommended_actions": [
"driver_coaching",
"route_review"
]
},
"automatic_actions": {
"alert_sent": true,
"incident_logged": true,
"supervisor_notified": false
}
}
Emergency Events
{
"event_id": "evt_99999",
"event_type": "panic_button_pressed",
"category": "emergency_events",
"timestamp": "2024-03-15T16:22:45Z",
"severity": "critical",
"priority": "emergency",
"source": {
"vehicle_id": "vehicle_001",
"driver_id": "driver_456",
"trip_id": "trip_789"
},
"location": {
"latitude": 40.7282,
"longitude": -73.9942,
"address": "Unknown Location - GPS Coordinates",
"accuracy": 5.2,
"emergency_services_distance": 2.1
},
"event_data": {
"button_held_duration": 3.5,
"vehicle_speed": 0,
"engine_status": "running",
"doors_locked": false,
"hazard_lights": "activated"
},
"emergency_response": {
"response_initiated": true,
"emergency_contacts_notified": true,
"authorities_contacted": false,
"estimated_response_time": 12
},
"escalation": {
"level": "immediate",
"next_escalation": "2024-03-15T16:25:00Z",
"escalation_contacts": [
"supervisor_001",
"emergency_coordinator"
]
}
}
Maintenance Events
{
"event_id": "evt_67890",
"event_type": "maintenance_due",
"category": "maintenance_events",
"timestamp": "2024-03-15T17:00:00Z",
"severity": "medium",
"priority": "normal",
"source": {
"vehicle_id": "vehicle_001",
"current_mileage": 48250.0
},
"event_data": {
"maintenance_type": "scheduled_service",
"service_category": "oil_change",
"due_mileage": 48000.0,
"overdue_miles": 250.0,
"last_service_date": "2023-12-15T10:00:00Z",
"last_service_mileage": 43000.0,
"estimated_cost": 150.00
},
"recommendations": {
"urgency": "schedule_within_week",
"preferred_service_center": "service_center_A",
"estimated_downtime": "2 hours",
"alternative_vehicles": ["vehicle_003", "vehicle_007"]
},
"predictive_indicators": {
"oil_life_remaining": "5%",
"filter_condition": "poor",
"next_predicted_failure": "transmission_service_due_soon"
}
}
Geofence Events
{
"event_id": "evt_13579",
"event_type": "geofence_exit",
"category": "geofence_events",
"timestamp": "2024-03-15T18:30:15Z",
"severity": "low",
"priority": "normal",
"source": {
"vehicle_id": "vehicle_001",
"driver_id": "driver_456",
"trip_id": "trip_789"
},
"location": {
"latitude": 40.7580,
"longitude": -73.9855,
"address": "Customer Site A, New York, NY"
},
"event_data": {
"geofence_id": "customer_zone_A",
"geofence_name": "Customer Site A",
"entry_time": "2024-03-15T18:15:30Z",
"duration_inside": 14.75,
"planned_duration": 15,
"service_completed": true,
"next_destination": "customer_zone_B"
},
"performance_metrics": {
"on_time_departure": true,
"service_efficiency": 98.3,
"customer_satisfaction": "pending"
}
}
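The sample values in the geofence exit event are consistent with `duration_inside` being the number of minutes between `entry_time` and the exit `timestamp`, and with `service_efficiency` being that duration as a percentage of `planned_duration`. Both readings are inferred from the example rather than documented, so treat the sketch below as a sanity check, not a definition.

```python
from datetime import datetime


def parse_utc(ts):
    """Parse an ISO 8601 timestamp with a trailing 'Z' (UTC)."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def dwell_minutes(event):
    """Minutes spent inside the geofence, from entry_time to the exit timestamp."""
    entered = parse_utc(event["event_data"]["entry_time"])
    exited = parse_utc(event["timestamp"])
    return (exited - entered).total_seconds() / 60


# Values copied from the geofence_exit example above.
event = {
    "timestamp": "2024-03-15T18:30:15Z",
    "event_data": {"entry_time": "2024-03-15T18:15:30Z", "planned_duration": 15},
}
minutes = dwell_minutes(event)
# Assumption: efficiency is the dwell time as a percentage of the planned duration.
ratio = round(minutes / event["event_data"]["planned_duration"] * 100, 1)
print(minutes, ratio)  # 14.75 98.3
```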
SDK Examples
import { BookoviaSDK } from '@bookovia/sdk';
const sdk = new BookoviaSDK({ apiKey: 'your-api-key' });
class EventStreamProcessor {
constructor() {
this.activeStreams = new Map();
this.eventHandlers = new Map();
this.eventBuffer = [];
this.correlationEngine = new EventCorrelationEngine();
}
async startEventStream(fleetId, config = {}) {
try {
const streamConfig = {
fleet_id: fleetId,
event_filters: {
categories: config.categories || [
'trip_lifecycle',
'safety_events',
'emergency_events',
'maintenance_events'
],
severity_levels: config.severityLevels || ['medium', 'high', 'critical'],
vehicle_ids: config.vehicleIds,
driver_ids: config.driverIds
},
stream_config: {
delivery_method: 'websocket',
reliability: 'at_least_once',
batch_events: false,
include_metadata: true
},
processing_rules: {
deduplication: true,
event_ordering: true,
correlation_enabled: true,
enrichment: ['location_names', 'driver_info', 'vehicle_details']
}
};
const stream = await sdk.streaming.events.start(streamConfig);
this.activeStreams.set(stream.stream_id, {
config: streamConfig,
status: 'active',
startTime: new Date(),
eventCount: 0
});
// Set up event handler
stream.onEvent = (event) => {
this.processEvent(event);
};
// Set up error handler
stream.onError = (error) => {
console.error('Event stream error:', error);
this.handleStreamError(stream.stream_id, error);
};
console.log(`Event stream started: ${stream.stream_id}`);
return stream;
} catch (error) {
console.error('Failed to start event stream:', error);
throw error;
}
}
processEvent(event) {
// Increment event counter
const streamInfo = this.activeStreams.get(event.stream_id);
if (streamInfo) {
streamInfo.eventCount++;
}
// Add to correlation engine
this.correlationEngine.addEvent(event);
// Route event based on category
this.routeEventByCategory(event);
// Check for event patterns
this.checkEventPatterns(event);
// Store in buffer for batch processing
this.bufferEvent(event);
// Emit processed event
this.emitProcessedEvent(event);
}
routeEventByCategory(event) {
const { category, event_type } = event;
switch (category) {
case 'trip_lifecycle':
this.handleTripLifecycleEvent(event);
break;
case 'safety_events':
this.handleSafetyEvent(event);
break;
case 'emergency_events':
this.handleEmergencyEvent(event);
break;
case 'maintenance_events':
this.handleMaintenanceEvent(event);
break;
case 'geofence_events':
this.handleGeofenceEvent(event);
break;
default:
this.handleGenericEvent(event);
}
}
handleTripLifecycleEvent(event) {
const { event_type, source, event_data } = event;
switch (event_type) {
case 'trip_started':
console.log(`🚗 Trip started: ${source.trip_id} by ${source.driver_id}`);
this.updateTripStatus(source.trip_id, 'active');
this.notifyDispatch('trip_started', event);
break;
case 'trip_completed':
console.log(`✅ Trip completed: ${source.trip_id}`);
this.updateTripStatus(source.trip_id, 'completed');
this.generateTripSummary(source.trip_id);
break;
case 'waypoint_reached':
console.log(`📍 Waypoint reached: ${event_data.waypoint_name}`);
this.updateRouteProgress(source.trip_id, event_data);
this.checkDeliverySchedule(source.trip_id, event_data);
break;
case 'trip_paused':
console.log(`⏸️ Trip paused: ${source.trip_id}`);
this.handleTripPause(event);
break;
}
}
handleSafetyEvent(event) {
const { event_type, severity, source, event_data } = event;
console.log(`🚨 Safety event: ${event_type} - Severity: ${severity}`);
// Log safety incident
this.logSafetyIncident(event);
// Update driver safety score
this.updateDriverSafetyScore(source.driver_id, event_type, severity);
// Check if coaching is needed
if (severity === 'high' || severity === 'critical') {
this.scheduleDriverCoaching(source.driver_id, event_type);
}
// Send real-time alerts
this.sendSafetyAlert(event);
// Check for patterns
this.checkSafetyPatterns(source.vehicle_id, source.driver_id, event_type);
}
handleEmergencyEvent(event) {
const { event_type, source, event_data, emergency_response } = event;
console.log(`🆘 EMERGENCY: ${event_type} - Vehicle: ${source.vehicle_id}`);
// Immediate response protocol
this.activateEmergencyProtocol(event);
// Notify emergency contacts
this.notifyEmergencyContacts(event);
// Coordinate emergency response
if (emergency_response && emergency_response.response_initiated) {
this.trackEmergencyResponse(event);
}
// Log emergency incident
this.logEmergencyIncident(event);
// Update vehicle status
this.updateVehicleStatus(source.vehicle_id, 'emergency');
}
handleMaintenanceEvent(event) {
const { event_type, source, event_data, recommendations } = event;
console.log(`🔧 Maintenance: ${event_type} - Vehicle: ${source.vehicle_id}`);
switch (event_type) {
case 'maintenance_due':
this.scheduleMaintenanceService(source.vehicle_id, event_data);
break;
case 'breakdown_detected':
this.handleVehicleBreakdown(source.vehicle_id, event);
break;
case 'diagnostic_alert':
this.processDiagnosticAlert(source.vehicle_id, event_data);
break;
}
// Update maintenance tracking
this.updateMaintenanceTracking(source.vehicle_id, event);
// Check fleet maintenance status
this.checkFleetMaintenanceStatus();
}
handleGeofenceEvent(event) {
const { event_type, source, event_data } = event;
switch (event_type) {
case 'geofence_entry':
console.log(`📍 Entered: ${event_data.geofence_name}`);
this.handleGeofenceEntry(event);
break;
case 'geofence_exit':
console.log(`📍 Exited: ${event_data.geofence_name}`);
this.handleGeofenceExit(event);
break;
}
// Update location tracking
this.updateLocationTracking(source.vehicle_id, event);
// Check service completion
if (event_data.service_completed !== undefined) {
this.updateServiceCompletion(source.trip_id, event_data);
}
}
checkEventPatterns(event) {
// Detect event patterns that might indicate issues
const patterns = this.correlationEngine.analyzePatterns(event);
patterns.forEach(pattern => {
// pattern.type is the name of the correlation rule that matched,
// so the cases must use the rule names defined in EventCorrelationEngine
switch (pattern.type) {
case 'harsh_braking_sequence':
this.handleFrequentHarshBraking(pattern);
break;
case 'maintenance_breakdown_correlation':
this.handleRecurringMaintenanceIssues(pattern);
break;
case 'route_deviation_pattern':
this.handleRouteDeviationPattern(pattern);
break;
case 'driver_fatigue_indicators':
this.handleDriverFatiguePattern(pattern);
break;
}
});
}
// Event correlation and pattern detection
analyzeEventCorrelations(timeWindow = 300000) { // 5 minutes
const recentEvents = this.eventBuffer.filter(
event => Date.now() - new Date(event.timestamp).getTime() < timeWindow
);
// Group events by vehicle and analyze
const eventsByVehicle = recentEvents.reduce((acc, event) => {
const vehicleId = event.source.vehicle_id;
if (!acc[vehicleId]) acc[vehicleId] = [];
acc[vehicleId].push(event);
return acc;
}, {});
Object.entries(eventsByVehicle).forEach(([vehicleId, events]) => {
this.analyzeVehicleEventSequence(vehicleId, events);
});
}
analyzeVehicleEventSequence(vehicleId, events) {
// Look for concerning event sequences
const safetyEvents = events.filter(e => e.category === 'safety_events');
if (safetyEvents.length >= 3) {
this.raisePattern({
type: 'multiple_safety_events',
vehicle_id: vehicleId,
event_count: safetyEvents.length,
severity: 'high',
recommendation: 'immediate_driver_intervention'
});
}
// Check for maintenance-safety correlation
const maintenanceEvents = events.filter(e => e.category === 'maintenance_events');
if (maintenanceEvents.length > 0 && safetyEvents.length > 0) {
this.raisePattern({
type: 'maintenance_safety_correlation',
vehicle_id: vehicleId,
severity: 'medium',
recommendation: 'expedite_maintenance'
});
}
}
// Utility methods
updateTripStatus(tripId, status) {
// Implementation to update trip status in database
}
notifyDispatch(eventType, event) {
// Implementation to notify dispatch system
}
generateTripSummary(tripId) {
// Implementation to generate trip summary
}
logSafetyIncident(event) {
// Implementation to log safety incident
}
updateDriverSafetyScore(driverId, eventType, severity) {
// Implementation to update driver safety metrics
}
scheduleDriverCoaching(driverId, eventType) {
// Implementation to schedule driver coaching
}
sendSafetyAlert(event) {
// Implementation to send safety alerts
}
activateEmergencyProtocol(event) {
// Implementation for emergency response protocol
}
bufferEvent(event) {
this.eventBuffer.push(event);
// Keep buffer size manageable
if (this.eventBuffer.length > 1000) {
this.eventBuffer = this.eventBuffer.slice(-500);
}
}
emitProcessedEvent(event) {
// Emit event to subscribers
const handlers = this.eventHandlers.get(event.category) || [];
handlers.forEach(handler => {
try {
handler(event);
} catch (error) {
console.error('Event handler error:', error);
}
});
}
// Public API for subscribing to events
onEvent(category, handler) {
if (!this.eventHandlers.has(category)) {
this.eventHandlers.set(category, []);
}
this.eventHandlers.get(category).push(handler);
}
offEvent(category, handler) {
const handlers = this.eventHandlers.get(category) || [];
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
}
// Event correlation engine for pattern detection
class EventCorrelationEngine {
constructor() {
this.eventHistory = [];
this.patterns = new Map();
this.correlationRules = this.initializeCorrelationRules();
}
initializeCorrelationRules() {
return [
{
name: 'harsh_braking_sequence',
events: ['harsh_braking'],
timeWindow: 600000, // 10 minutes
threshold: 3,
severity: 'high'
},
{
name: 'maintenance_breakdown_correlation',
events: ['maintenance_due', 'breakdown_detected'],
timeWindow: 86400000, // 24 hours
threshold: 1,
severity: 'medium'
}
];
}
addEvent(event) {
this.eventHistory.push({
...event,
processed_at: Date.now()
});
// Keep history manageable
const cutoff = Date.now() - 86400000; // 24 hours
this.eventHistory = this.eventHistory.filter(
e => e.processed_at > cutoff
);
return this.checkCorrelationRules(event);
}
analyzePatterns(event) {
const patterns = [];
// Check each correlation rule
this.correlationRules.forEach(rule => {
const matchingEvents = this.findMatchingEvents(rule, event);
if (matchingEvents.length >= rule.threshold) {
patterns.push({
type: rule.name,
events: matchingEvents,
severity: rule.severity,
vehicle_id: event.source.vehicle_id
});
}
});
return patterns;
}
findMatchingEvents(rule, currentEvent) {
// Earliest processing time that still falls inside the rule's window
const cutoff = Date.now() - rule.timeWindow;
return this.eventHistory.filter(event =>
event.source.vehicle_id === currentEvent.source.vehicle_id &&
event.processed_at > cutoff &&
rule.events.includes(event.event_type)
);
}
checkCorrelationRules(event) {
// Implementation for real-time correlation checking
return this.analyzePatterns(event);
}
}
// Usage example
const eventProcessor = new EventStreamProcessor();
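// Start event streaming. startEventStream returns a promise and rethrows on failure,
// so handle the rejection here and decide whether to retry or exit.
eventProcessor.startEventStream('fleet_001', {
categories: ['safety_events', 'emergency_events', 'trip_lifecycle'],
severityLevels: ['high', 'critical']
}).catch(() => {
process.exitCode = 1;
});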
// Subscribe to specific event types
eventProcessor.onEvent('safety_events', (event) => {
console.log('Safety event received:', event.event_type);
// Handle safety event
});
eventProcessor.onEvent('emergency_events', (event) => {
console.log('EMERGENCY EVENT:', event.event_type);
// Immediate emergency response
});
Use Cases
Emergency Response System
Implement automated emergency response with event-driven workflows.
class EmergencyResponseSystem {
constructor(eventProcessor) {
this.eventProcessor = eventProcessor;
this.activeEmergencies = new Map();
this.responseTeams = new Map();
this.escalationLevels = this.initializeEscalationLevels();
this.setupEventSubscriptions();
}
setupEventSubscriptions() {
this.eventProcessor.onEvent('emergency_events', (event) => {
this.handleEmergencyEvent(event);
});
this.eventProcessor.onEvent('safety_events', (event) => {
if (event.severity === 'critical') {
this.handleCriticalSafetyEvent(event);
}
});
}
initializeEscalationLevels() {
return {
level_1: {
timeout: 120000, // 2 minutes
contacts: ['supervisor', 'dispatcher'],
actions: ['notify_contacts', 'log_incident']
},
level_2: {
timeout: 300000, // 5 minutes
contacts: ['manager', 'safety_coordinator'],
actions: ['escalate_to_management', 'prepare_response_team']
},
level_3: {
timeout: 600000, // 10 minutes
contacts: ['emergency_services', 'executive_team'],
actions: ['contact_authorities', 'media_preparation']
}
};
}
async handleEmergencyEvent(event) {
const emergencyId = this.generateEmergencyId();
const emergency = {
id: emergencyId,
event: event,
status: 'active',
level: 1,
startTime: new Date(),
responses: [],
escalations: []
};
this.activeEmergencies.set(emergencyId, emergency);
// Immediate response
await this.initiateEmergencyResponse(emergency);
// Set up escalation timers
this.setupEscalationTimers(emergency);
// Start tracking
this.startEmergencyTracking(emergency);
}
async initiateEmergencyResponse(emergency) {
const { event } = emergency;
const vehicleId = event.source.vehicle_id;
const location = event.location;
console.log(`🚨 EMERGENCY RESPONSE INITIATED: ${event.event_type}`);
console.log(`Vehicle: ${vehicleId} at ${location.address}`);
// Immediate actions based on event type
switch (event.event_type) {
case 'panic_button_pressed':
await this.handlePanicButton(emergency);
break;
case 'collision_detected':
await this.handleCollisionDetection(emergency);
break;
case 'medical_emergency':
await this.handleMedicalEmergency(emergency);
break;
case 'vehicle_fire_detected':
await this.handleVehicleFire(emergency);
break;
default:
await this.handleGenericEmergency(emergency);
}
// Log response initiation
emergency.responses.push({
type: 'response_initiated',
timestamp: new Date(),
actions_taken: ['emergency_protocol_activated']
});
}
async handlePanicButton(emergency) {
const { event } = emergency;
const vehicleId = event.source.vehicle_id;
// 1. Immediate location tracking
await this.activateEmergencyTracking(vehicleId);
// 2. Contact driver first
const driverResponse = await this.attemptDriverContact(event.source.driver_id);
if (!driverResponse.successful) {
// 3. If no driver response, escalate immediately
await this.escalateToLevel2(emergency);
}
// 4. Notify immediate contacts
await this.notifyEmergencyContacts(emergency, 'level_1');
// 5. Prepare response team standby
await this.prepareResponseTeam(emergency);
}
async handleCollisionDetection(emergency) {
const { event } = emergency;
// High-priority collision response
emergency.level = 2; // Start at level 2 for collisions
// 1. Immediate emergency services alert
await this.contactEmergencyServices(emergency);
// 2. Notify all relevant parties
await this.notifyEmergencyContacts(emergency, 'level_2');
// 3. Deploy response team immediately
await this.deployResponseTeam(emergency);
// 4. Coordinate with authorities
await this.coordinateWithAuthorities(emergency);
}
async handleMedicalEmergency(emergency) {
const { event } = emergency;
// Medical emergency - highest priority
emergency.level = 3;
// 1. Immediate medical services
await this.contactMedicalServices(emergency);
// 2. Driver assistance protocol
await this.activateDriverAssistance(emergency);
// 3. Route to nearest medical facility
await this.routeToMedicalFacility(emergency);
}
setupEscalationTimers(emergency) {
const levels = ['level_1', 'level_2', 'level_3'];
levels.forEach((level, index) => {
const escalation = this.escalationLevels[level];
setTimeout(async () => {
if (emergency.status === 'active' && emergency.level <= index + 1) {
await this.escalateEmergency(emergency, index + 2);
}
}, escalation.timeout);
});
}
async escalateEmergency(emergency, newLevel) {
if (newLevel > 3) return; // Max level reached
console.log(`⬆️ Escalating emergency ${emergency.id} to level ${newLevel}`);
emergency.level = newLevel;
emergency.escalations.push({
level: newLevel,
timestamp: new Date(),
reason: 'timeout_escalation'
});
const levelKey = `level_${newLevel}`;
const escalationConfig = this.escalationLevels[levelKey];
// Execute escalation actions
for (const action of escalationConfig.actions) {
await this.executeEscalationAction(emergency, action);
}
// Notify escalation contacts
await this.notifyEmergencyContacts(emergency, levelKey);
}
async executeEscalationAction(emergency, action) {
switch (action) {
case 'escalate_to_management':
await this.notifyManagement(emergency);
break;
case 'prepare_response_team':
await this.prepareResponseTeam(emergency);
break;
case 'contact_authorities':
await this.contactAuthorities(emergency);
break;
case 'media_preparation':
await this.prepareMediaResponse(emergency);
break;
}
}
async resolveEmergency(emergencyId, resolution) {
const emergency = this.activeEmergencies.get(emergencyId);
if (!emergency) return;
emergency.status = 'resolved';
emergency.endTime = new Date();
emergency.resolution = resolution;
console.log(`✅ Emergency resolved: ${emergencyId}`);
// Notify all parties of resolution
await this.notifyEmergencyResolution(emergency);
// Generate incident report
await this.generateIncidentReport(emergency);
// Archive emergency
this.activeEmergencies.delete(emergencyId);
}
async generateIncidentReport(emergency) {
const report = {
emergency_id: emergency.id,
event_type: emergency.event.event_type,
start_time: emergency.startTime,
end_time: emergency.endTime,
duration: emergency.endTime - emergency.startTime,
max_escalation_level: emergency.level,
responses: emergency.responses,
escalations: emergency.escalations,
resolution: emergency.resolution,
lessons_learned: await this.analyzeLessonsLearned(emergency)
};
// Store report and send to relevant parties
await this.storeIncidentReport(report);
await this.distributeIncidentReport(report);
return report;
}
// Utility methods
generateEmergencyId() {
// slice(2, 7) takes the same five characters as the deprecated substr(2, 5)
return `emrg_${Date.now()}_${Math.random().toString(36).slice(2, 7)}`;
}
async activateEmergencyTracking(vehicleId) {
// Implementation for enhanced GPS tracking
}
async attemptDriverContact(driverId) {
// Implementation for contacting driver
return { successful: false, response: null };
}
async notifyEmergencyContacts(emergency, level) {
// Implementation for notifying emergency contacts
}
async contactEmergencyServices(emergency) {
// Implementation for contacting emergency services
}
async prepareResponseTeam(emergency) {
// Implementation for preparing response team
}
}
Operational Intelligence Dashboard
Build real-time operational intelligence using event streams.
from collections import defaultdict
from datetime import datetime
from typing import List
# EventStreamProcessor, AlertManager, and TrendAnalyzer are assumed to be defined elsewhere.
# EventStreamProcessor here is a Python counterpart of the JavaScript class shown earlier.
class OperationalIntelligenceDashboard:
    def __init__(self, event_processor: 'EventStreamProcessor'):
        self.event_processor = event_processor
        self.metrics = defaultdict(lambda: defaultdict(int))
        self.kpi_calculator = KPICalculator()
        self.alert_manager = AlertManager()
        self.trend_analyzer = TrendAnalyzer()
        self.setup_event_handlers()
    def setup_event_handlers(self):
        """Set up handlers for different event categories"""
        self.event_processor.on_event('trip_lifecycle', self.handle_trip_metrics)
        self.event_processor.on_event('safety_events', self.handle_safety_metrics)
        self.event_processor.on_event('maintenance_events', self.handle_maintenance_metrics)
        self.event_processor.on_event('geofence_events', self.handle_operational_metrics)
    async def handle_trip_metrics(self, event: dict):
        """Process trip events for operational metrics"""
        event_type = event['event_type']
        timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
        # Update metrics based on event type
        if event_type == 'trip_started':
            self.metrics['trips']['started_today'] += 1
            await self.update_active_trips_count(1)
        elif event_type == 'trip_completed':
            self.metrics['trips']['completed_today'] += 1
            await self.update_active_trips_count(-1)
            await self.calculate_trip_efficiency(event)
        elif event_type == 'waypoint_reached':
            self.metrics['deliveries']['completed_today'] += 1
            await self.update_delivery_performance(event)
    async def handle_safety_metrics(self, event: dict):
        """Process safety events for safety KPIs"""
        event_type = event['event_type']
        severity = event['severity']
        # Update safety metrics
        self.metrics['safety']['total_events_today'] += 1
        self.metrics['safety'][f'{severity}_events_today'] += 1
        self.metrics['safety'][f'{event_type}_today'] += 1
        # Calculate safety scores
        await self.update_fleet_safety_score(event)
        # Check for safety trends
        safety_trend = await self.trend_analyzer.analyze_safety_trend(event)
        if safety_trend['concerning']:
            await self.alert_manager.raise_trend_alert('safety_deterioration', safety_trend)
    async def handle_maintenance_metrics(self, event: dict):
        """Process maintenance events for fleet health metrics"""
        event_type = event['event_type']
        vehicle_id = event['source']['vehicle_id']
        if event_type == 'maintenance_due':
            self.metrics['maintenance']['vehicles_due'] += 1
        elif event_type == 'breakdown_detected':
            self.metrics['maintenance']['breakdowns_today'] += 1
            await self.update_vehicle_availability(vehicle_id, False)
        elif event_type == 'maintenance_completed':
            self.metrics['maintenance']['services_completed'] += 1
            await self.update_vehicle_availability(vehicle_id, True)
        # Update fleet health score
        await self.update_fleet_health_metrics()
    async def handle_operational_metrics(self, event: dict):
        """Process operational events for efficiency metrics"""
        event_type = event['event_type']
        event_data = event.get('event_data', {})
        if event_type == 'geofence_entry':
            geofence_type = event_data.get('geofence_type', 'customer')
            if geofence_type == 'customer':
                await self.track_arrival_time(event)
        elif event_type == 'geofence_exit':
            service_completed = event_data.get('service_completed', False)
            if service_completed:
                await self.track_service_completion(event)
                await self.calculate_service_efficiency(event)
    async def calculate_trip_efficiency(self, event: dict):
        """Calculate trip efficiency metrics"""
        trip_id = event['source']['trip_id']
        event_data = event.get('event_data', {})
        # Get trip details
        planned_duration = event_data.get('planned_duration', 0)
        actual_duration = event_data.get('actual_duration', 0)
        planned_distance = event_data.get('planned_distance', 0)
        actual_distance = event_data.get('actual_distance', 0)
        # Require a positive actual value as well, to avoid dividing by zero
        if planned_duration > 0 and actual_duration > 0:
            time_efficiency = min(100, (planned_duration / actual_duration) * 100)
            # Blend the previous value with the latest one (50/50 smoothing, not a true mean)
            self.metrics['efficiency']['average_time_efficiency'] = (
                (self.metrics['efficiency']['average_time_efficiency'] + time_efficiency) / 2
            )
        if planned_distance > 0 and actual_distance > 0:
            distance_efficiency = min(100, (planned_distance / actual_distance) * 100)
            self.metrics['efficiency']['average_distance_efficiency'] = (
                (self.metrics['efficiency']['average_distance_efficiency'] + distance_efficiency) / 2
            )
    async def update_delivery_performance(self, event: dict):
        """Update delivery performance metrics"""
        event_data = event.get('event_data', {})
        # Check if delivery was on time
        planned_time = event_data.get('planned_arrival')
        actual_time = event['timestamp']
        if planned_time:
            planned_dt = datetime.fromisoformat(planned_time.replace('Z', '+00:00'))
            actual_dt = datetime.fromisoformat(actual_time.replace('Z', '+00:00'))
            delay_minutes = (actual_dt - planned_dt).total_seconds() / 60
            if delay_minutes <= 5:  # 5-minute tolerance
                self.metrics['deliveries']['on_time_today'] += 1
            else:
                self.metrics['deliveries']['late_today'] += 1
            # Update on-time performance percentage
            total_deliveries = (self.metrics['deliveries']['on_time_today'] +
                                self.metrics['deliveries']['late_today'])
            if total_deliveries > 0:
                on_time_percentage = (self.metrics['deliveries']['on_time_today'] /
                                      total_deliveries) * 100
                self.metrics['performance']['on_time_percentage'] = on_time_percentage
    async def update_fleet_safety_score(self, event: dict):
        """Update fleet-wide safety score"""
        severity_weights = {
            'low': 1,
            'medium': 3,
            'high': 8,
            'critical': 15
        }
        severity = event['severity']
        weight = severity_weights.get(severity, 1)
        # Decrease safety score based on severity
        current_score = self.metrics['safety'].get('fleet_safety_score', 100)
        score_decrease = weight * 0.1  # Adjust multiplier as needed
        new_score = max(0, current_score - score_decrease)
        self.metrics['safety']['fleet_safety_score'] = new_score
        # Check if safety score is critically low
        if new_score < 70:
            await self.alert_manager.raise_alert('low_fleet_safety_score', {
                'current_score': new_score,
                'threshold': 70,
                'recent_event': event
            })
    # Declared async because the body awaits several coroutines
    async def get_real_time_dashboard_data(self) -> dict:
        """Get current dashboard data"""
        return {
            'timestamp': datetime.now().isoformat(),
            'fleet_overview': {
                'active_trips': self.metrics['trips']['started_today'] -
                                self.metrics['trips']['completed_today'],
                'completed_trips': self.metrics['trips']['completed_today'],
                'total_deliveries': self.metrics['deliveries']['completed_today'],
                'on_time_percentage': self.metrics['performance'].get('on_time_percentage', 100)
            },
            'safety_metrics': {
                'safety_score': self.metrics['safety'].get('fleet_safety_score', 100),
                'safety_events_today': self.metrics['safety']['total_events_today'],
                'critical_events': self.metrics['safety']['critical_events_today'],
                'high_events': self.metrics['safety']['high_events_today']
            },
            'maintenance_status': {
                'vehicles_due_maintenance': self.metrics['maintenance']['vehicles_due'],
                'breakdowns_today': self.metrics['maintenance']['breakdowns_today'],
                'services_completed': self.metrics['maintenance']['services_completed']
            },
            'efficiency_metrics': {
                'time_efficiency': self.metrics['efficiency'].get('average_time_efficiency', 100),
                'distance_efficiency': self.metrics['efficiency'].get('average_distance_efficiency', 100),
                'fuel_efficiency': await self.calculate_fuel_efficiency(),
                'utilization_rate': await self.calculate_utilization_rate()
            },
            'alerts': await self.get_active_alerts(),
            'trends': await self.get_performance_trends()
        }
    async def get_active_alerts(self) -> List[dict]:
        """Get list of active alerts"""
        return await self.alert_manager.get_active_alerts()
    async def get_performance_trends(self) -> dict:
        """Get performance trend analysis"""
        return await self.trend_analyzer.get_current_trends()
    async def calculate_fuel_efficiency(self) -> float:
        """Calculate fleet fuel efficiency"""
        # Implementation would aggregate fuel data from telemetry
        return 12.5  # Placeholder MPG
    async def calculate_utilization_rate(self) -> float:
        """Calculate fleet utilization rate"""
        # Implementation would calculate based on active vs. total vehicles
        return 85.2  # Placeholder percentage
class KPICalculator:
    """Calculate key performance indicators"""
    def __init__(self):
        self.historical_data = defaultdict(list)
    async def calculate_operational_kpis(self, metrics: dict) -> dict:
        """Calculate operational KPIs"""
        return {
            'delivery_performance': await self.calculate_delivery_kpi(metrics),
            'safety_performance': await self.calculate_safety_kpi(metrics),
            'efficiency_performance': await self.calculate_efficiency_kpi(metrics),
            'maintenance_performance': await self.calculate_maintenance_kpi(metrics)
        }
    async def calculate_delivery_kpi(self, metrics: dict) -> dict:
        """Calculate delivery-related KPIs"""
        deliveries = metrics.get('deliveries', {})
        total_deliveries = deliveries.get('completed_today', 0)
        on_time_deliveries = deliveries.get('on_time_today', 0)
        on_time_rate = (on_time_deliveries / total_deliveries * 100) if total_deliveries > 0 else 100
        return {
            'on_time_delivery_rate': on_time_rate,
            'total_deliveries': total_deliveries,
            'performance_grade': self.get_performance_grade(on_time_rate)
        }
    def get_performance_grade(self, percentage: float) -> str:
        """Convert percentage to performance grade"""
        if percentage >= 95:
            return 'A'
        elif percentage >= 90:
            return 'B'
        elif percentage >= 80:
            return 'C'
        elif percentage >= 70:
            return 'D'
        else:
            return 'F'
Best Practices
Event Processing
- Implement idempotent event handlers to handle duplicate events
- Use event ordering and correlation for complex workflows
- Design events with sufficient context for downstream processing
- Implement event replay capabilities for recovery scenarios
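With `at_least_once` delivery, the same event can arrive more than once, so handlers should tolerate repeats. One possible approach, sketched here under the assumption that `event_id` is unique per event, is a wrapper that remembers recently seen IDs. The class name and the bounded in-memory cache are illustrative choices; a production system would more likely use a shared store.

```python
from collections import OrderedDict


class IdempotentHandler:
    """Wrap a handler so each event_id is processed at most once."""

    def __init__(self, handler, max_ids=10000):
        self._handler = handler
        self._seen = OrderedDict()  # event_id -> True, oldest first
        self._max_ids = max_ids

    def __call__(self, event):
        event_id = event["event_id"]
        if event_id in self._seen:
            return False  # duplicate delivery, skip
        self._handler(event)
        # Record the ID only after the handler succeeds, so a failed
        # event can still be retried on redelivery.
        self._seen[event_id] = True
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)  # evict the oldest ID
        return True


processed = []
handle = IdempotentHandler(processed.append)
handle({"event_id": "evt_12345"})
handle({"event_id": "evt_12345"})  # ignored as a duplicate
print(len(processed))  # 1
```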
Alert Management
- Set appropriate severity levels to avoid alert fatigue
- Implement escalation rules based on event criticality
- Use suppression rules to prevent duplicate alerts
- Maintain alert acknowledgment and resolution workflows
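A suppression rule can be as simple as refusing to send the same alert for the same vehicle within a time window. The sketch below is one hedged way to do that; keying on `vehicle_id` and `event_type`, and the five-minute default, are assumptions chosen for illustration.

```python
import time


class AlertSuppressor:
    """Suppress repeat alerts for the same vehicle and event type within a window."""

    def __init__(self, window_seconds=300):
        self._window = window_seconds
        self._last_sent = {}  # (vehicle_id, event_type) -> time of last alert

    def should_send(self, event, now=None):
        if now is None:
            now = time.monotonic()
        key = (event["source"]["vehicle_id"], event["event_type"])
        last = self._last_sent.get(key)
        if last is not None and now - last < self._window:
            return False  # an alert for this key was sent recently
        self._last_sent[key] = now
        return True


suppressor = AlertSuppressor(window_seconds=300)
event = {"event_type": "harsh_braking", "source": {"vehicle_id": "vehicle_001"}}
print(suppressor.should_send(event, now=0))    # True
print(suppressor.should_send(event, now=60))   # False
print(suppressor.should_send(event, now=400))  # True
```

Because suppressed alerts do not refresh the stored time, the window is measured from the last alert that was actually sent.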
Performance Optimization
- Use event filtering to reduce unnecessary processing
- Implement batch processing for high-volume event streams
- Cache frequently accessed data to reduce database load
- Use asynchronous processing for non-critical events
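Client-side batching is one way to act on the batch-processing advice for non-urgent events. The following sketch collects events and hands them to a callback once a size limit is reached. The class name and the size-only trigger are assumptions; a real implementation would usually add a time-based flush as well.

```python
class EventBatcher:
    """Collect events and pass them to a flush callback in fixed-size batches."""

    def __init__(self, flush, max_size=100):
        self._flush = flush
        self._max_size = max_size
        self._buffer = []

    def add(self, event):
        self._buffer.append(event)
        if len(self._buffer) >= self._max_size:
            self.flush_now()

    def flush_now(self):
        """Flush whatever is buffered, for example on shutdown."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._flush(batch)


batches = []
batcher = EventBatcher(batches.append, max_size=2)
for i in range(5):
    batcher.add({"event_id": f"evt_{i}"})
batcher.flush_now()
print([len(b) for b in batches])  # [2, 2, 1]
```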
System Reliability
- Implement circuit breakers for external service calls
- Use dead letter queues for failed event processing
- Monitor event processing latency and throughput
- Implement graceful degradation during system overload
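As an illustration of the circuit-breaker item, the sketch below stops calling a failing external service after a number of consecutive failures and allows a trial call once a cool-down has passed. The thresholds, class names, and injectable clock are assumptions made so the example is easy to test.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_after=30.0, clock=time.monotonic):
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self._reset_after:
                raise CircuitOpenError("circuit open, call skipped")
            # Cool-down elapsed: fall through and allow one trial call.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

A handler would wrap each outbound call, for example `breaker.call(send_alert, event)` where `send_alert` is a hypothetical function, and treat `CircuitOpenError` as a signal to queue the work for later.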
Related Endpoints
Explore other real-time capabilities:
- WebSocket Streaming API - Real-time communication
- Telemetry Streaming API - Vehicle diagnostics
- Safety Analytics API - Safety monitoring
- Fleet Management API - Fleet operations