Documentation Index
Fetch the complete documentation index at: https://docs.bookovia.com/llms.txt
Use this file to discover all available pages before exploring further.
Bookovia Python SDK
The official Python SDK for the Bookovia Telematics API provides a Pythonic interface for data analytics, machine learning pipelines, automation scripts, and backend integrations.Features
- ✅ Pythonic API Design - Follows Python best practices and conventions
- ✅ Type Hints - Full typing support for better IDE experience
- ✅ AsyncIO Support - Concurrent operations with async/await
- ✅ Pandas Integration - Direct DataFrame conversion for analysis
- ✅ Pydantic Models - Built-in data validation and serialization
- ✅ Comprehensive Error Handling - Detailed exception hierarchy
- ✅ Automatic Retries - Configurable retry logic with exponential backoff
Installation
PyPI
pip install bookovia
With optional dependencies
# For data analysis
pip install bookovia[pandas]
# For async support
pip install bookovia[async]
# All extras
pip install bookovia[all]
Development Installation
git clone https://github.com/bookovia/python-sdk.git
cd python-sdk
pip install -e ".[dev]"
Quick Start
Synchronous Client
import bookovia
from bookovia.models import StartTripRequest, Location
# Initialize client
client = bookovia.Client('bkv_live_your_api_key')
# Start a trip
trip = client.trips.start(
vehicle_id='vehicle_123',
driver_id='driver_456',
start_location=Location(
latitude=40.7128,
longitude=-74.0060,
address='New York, NY'
),
metadata={
'purpose': 'delivery',
'route': 'downtown'
}
)
print(f"Trip started: {trip.trip_id}")
Asynchronous Client
import asyncio
import bookovia
async def main():
# Initialize async client
async with bookovia.AsyncClient('bkv_live_your_api_key') as client:
# Start multiple trips concurrently
tasks = [
client.trips.start(vehicle_id=f'vehicle_{i}')
for i in range(1, 6)
]
trips = await asyncio.gather(*tasks)
for trip in trips:
print(f"Started trip: {trip.trip_id}")
# Run async function
asyncio.run(main())
Upload Location Data
from bookovia.models import LocationPoint
from datetime import datetime, timezone
# Single location upload
client.locations.upload(
trip_id='trip_123',
latitude=40.7128,
longitude=-74.0060,
timestamp=datetime.now(timezone.utc),
speed=35,
heading=180
)
# Batch location upload
locations = [
LocationPoint(
latitude=40.7128,
longitude=-74.0060,
timestamp=datetime.now(timezone.utc),
speed=35,
heading=180
),
LocationPoint(
latitude=40.7130,
longitude=-74.0058,
timestamp=datetime.now(timezone.utc),
speed=42,
heading=175
)
]
client.locations.batch_upload(
trip_id='trip_123',
locations=locations
)
API Reference
Client Configuration
import bookovia
from bookovia.config import ClientConfig
# Basic configuration
client = bookovia.Client('your_api_key')
# Advanced configuration
config = ClientConfig(
base_url='https://api.bookovia.com/v1',
timeout=30,
max_retries=3,
retry_delay=1.0,
debug=False,
user_agent='MyApp/1.0.0',
# Custom headers
default_headers={
'X-Custom-Header': 'custom-value'
}
)
client = bookovia.Client('your_api_key', config=config)
Trip Management
User Trip Queries
# Get all active trips for a user
active_trips = await sdk.get_user_active_trips('user_123')
print(f"User has {active_trips['data']['count']} active trips")
# Get trip history for a user
completed_trips = await sdk.get_user_trip_history('user_123', status='completed')
all_history = await sdk.get_user_trip_history('user_123') # defaults to completed
# Calculate total distance
total_distance = sum(trip['distance_meters'] or 0 for trip in completed_trips['data']['trips'])
print(f"Total distance: {total_distance/1000:.1f} km")
Organization Trip Queries
# Get all active trips for an organization
org_active_trips = await sdk.get_org_active_trips('org_123')
print(f"Organization has {org_active_trips['data']['count']} active trips")
# Get trip history for an organization
org_history = await sdk.get_org_trip_history('org_123', status='completed')
cancelled_trips = await sdk.get_org_trip_history('org_123', status='cancelled')
Vehicle Trip Queries
# Get all active trips for a vehicle
vehicle_active_trips = await sdk.get_vehicle_active_trips('vehicle_123')
# Get trip history for a vehicle
vehicle_history = await sdk.get_vehicle_trip_history('vehicle_123', status='completed')
### Safety Analytics
```python
from bookovia.models import SafetyScoreRequest, EventFilters
# Get safety score
score = client.safety.get_score(
trip_id='trip_123'
# OR driver_id='driver_456'
# OR vehicle_id='vehicle_123'
)
print(f"Safety Score: {score.score}/100 (Grade: {score.grade})")
# Analyze driving behavior
analysis = client.safety.analyze_behavior(
driver_id='driver_456',
date_range=(date(2024, 4, 1), date(2024, 4, 13))
)
# Get harsh events
events = client.safety.get_harsh_events(
trip_id='trip_123',
event_type='harsh_braking',
severity='medium'
)
# Crash risk assessment
risk = client.safety.get_crash_risk(
driver_id='driver_456',
timeframe='30days'
)
Fleet Management
# Get fleet overview
overview = client.fleet.get_overview()
# List vehicles
vehicles = client.fleet.get_vehicles(status='active')
# Vehicle utilization
utilization = client.fleet.get_utilization(
vehicle_id='vehicle_123',
timeframe='7days'
)
# Fleet optimization
optimization = client.fleet.get_optimization()
Data Models
Core Models (Pydantic)
from bookovia.models import *
from typing import Optional, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
class Trip(BaseModel):
trip_id: str
organization_id: str
vehicle_id: str
driver_id: Optional[str] = None
status: TripStatus
start_time: datetime
end_time: Optional[datetime] = None
start_location: Optional[Location] = None
end_location: Optional[Location] = None
analytics: TripAnalytics
metadata: Optional[Dict[str, Any]] = None
created_at: datetime
updated_at: datetime
class Location(BaseModel):
latitude: float = Field(..., ge=-90, le=90)
longitude: float = Field(..., ge=-180, le=180)
address: Optional[str] = None
class LocationPoint(Location):
timestamp: datetime
speed: float = Field(..., ge=0) # km/h
heading: float = Field(..., ge=0, lt=360) # degrees
accuracy: Optional[float] = None # meters
altitude: Optional[float] = None # meters
class TripAnalytics(BaseModel):
distance_km: float
duration_minutes: int
max_speed_kmh: float
avg_speed_kmh: float
idle_time_minutes: int
locations_count: int
events_count: int
safety_score: int = Field(..., ge=0, le=100)
eco_score: int = Field(..., ge=0, le=100)
Enums
from enum import Enum
class TripStatus(str, Enum):
ACTIVE = 'active'
COMPLETED = 'completed'
PAUSED = 'paused'
CANCELLED = 'cancelled'
class EventType(str, Enum):
HARSH_ACCELERATION = 'harsh_acceleration'
HARSH_BRAKING = 'harsh_braking'
HARSH_CORNERING = 'harsh_cornering'
SPEEDING = 'speeding'
IDLE_EXCESSIVE = 'idle_excessive'
PHONE_USAGE = 'phone_usage'
class EventSeverity(str, Enum):
LOW = 'low'
MEDIUM = 'medium'
HIGH = 'high'
CRITICAL = 'critical'
Request Models
class StartTripRequest(BaseModel):
vehicle_id: str
driver_id: Optional[str] = None
start_location: Optional[Location] = None
metadata: Optional[Dict[str, Any]] = None
odometer_reading: Optional[int] = None
fuel_level_percent: Optional[float] = Field(None, ge=0, le=100)
class StopTripRequest(BaseModel):
end_location: Optional[Location] = None
odometer_reading: Optional[int] = None
fuel_level_percent: Optional[float] = Field(None, ge=0, le=100)
metadata: Optional[Dict[str, Any]] = None
class TripFilters(BaseModel):
vehicle_id: Optional[str] = None
driver_id: Optional[str] = None
status: Optional[TripStatus] = None
start_date: Optional[date] = None
end_date: Optional[date] = None
limit: Optional[int] = Field(25, ge=1, le=100)
offset: Optional[int] = Field(0, ge=0)
Error Handling
from bookovia.exceptions import (
BookoviaError,
AuthenticationError,
ValidationError,
NotFoundError,
RateLimitError,
ServerError
)
try:
trip = client.trips.start(vehicle_id='vehicle_123')
except AuthenticationError as e:
print(f"Authentication failed: {e}")
# Handle auth error (check API key)
except ValidationError as e:
print(f"Validation error: {e}")
print(f"Invalid fields: {e.fields}")
# Handle validation errors
except RateLimitError as e:
print(f"Rate limited. Retry after: {e.retry_after}s")
# Handle rate limiting
except NotFoundError as e:
print(f"Resource not found: {e}")
# Handle not found
except ServerError as e:
print(f"Server error: {e}")
# Handle server errors
except BookoviaError as e:
print(f"General Bookovia error: {e}")
# Handle other API errors
except Exception as e:
print(f"Unexpected error: {e}")
# Handle unexpected errors
Pandas Integration
Convert API Data to DataFrames
import pandas as pd
from bookovia.utils import to_dataframe
# Get trips and convert to DataFrame
trips = client.trips.list(limit=100)
df = to_dataframe(trips.trips)
print(df.head())
print(df.info())
# Analyze trip data
print("Average trip distance:", df['analytics.distance_km'].mean())
print("Safety score distribution:")
print(df['analytics.safety_score'].describe())
# Filter and group data
active_trips = df[df['status'] == 'active']
by_vehicle = df.groupby('vehicle_id')['analytics.distance_km'].sum()
Location Data Analysis
# Get route data as DataFrame
route = client.locations.get_route('trip_123')
locations_df = pd.DataFrame([
{
'latitude': coord[1],
'longitude': coord[0],
'sequence': i
}
for i, coord in enumerate(route.coordinates)
])
# Calculate distances between points
from geopy.distance import geodesic
def calculate_distance(row1, row2):
return geodesic(
(row1['latitude'], row1['longitude']),
(row2['latitude'], row2['longitude'])
).kilometers
locations_df['distance_km'] = locations_df.apply(
lambda row: calculate_distance(
locations_df.iloc[row.name - 1], row
) if row.name > 0 else 0,
axis=1
)
print("Total route distance:", locations_df['distance_km'].sum(), "km")
Safety Analytics with Pandas
# Analyze safety events across fleet
events = client.safety.get_harsh_events(limit=1000)
events_df = to_dataframe(events.events)
# Group by event type and severity
event_summary = events_df.groupby(['event_type', 'severity']).size().reset_index(name='count')
print(event_summary)
# Time-based analysis
events_df['timestamp'] = pd.to_datetime(events_df['timestamp'])
events_df['hour'] = events_df['timestamp'].dt.hour
hourly_events = events_df.groupby('hour').size()
print("Events by hour:")
print(hourly_events)
# Driver performance comparison
driver_scores = []
for driver_id in df['driver_id'].unique():
if pd.notna(driver_id):
score = client.safety.get_score(driver_id=driver_id)
driver_scores.append({
'driver_id': driver_id,
'safety_score': score.score,
'grade': score.grade
})
driver_df = pd.DataFrame(driver_scores)
print(driver_df.sort_values('safety_score', ascending=False))
Async Usage
Concurrent Operations
import asyncio
from bookovia import AsyncClient
async def process_fleet_data():
async with AsyncClient('bkv_live_your_api_key') as client:
# Get list of vehicles
vehicles = await client.fleet.get_vehicles()
# Start trips concurrently
tasks = [
client.trips.start(vehicle_id=vehicle.vehicle_id)
for vehicle in vehicles[:10] # Start 10 trips
]
trips = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
successful_trips = [
trip for trip in trips
if not isinstance(trip, Exception)
]
print(f"Started {len(successful_trips)} trips successfully")
return successful_trips
# Run async function
trips = asyncio.run(process_fleet_data())
Async Context Managers
async def managed_client_example():
# Client automatically handles connection lifecycle
async with AsyncClient('your_api_key') as client:
# All operations are async
trip = await client.trips.start(vehicle_id='vehicle_123')
# Upload locations concurrently
location_tasks = [
client.locations.upload(
trip_id=trip.trip_id,
latitude=40.7128 + i * 0.001,
longitude=-74.0060 + i * 0.001,
timestamp=datetime.now(timezone.utc),
speed=35 + i,
heading=180
)
for i in range(10)
]
await asyncio.gather(*location_tasks)
# Stop trip
await client.trips.stop(trip_id=trip.trip_id)
# Client is automatically closed
Async Generators for Large Datasets
async def stream_trip_data(client, filters):
"""Stream trip data in batches to handle large datasets."""
offset = 0
limit = 100
while True:
batch = await client.trips.list(
filters=filters,
limit=limit,
offset=offset
)
if not batch.trips:
break
for trip in batch.trips:
yield trip
offset += limit
# Usage
async def process_all_trips():
async with AsyncClient('your_api_key') as client:
filters = TripFilters(
start_date=date(2024, 1, 1),
end_date=date(2024, 4, 13)
)
trip_count = 0
total_distance = 0
async for trip in stream_trip_data(client, filters):
trip_count += 1
total_distance += trip.analytics.distance_km
if trip_count % 100 == 0:
print(f"Processed {trip_count} trips...")
print(f"Total: {trip_count} trips, {total_distance:.2f} km")
asyncio.run(process_all_trips())
Machine Learning Integration
Feature Engineering for ML Models
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
class TelematicsFeatureEngineer:
def __init__(self, client):
self.client = client
def extract_driver_features(self, driver_id, days=30):
"""Extract features for driver behavior prediction."""
end_date = date.today()
start_date = end_date - timedelta(days=days)
# Get trips for driver
trips = self.client.trips.list(
filters=TripFilters(
driver_id=driver_id,
start_date=start_date,
end_date=end_date,
status=TripStatus.COMPLETED
)
)
if not trips.trips:
return None
# Calculate aggregate features
features = {
'total_trips': len(trips.trips),
'total_distance_km': sum(t.analytics.distance_km for t in trips.trips),
'avg_trip_duration_min': np.mean([t.analytics.duration_minutes for t in trips.trips]),
'max_speed_kmh': max(t.analytics.max_speed_kmh for t in trips.trips),
'avg_speed_kmh': np.mean([t.analytics.avg_speed_kmh for t in trips.trips]),
'total_idle_time_min': sum(t.analytics.idle_time_minutes for t in trips.trips),
'avg_safety_score': np.mean([t.analytics.safety_score for t in trips.trips]),
'avg_eco_score': np.mean([t.analytics.eco_score for t in trips.trips]),
}
# Get safety events for the period
events = self.client.safety.get_harsh_events(
driver_id=driver_id,
start_date=start_date,
end_date=end_date
)
# Count events by type
event_counts = {}
for event_type in EventType:
event_counts[f'{event_type.value}_count'] = sum(
1 for event in events.events
if event.event_type == event_type
)
features.update(event_counts)
return features
def build_risk_prediction_dataset(self, driver_ids):
"""Build dataset for crash risk prediction."""
dataset = []
for driver_id in driver_ids:
features = self.extract_driver_features(driver_id)
if features:
# Get actual risk assessment
risk = self.client.safety.get_crash_risk(
driver_id=driver_id,
timeframe='30days'
)
features['risk_score'] = risk.risk_score
features['risk_level'] = risk.risk_level
dataset.append(features)
return pd.DataFrame(dataset)
# Usage
engineer = TelematicsFeatureEngineer(client)
# Get driver list
drivers = ['driver_001', 'driver_002', 'driver_003'] # Your driver IDs
# Build dataset
df = engineer.build_risk_prediction_dataset(drivers)
# Prepare for ML
feature_columns = [col for col in df.columns if col not in ['risk_score', 'risk_level']]
X = df[feature_columns]
y = df['risk_level'].map({'low': 0, 'medium': 1, 'high': 2})
# Train model
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_scaled, y)
# Feature importance
feature_importance = pd.DataFrame({
'feature': feature_columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("Top risk factors:")
print(feature_importance.head(10))
Anomaly Detection
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class AnomalyDetector:
def __init__(self, client):
self.client = client
self.model = IsolationForest(contamination=0.1)
self.scaler = StandardScaler()
def detect_anomalous_trips(self, vehicle_id, days=7):
"""Detect anomalous driving patterns."""
end_date = date.today()
start_date = end_date - timedelta(days=days)
trips = self.client.trips.list(
filters=TripFilters(
vehicle_id=vehicle_id,
start_date=start_date,
end_date=end_date,
status=TripStatus.COMPLETED
)
)
# Extract features
features = []
trip_ids = []
for trip in trips.trips:
features.append([
trip.analytics.distance_km,
trip.analytics.duration_minutes,
trip.analytics.max_speed_kmh,
trip.analytics.avg_speed_kmh,
trip.analytics.idle_time_minutes,
trip.analytics.safety_score,
trip.analytics.events_count
])
trip_ids.append(trip.trip_id)
if len(features) < 10: # Need minimum samples
return []
# Fit model and detect anomalies
X = self.scaler.fit_transform(features)
anomalies = self.model.fit_predict(X)
# Return anomalous trip IDs
return [
trip_ids[i] for i, is_anomaly in enumerate(anomalies)
if is_anomaly == -1
]
# Usage
detector = AnomalyDetector(client)
anomalous_trips = detector.detect_anomalous_trips('vehicle_123')
print(f"Found {len(anomalous_trips)} anomalous trips:")
for trip_id in anomalous_trips:
trip = client.trips.get(trip_id)
print(f"- {trip_id}: {trip.analytics.safety_score} safety score")
Testing
Unit Testing with pytest
# tests/test_client.py
import pytest
from unittest.mock import Mock, patch
from bookovia import Client
from bookovia.models import Trip, TripStatus
from bookovia.exceptions import AuthenticationError
class TestBookoviaClient:
@pytest.fixture
def client(self):
return Client('bkv_test_example_key')
@pytest.fixture
def mock_trip(self):
return Trip(
trip_id='trip_123',
organization_id='org_456',
vehicle_id='vehicle_123',
status=TripStatus.ACTIVE,
start_time=datetime.now(timezone.utc),
analytics=TripAnalytics(
distance_km=0,
duration_minutes=0,
max_speed_kmh=0,
avg_speed_kmh=0,
idle_time_minutes=0,
locations_count=0,
events_count=0,
safety_score=100,
eco_score=100
),
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
def test_start_trip_success(self, client, mock_trip):
with patch.object(client.trips, 'start', return_value=mock_trip):
trip = client.trips.start(vehicle_id='vehicle_123')
assert trip.trip_id == 'trip_123'
assert trip.vehicle_id == 'vehicle_123'
assert trip.status == TripStatus.ACTIVE
def test_authentication_error(self, client):
with patch.object(client.trips, 'start', side_effect=AuthenticationError('Invalid API key')):
with pytest.raises(AuthenticationError):
client.trips.start(vehicle_id='vehicle_123')
@pytest.mark.asyncio
async def test_async_client(self, mock_trip):
from bookovia import AsyncClient
async with AsyncClient('bkv_test_example_key') as client:
with patch.object(client.trips, 'start', return_value=mock_trip):
trip = await client.trips.start(vehicle_id='vehicle_123')
assert trip.trip_id == 'trip_123'
Integration Testing
# tests/test_integration.py
import pytest
from bookovia import Client
from bookovia.models import Location
@pytest.mark.integration
class TestIntegration:
@pytest.fixture(scope='class')
def client(self):
# Use test API key for integration tests
return Client('bkv_test_integration_key')
def test_full_trip_lifecycle(self, client):
# Start trip
trip = client.trips.start(
vehicle_id='test_vehicle_001',
start_location=Location(latitude=40.7128, longitude=-74.0060)
)
assert trip.trip_id is not None
assert trip.status == 'active'
# Upload location
client.locations.upload(
trip_id=trip.trip_id,
latitude=40.7130,
longitude=-74.0058,
timestamp=datetime.now(timezone.utc),
speed=35,
heading=180
)
# Get trip details
updated_trip = client.trips.get(trip.trip_id)
assert updated_trip.analytics.locations_count >= 1
# Stop trip
final_trip = client.trips.stop(trip.trip_id)
assert final_trip.status == 'completed'
Advanced Examples
Data Pipeline for Analytics
class TelematicsDataPipeline:
def __init__(self, api_key, database_url=None):
self.client = bookovia.Client(api_key)
self.db_url = database_url
def extract_daily_metrics(self, target_date):
"""Extract all trip data for a specific date."""
trips = self.client.trips.list(
filters=TripFilters(
start_date=target_date,
end_date=target_date,
status=TripStatus.COMPLETED,
limit=1000
)
)
metrics = []
for trip in trips.trips:
metrics.append({
'date': target_date,
'trip_id': trip.trip_id,
'vehicle_id': trip.vehicle_id,
'driver_id': trip.driver_id,
'distance_km': trip.analytics.distance_km,
'duration_minutes': trip.analytics.duration_minutes,
'safety_score': trip.analytics.safety_score,
'eco_score': trip.analytics.eco_score,
'events_count': trip.analytics.events_count
})
return pd.DataFrame(metrics)
def process_safety_events(self, target_date):
"""Process safety events for analysis."""
events = self.client.safety.get_harsh_events(
start_date=target_date,
end_date=target_date,
limit=10000
)
event_data = []
for event in events.events:
event_data.append({
'date': target_date,
'event_id': event.event_id,
'trip_id': event.trip_id,
'event_type': event.event_type,
'severity': event.severity,
'latitude': event.location.latitude,
'longitude': event.location.longitude,
'timestamp': event.timestamp
})
return pd.DataFrame(event_data)
def run_daily_pipeline(self, target_date):
"""Run complete daily data pipeline."""
print(f"Processing data for {target_date}")
# Extract metrics
trip_metrics = self.extract_daily_metrics(target_date)
safety_events = self.process_safety_events(target_date)
# Calculate aggregations
daily_summary = {
'date': target_date,
'total_trips': len(trip_metrics),
'total_distance_km': trip_metrics['distance_km'].sum(),
'avg_safety_score': trip_metrics['safety_score'].mean(),
'total_events': len(safety_events),
'unique_vehicles': trip_metrics['vehicle_id'].nunique(),
'unique_drivers': trip_metrics['driver_id'].nunique()
}
# Store results (implement your storage logic)
if self.db_url:
self.store_to_database(trip_metrics, safety_events, daily_summary)
return daily_summary
# Usage
pipeline = TelematicsDataPipeline('bkv_live_your_api_key')
summary = pipeline.run_daily_pipeline(date.today())
print(summary)
Support
- PyPI Package: bookovia
- GitHub: github.com/bookovia/python-sdk
- Issues: GitHub Issues
- Email: support@bookovia.com
- Discussions: GitHub Discussions
Ready for data-driven insights? Check out our quickstart guide and API reference for more examples.