# Real-time fleet monitoring system
import asyncio
import json
from datetime import datetime, timedelta, timezone
class FleetMonitor:
    """Track the latest state of each vehicle and emit alerts on threshold breaches.

    Every GPS fix is forwarded through ``client.locations.upload_single``; the
    response is cached per vehicle in ``active_vehicles`` and then evaluated
    against ``alert_thresholds``.
    """

    def __init__(self, client):
        """Create a monitor bound to *client*.

        Args:
            client: Object exposing an awaitable
                ``locations.upload_single(...)`` call.
        """
        self.client = client
        # vehicle_id -> latest status dict written by process_vehicle_location.
        self.active_vehicles = {}
        self.alert_thresholds = {
            'speed_limit': 80,  # km/h
            # NOTE(review): no check in this class reads this threshold yet.
            'harsh_event_count': 3,  # per hour
            'low_battery': 20,  # percentage
        }

    @staticmethod
    def _utc_now():
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. The tzinfo is
        stripped so the value stored in ``active_vehicles`` keeps the naive
        type this class has always produced.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    async def process_vehicle_location(self, vehicle_id, trip_id, gps_data):
        """Process incoming GPS data from vehicle.

        Uploads the fix, caches the vehicle's latest status and runs the
        alert checks.

        Args:
            vehicle_id: Identifier used as the ``active_vehicles`` key and as
                the uploaded ``device_id``.
            trip_id: Trip identifier passed through to the upload call.
            gps_data: Mapping with required keys ``lat``, ``lng``, ``speed``
                and ``heading``; ``battery_level`` (default 100) and
                ``signal_strength`` (default -70) are optional.

        Returns:
            The upload result, or ``None`` if any step raised. Failures are
            reported on stdout rather than propagated (best-effort).
        """
        try:
            # Sample the clock once so the uploaded timestamp and the cached
            # ``last_update`` describe the same instant.
            now = self._utc_now()
            battery_level = gps_data.get('battery_level', 100)

            result = await self.client.locations.upload_single(
                trip_id=trip_id,
                latitude=gps_data['lat'],
                longitude=gps_data['lng'],
                timestamp=now.isoformat() + 'Z',
                speed_kmh=gps_data['speed'],
                heading=gps_data['heading'],
                battery_level=battery_level,
                device_info={
                    'device_id': vehicle_id,
                    'signal_strength': gps_data.get('signal_strength', -70),
                },
            )

            # Update vehicle status
            self.active_vehicles[vehicle_id] = {
                'last_update': now,
                'location': (gps_data['lat'], gps_data['lng']),
                'speed': gps_data['speed'],
                'safety_score': result.trip_analytics.current_safety_score,
                'battery_level': battery_level,
            }

            # Check for alerts
            await self.check_vehicle_alerts(vehicle_id, result)
            return result
        except Exception as exc:
            # Deliberate best-effort boundary: one bad fix must not stop the
            # monitor, so report it and signal failure with an explicit None.
            print(f"Error processing location for vehicle {vehicle_id}: {exc}")
            return None

    async def check_vehicle_alerts(self, vehicle_id, location_result):
        """Check for various alert conditions.

        Args:
            vehicle_id: Vehicle whose cached status is evaluated.
            location_result: Upload result; its ``events_detected`` items are
                read for ``severity`` and ``event_type``.

        Does nothing when the vehicle has no cached status yet.
        """
        vehicle_status = self.active_vehicles.get(vehicle_id)
        if vehicle_status is None:
            # Nothing recorded for this vehicle, so there is nothing to check.
            return

        # Speed alerts
        speed_limit = self.alert_thresholds['speed_limit']
        if vehicle_status['speed'] > speed_limit:
            await self.send_alert('speeding', vehicle_id, {
                'speed': vehicle_status['speed'],
                'limit': speed_limit,
            })

        # Safety event alerts; tolerate a result whose event list is None.
        for event in location_result.events_detected or ():
            if event.severity in ('high', 'critical'):
                await self.send_alert('safety_event', vehicle_id, {
                    'event_type': event.event_type,
                    'severity': event.severity,
                    'location': vehicle_status['location'],
                })

        # Battery alerts
        if vehicle_status['battery_level'] < self.alert_thresholds['low_battery']:
            await self.send_alert('low_battery', vehicle_id, {
                'battery_level': vehicle_status['battery_level'],
            })

    async def send_alert(self, alert_type, vehicle_id, data):
        """Send alert to fleet manager.

        Currently prints the alert as indented JSON to stdout.

        Args:
            alert_type: Short alert category, e.g. ``'speeding'``.
            vehicle_id: Vehicle the alert refers to.
            data: JSON-serialisable details for the alert.
        """
        alert = {
            'type': alert_type,
            'vehicle_id': vehicle_id,
            # The 'Z' suffix marks the value as UTC, matching the format used
            # for uploaded location timestamps.
            'timestamp': self._utc_now().isoformat() + 'Z',
            'data': data,
        }
        print(f"🚨 FLEET ALERT: {json.dumps(alert, indent=2)}")
        # Integration with notification systems would go here
# Usage
async def main():
    """Feed one simulated GPS fix through a FleetMonitor.

    NOTE(review): ``client`` is not defined in the visible code; it is assumed
    to be an API client provided by the surrounding module — confirm.
    """
    monitor = FleetMonitor(client)

    # Simulate incoming vehicle data
    vehicle_data = {
        'lat': 40.7128,
        'lng': -74.0060,
        'speed': 85.0,  # Over speed limit
        'heading': 90,
        'battery_level': 15,  # Low battery
    }

    await monitor.process_vehicle_location('vehicle_123', 'trip_456', vehicle_data)


if __name__ == "__main__":
    # ``await`` is only valid inside a coroutine, so the demo is driven by an
    # event loop instead of a module-level ``await``.
    asyncio.run(main())