
Documentation Index

Fetch the complete documentation index at: https://docs.bookovia.com/llms.txt

Use this file to discover all available pages before exploring further.

Bookovia Python SDK

The official Python SDK for the Bookovia Telematics API provides a Pythonic interface for data analytics, machine learning pipelines, automation scripts, and backend integrations.

Features

  • Pythonic API Design - Follows Python best practices and conventions
  • Type Hints - Full typing support for better IDE experience
  • AsyncIO Support - Concurrent operations with async/await
  • Pandas Integration - Direct DataFrame conversion for analysis
  • Pydantic Models - Built-in data validation and serialization
  • Comprehensive Error Handling - Detailed exception hierarchy
  • Automatic Retries - Configurable retry logic with exponential backoff

Installation

PyPI

pip install bookovia

With optional dependencies

# For data analysis
pip install bookovia[pandas]

# For async support  
pip install bookovia[async]

# All extras
pip install bookovia[all]

Development Installation

git clone https://github.com/bookovia/python-sdk.git
cd python-sdk
pip install -e ".[dev]"
Requirements: Python 3.8+

Quick Start

Synchronous Client

import bookovia
from bookovia.models import StartTripRequest, Location

# Initialize client
client = bookovia.Client('bkv_live_your_api_key')

# Start a trip
trip = client.trips.start(
    vehicle_id='vehicle_123',
    driver_id='driver_456',
    start_location=Location(
        latitude=40.7128,
        longitude=-74.0060,
        address='New York, NY'
    ),
    metadata={
        'purpose': 'delivery',
        'route': 'downtown'
    }
)

print(f"Trip started: {trip.trip_id}")

Asynchronous Client

import asyncio
import bookovia

async def main():
    """Start five trips at once and print each new trip id."""
    # Using the client as an async context manager guarantees the
    # underlying HTTP session is closed on exit.
    async with bookovia.AsyncClient('bkv_live_your_api_key') as client:

        # Issue all five start requests without awaiting them one by one.
        pending = [
            client.trips.start(vehicle_id=f'vehicle_{n}')
            for n in range(1, 6)
        ]

        # gather() resolves once every request has completed.
        for trip in await asyncio.gather(*pending):
            print(f"Started trip: {trip.trip_id}")

# Run async function
asyncio.run(main())

Upload Location Data

from bookovia.models import LocationPoint
from datetime import datetime, timezone

# Single location upload
client.locations.upload(
    trip_id='trip_123',
    latitude=40.7128,
    longitude=-74.0060,
    timestamp=datetime.now(timezone.utc),
    speed=35,
    heading=180
)

# Batch location upload
locations = [
    LocationPoint(
        latitude=40.7128,
        longitude=-74.0060,
        timestamp=datetime.now(timezone.utc),
        speed=35,
        heading=180
    ),
    LocationPoint(
        latitude=40.7130,
        longitude=-74.0058,
        timestamp=datetime.now(timezone.utc),
        speed=42,
        heading=175
    )
]

client.locations.batch_upload(
    trip_id='trip_123',
    locations=locations
)

API Reference

Client Configuration

import bookovia
from bookovia.config import ClientConfig

# Basic configuration
client = bookovia.Client('your_api_key')

# Advanced configuration
config = ClientConfig(
    base_url='https://api.bookovia.com/v1',
    timeout=30,
    max_retries=3,
    retry_delay=1.0,
    debug=False,
    user_agent='MyApp/1.0.0',
    
    # Custom headers
    default_headers={
        'X-Custom-Header': 'custom-value'
    }
)

client = bookovia.Client('your_api_key', config=config)

Trip Management

User Trip Queries

# Get all active trips for a user
# NOTE(review): this section uses `await sdk.*` and dict indexing, while the
# rest of this page uses a synchronous `client.*` object returning Pydantic
# models — confirm which interface the trip-query helpers actually belong to.
active_trips = await sdk.get_user_active_trips('user_123')
print(f"User has {active_trips['data']['count']} active trips")

# Get trip history for a user
completed_trips = await sdk.get_user_trip_history('user_123', status='completed')
all_history = await sdk.get_user_trip_history('user_123')  # defaults to completed

# Calculate total distance
# `or 0` guards against trips whose distance_meters is None.
total_distance = sum(trip['distance_meters'] or 0 for trip in completed_trips['data']['trips'])
print(f"Total distance: {total_distance/1000:.1f} km")

Organization Trip Queries

# Get all active trips for an organization
# NOTE(review): `await sdk.*` as in the user-trip examples — confirm against
# the documented client interface.
org_active_trips = await sdk.get_org_active_trips('org_123')
print(f"Organization has {org_active_trips['data']['count']} active trips")

# Get trip history for an organization
org_history = await sdk.get_org_trip_history('org_123', status='completed')
cancelled_trips = await sdk.get_org_trip_history('org_123', status='cancelled')

Vehicle Trip Queries

# Get all active trips for a vehicle
# NOTE(review): `await sdk.*` as in the user/org examples — confirm against
# the documented client interface.
vehicle_active_trips = await sdk.get_vehicle_active_trips('vehicle_123')

# Get trip history for a vehicle
vehicle_history = await sdk.get_vehicle_trip_history('vehicle_123', status='completed')

Safety Analytics

from bookovia.models import SafetyScoreRequest, EventFilters

# Get safety score
score = client.safety.get_score(
    trip_id='trip_123'
    # OR driver_id='driver_456'
    # OR vehicle_id='vehicle_123'
)

print(f"Safety Score: {score.score}/100 (Grade: {score.grade})")

# Analyze driving behavior
analysis = client.safety.analyze_behavior(
    driver_id='driver_456',
    date_range=(date(2024, 4, 1), date(2024, 4, 13))
)

# Get harsh events
events = client.safety.get_harsh_events(
    trip_id='trip_123',
    event_type='harsh_braking',
    severity='medium'
)

# Crash risk assessment
risk = client.safety.get_crash_risk(
    driver_id='driver_456',
    timeframe='30days'
)

Fleet Management

# Get fleet overview
overview = client.fleet.get_overview()

# List vehicles
vehicles = client.fleet.get_vehicles(status='active')

# Vehicle utilization
utilization = client.fleet.get_utilization(
    vehicle_id='vehicle_123',
    timeframe='7days'
)

# Fleet optimization
optimization = client.fleet.get_optimization()

Data Models

Core Models (Pydantic)

from bookovia.models import *
from typing import Optional, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field

class Trip(BaseModel):
    """A single vehicle trip record as returned by the Trips API.

    NOTE(review): references TripStatus, Location and TripAnalytics, which
    this page defines further down — in a real module those must be defined
    or imported before this class.
    """

    trip_id: str
    organization_id: str
    vehicle_id: str
    driver_id: Optional[str] = None
    status: TripStatus
    start_time: datetime
    end_time: Optional[datetime] = None   # presumably None while the trip is still active
    start_location: Optional[Location] = None
    end_location: Optional[Location] = None
    analytics: TripAnalytics
    metadata: Optional[Dict[str, Any]] = None  # caller-supplied tags (see trips.start examples)
    created_at: datetime
    updated_at: datetime

class Location(BaseModel):
    """A geographic point with an optional human-readable address."""

    latitude: float = Field(..., ge=-90, le=90)    # degrees
    longitude: float = Field(..., ge=-180, le=180)  # degrees
    address: Optional[str] = None

class LocationPoint(Location):
    """A Location plus the telemetry captured at one point in time."""

    timestamp: datetime
    speed: float = Field(..., ge=0)      # km/h
    heading: float = Field(..., ge=0, lt=360)  # degrees
    accuracy: Optional[float] = None     # meters
    altitude: Optional[float] = None     # meters

class TripAnalytics(BaseModel):
    """Aggregate metrics computed for a trip."""

    distance_km: float
    duration_minutes: int
    max_speed_kmh: float
    avg_speed_kmh: float
    idle_time_minutes: int
    locations_count: int   # number of uploaded location points
    events_count: int      # number of recorded safety events
    safety_score: int = Field(..., ge=0, le=100)  # 0-100
    eco_score: int = Field(..., ge=0, le=100)     # 0-100

Enums

from enum import Enum

class TripStatus(str, Enum):
    """Lifecycle states of a trip; str-valued for direct JSON use."""

    ACTIVE = 'active'
    COMPLETED = 'completed'
    PAUSED = 'paused'
    CANCELLED = 'cancelled'

class EventType(str, Enum):
    """Categories of detected driving events."""

    HARSH_ACCELERATION = 'harsh_acceleration'
    HARSH_BRAKING = 'harsh_braking'
    HARSH_CORNERING = 'harsh_cornering'
    SPEEDING = 'speeding'
    IDLE_EXCESSIVE = 'idle_excessive'
    PHONE_USAGE = 'phone_usage'

class EventSeverity(str, Enum):
    """Severity grades attached to a safety event."""

    LOW = 'low'
    MEDIUM = 'medium'
    HIGH = 'high'
    CRITICAL = 'critical'

Request Models

class StartTripRequest(BaseModel):
    """Payload for starting a trip; only vehicle_id is required."""

    vehicle_id: str
    driver_id: Optional[str] = None
    start_location: Optional[Location] = None
    metadata: Optional[Dict[str, Any]] = None
    odometer_reading: Optional[int] = None
    fuel_level_percent: Optional[float] = Field(None, ge=0, le=100)

class StopTripRequest(BaseModel):
    """Payload for stopping a trip; every field is optional."""

    end_location: Optional[Location] = None
    odometer_reading: Optional[int] = None
    fuel_level_percent: Optional[float] = Field(None, ge=0, le=100)
    metadata: Optional[Dict[str, Any]] = None

class TripFilters(BaseModel):
    """Filter and pagination options for trip list queries.

    NOTE(review): uses `date`, which the snippets on this page never
    import — add `from datetime import date`.
    """

    vehicle_id: Optional[str] = None
    driver_id: Optional[str] = None
    status: Optional[TripStatus] = None
    start_date: Optional[date] = None
    end_date: Optional[date] = None
    limit: Optional[int] = Field(25, ge=1, le=100)  # page size, capped at 100
    offset: Optional[int] = Field(0, ge=0)

Error Handling

from bookovia.exceptions import (
    BookoviaError,
    AuthenticationError,
    ValidationError,
    NotFoundError,
    RateLimitError,
    ServerError
)

try:
    trip = client.trips.start(vehicle_id='vehicle_123')
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Handle auth error (check API key)
    
except ValidationError as e:
    print(f"Validation error: {e}")
    print(f"Invalid fields: {e.fields}")
    # Handle validation errors
    
except RateLimitError as e:
    print(f"Rate limited. Retry after: {e.retry_after}s")
    # Handle rate limiting
    
except NotFoundError as e:
    print(f"Resource not found: {e}")
    # Handle not found
    
except ServerError as e:
    print(f"Server error: {e}")
    # Handle server errors
    
except BookoviaError as e:
    print(f"General Bookovia error: {e}")
    # Handle other API errors
    
except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle unexpected errors

Pandas Integration

Convert API Data to DataFrames

import pandas as pd
from bookovia.utils import to_dataframe

# Get trips and convert to DataFrame
trips = client.trips.list(limit=100)
df = to_dataframe(trips.trips)

print(df.head())
print(df.info())

# Analyze trip data
print("Average trip distance:", df['analytics.distance_km'].mean())
print("Safety score distribution:")
print(df['analytics.safety_score'].describe())

# Filter and group data
active_trips = df[df['status'] == 'active']
by_vehicle = df.groupby('vehicle_id')['analytics.distance_km'].sum()

Location Data Analysis

# Get route data as DataFrame
route = client.locations.get_route('trip_123')
locations_df = pd.DataFrame([
    {
        'latitude': coord[1],
        'longitude': coord[0],
        'sequence': i
    }
    for i, coord in enumerate(route.coordinates)
])

# Calculate distances between points
from geopy.distance import geodesic

def calculate_distance(row1, row2):
    """Return the geodesic distance in kilometers between two rows that
    carry 'latitude' and 'longitude' keys."""
    start = (row1['latitude'], row1['longitude'])
    end = (row2['latitude'], row2['longitude'])
    return geodesic(start, end).kilometers

locations_df['distance_km'] = locations_df.apply(
    lambda row: calculate_distance(
        locations_df.iloc[row.name - 1], row
    ) if row.name > 0 else 0,
    axis=1
)

print("Total route distance:", locations_df['distance_km'].sum(), "km")

Safety Analytics with Pandas

# Analyze safety events across fleet
events = client.safety.get_harsh_events(limit=1000)
events_df = to_dataframe(events.events)

# Group by event type and severity
event_summary = events_df.groupby(['event_type', 'severity']).size().reset_index(name='count')
print(event_summary)

# Time-based analysis
events_df['timestamp'] = pd.to_datetime(events_df['timestamp'])
events_df['hour'] = events_df['timestamp'].dt.hour

hourly_events = events_df.groupby('hour').size()
print("Events by hour:")
print(hourly_events)

# Driver performance comparison
driver_scores = []
for driver_id in df['driver_id'].unique():
    if pd.notna(driver_id):
        score = client.safety.get_score(driver_id=driver_id)
        driver_scores.append({
            'driver_id': driver_id,
            'safety_score': score.score,
            'grade': score.grade
        })

driver_df = pd.DataFrame(driver_scores)
print(driver_df.sort_values('safety_score', ascending=False))

Async Usage

Concurrent Operations

import asyncio
from bookovia import AsyncClient

async def process_fleet_data():
    """Start trips for up to ten fleet vehicles in parallel.

    Returns only the trips that started successfully; individual failures
    are collected instead of raised.
    """
    async with AsyncClient('bkv_live_your_api_key') as client:

        # Fetch the fleet's vehicle list first.
        vehicles = await client.fleet.get_vehicles()

        # Fire off at most ten start requests without awaiting each in turn.
        pending = [
            client.trips.start(vehicle_id=vehicle.vehicle_id)
            for vehicle in vehicles[:10]  # Start 10 trips
        ]

        # return_exceptions=True keeps one failure from cancelling the rest.
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        # Keep only the starts that did not raise.
        successful_trips = [t for t in outcomes if not isinstance(t, Exception)]

        print(f"Started {len(successful_trips)} trips successfully")

        return successful_trips

# Run async function
trips = asyncio.run(process_fleet_data())

Async Context Managers

async def managed_client_example():
    """Run a full trip lifecycle (start, upload ten points, stop) inside a
    managed AsyncClient session.

    NOTE(review): relies on `datetime`/`timezone` being in scope
    (`from datetime import datetime, timezone`) — not imported in this
    snippet.
    """
    # Client automatically handles connection lifecycle
    async with AsyncClient('your_api_key') as client:
        
        # All operations are async
        trip = await client.trips.start(vehicle_id='vehicle_123')
        
        # Upload locations concurrently
        # Successive points are offset by 0.001 degrees with slightly
        # increasing speed, simulating movement.
        location_tasks = [
            client.locations.upload(
                trip_id=trip.trip_id,
                latitude=40.7128 + i * 0.001,
                longitude=-74.0060 + i * 0.001,
                timestamp=datetime.now(timezone.utc),
                speed=35 + i,
                heading=180
            )
            for i in range(10)
        ]
        
        await asyncio.gather(*location_tasks)
        
        # Stop trip
        await client.trips.stop(trip_id=trip.trip_id)
    
    # Client is automatically closed

Async Generators for Large Datasets

async def stream_trip_data(client, filters):
    """Stream trip data in batches to handle large datasets."""
    page_size = 100
    position = 0

    while True:
        # Pull one page; an empty page means we've run off the end.
        page = await client.trips.list(
            filters=filters,
            limit=page_size,
            offset=position
        )

        if not page.trips:
            break

        for record in page.trips:
            yield record

        position += page_size

# Usage
async def process_all_trips():
    async with AsyncClient('your_api_key') as client:
        filters = TripFilters(
            start_date=date(2024, 1, 1),
            end_date=date(2024, 4, 13)
        )
        
        trip_count = 0
        total_distance = 0
        
        async for trip in stream_trip_data(client, filters):
            trip_count += 1
            total_distance += trip.analytics.distance_km
            
            if trip_count % 100 == 0:
                print(f"Processed {trip_count} trips...")
        
        print(f"Total: {trip_count} trips, {total_distance:.2f} km")

asyncio.run(process_all_trips())

Machine Learning Integration

Feature Engineering for ML Models

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

class TelematicsFeatureEngineer:
    """Builds per-driver feature vectors from trip and safety-event data.

    NOTE(review): relies on `date` and `timedelta` (from datetime) and on
    `pd` (pandas), none of which are imported in this snippet.
    """

    def __init__(self, client):
        # client: a configured bookovia Client used for all API reads.
        self.client = client
    
    def extract_driver_features(self, driver_id, days=30):
        """Extract features for driver behavior prediction.

        Returns a dict of aggregate trip metrics plus one count per
        EventType for the trailing `days` window, or None if the driver
        has no completed trips in that window.
        """
        end_date = date.today()
        start_date = end_date - timedelta(days=days)
        
        # Get trips for driver
        trips = self.client.trips.list(
            filters=TripFilters(
                driver_id=driver_id,
                start_date=start_date,
                end_date=end_date,
                status=TripStatus.COMPLETED
            )
        )
        
        # Guard: max()/np.mean() below would fail on an empty trip list.
        if not trips.trips:
            return None
            
        # Calculate aggregate features
        features = {
            'total_trips': len(trips.trips),
            'total_distance_km': sum(t.analytics.distance_km for t in trips.trips),
            'avg_trip_duration_min': np.mean([t.analytics.duration_minutes for t in trips.trips]),
            'max_speed_kmh': max(t.analytics.max_speed_kmh for t in trips.trips),
            'avg_speed_kmh': np.mean([t.analytics.avg_speed_kmh for t in trips.trips]),
            'total_idle_time_min': sum(t.analytics.idle_time_minutes for t in trips.trips),
            'avg_safety_score': np.mean([t.analytics.safety_score for t in trips.trips]),
            'avg_eco_score': np.mean([t.analytics.eco_score for t in trips.trips]),
        }
        
        # Get safety events for the period
        events = self.client.safety.get_harsh_events(
            driver_id=driver_id,
            start_date=start_date,
            end_date=end_date
        )
        
        # Count events by type
        # One '<event_type>_count' feature per EventType member, zero when
        # the driver had no events of that type.
        event_counts = {}
        for event_type in EventType:
            event_counts[f'{event_type.value}_count'] = sum(
                1 for event in events.events 
                if event.event_type == event_type
            )
        
        features.update(event_counts)
        
        return features
    
    def build_risk_prediction_dataset(self, driver_ids):
        """Build dataset for crash risk prediction.

        One row per driver with completed trips; features are joined with
        the API's crash-risk assessment as target columns.
        """
        dataset = []
        
        for driver_id in driver_ids:
            features = self.extract_driver_features(driver_id)
            if features:
                # Get actual risk assessment
                risk = self.client.safety.get_crash_risk(
                    driver_id=driver_id,
                    timeframe='30days'
                )
                
                features['risk_score'] = risk.risk_score
                features['risk_level'] = risk.risk_level
                dataset.append(features)
        
        return pd.DataFrame(dataset)

# Usage
engineer = TelematicsFeatureEngineer(client)

# Get driver list
drivers = ['driver_001', 'driver_002', 'driver_003']  # Your driver IDs

# Build dataset
df = engineer.build_risk_prediction_dataset(drivers)

# Prepare for ML
feature_columns = [col for col in df.columns if col not in ['risk_score', 'risk_level']]
X = df[feature_columns]
y = df['risk_level'].map({'low': 0, 'medium': 1, 'high': 2})

# Train model
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_scaled, y)

# Feature importance
feature_importance = pd.DataFrame({
    'feature': feature_columns,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)

print("Top risk factors:")
print(feature_importance.head(10))

Anomaly Detection

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class AnomalyDetector:
    """Flags trips whose aggregate metrics look unlike the vehicle's norm."""

    def __init__(self, client):
        self.client = client
        # Expect roughly 10% of trips to be flagged as outliers.
        self.model = IsolationForest(contamination=0.1)
        self.scaler = StandardScaler()

    def detect_anomalous_trips(self, vehicle_id, days=7):
        """Return trip IDs whose driving pattern is anomalous for the
        vehicle over the trailing `days` window."""
        end_date = date.today()
        start_date = end_date - timedelta(days=days)

        trips = self.client.trips.list(
            filters=TripFilters(
                vehicle_id=vehicle_id,
                start_date=start_date,
                end_date=end_date,
                status=TripStatus.COMPLETED
            )
        )

        # One feature row per completed trip, trip IDs kept in lockstep.
        rows = [
            [
                t.analytics.distance_km,
                t.analytics.duration_minutes,
                t.analytics.max_speed_kmh,
                t.analytics.avg_speed_kmh,
                t.analytics.idle_time_minutes,
                t.analytics.safety_score,
                t.analytics.events_count
            ]
            for t in trips.trips
        ]
        ids = [t.trip_id for t in trips.trips]

        if len(rows) < 10:  # Need minimum samples
            return []

        # Standardize, then fit and score in one pass.
        scaled = self.scaler.fit_transform(rows)
        labels = self.model.fit_predict(scaled)

        # IsolationForest marks outliers with -1.
        return [trip_id for trip_id, label in zip(ids, labels) if label == -1]

# Usage
detector = AnomalyDetector(client)
anomalous_trips = detector.detect_anomalous_trips('vehicle_123')

print(f"Found {len(anomalous_trips)} anomalous trips:")
for trip_id in anomalous_trips:
    trip = client.trips.get(trip_id)
    print(f"- {trip_id}: {trip.analytics.safety_score} safety score")

Testing

Unit Testing with pytest

# tests/test_client.py
import pytest
from unittest.mock import Mock, patch
from bookovia import Client
from bookovia.models import Trip, TripStatus
from bookovia.exceptions import AuthenticationError

class TestBookoviaClient:
    """Unit tests exercising the client with trips.start patched out.

    NOTE(review): this snippet uses `datetime`, `timezone` and
    `TripAnalytics`, none of which appear in its import list — add
    `from datetime import datetime, timezone` and import TripAnalytics
    from bookovia.models for the file to run.
    """

    @pytest.fixture
    def client(self):
        # trips.start is patched in every test, so no real requests occur.
        return Client('bkv_test_example_key')
    
    @pytest.fixture
    def mock_trip(self):
        # A just-started trip: zeroed analytics and perfect scores.
        return Trip(
            trip_id='trip_123',
            organization_id='org_456',
            vehicle_id='vehicle_123',
            status=TripStatus.ACTIVE,
            start_time=datetime.now(timezone.utc),
            analytics=TripAnalytics(
                distance_km=0,
                duration_minutes=0,
                max_speed_kmh=0,
                avg_speed_kmh=0,
                idle_time_minutes=0,
                locations_count=0,
                events_count=0,
                safety_score=100,
                eco_score=100
            ),
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc)
        )
    
    def test_start_trip_success(self, client, mock_trip):
        # Happy path: the patched call returns the fixture unchanged.
        with patch.object(client.trips, 'start', return_value=mock_trip):
            trip = client.trips.start(vehicle_id='vehicle_123')
            
            assert trip.trip_id == 'trip_123'
            assert trip.vehicle_id == 'vehicle_123'
            assert trip.status == TripStatus.ACTIVE
    
    def test_authentication_error(self, client):
        # An invalid key surfaces as AuthenticationError to the caller.
        with patch.object(client.trips, 'start', side_effect=AuthenticationError('Invalid API key')):
            with pytest.raises(AuthenticationError):
                client.trips.start(vehicle_id='vehicle_123')
    
    @pytest.mark.asyncio
    async def test_async_client(self, mock_trip):
        # NOTE(review): the asyncio marker requires pytest-asyncio.
        from bookovia import AsyncClient
        
        async with AsyncClient('bkv_test_example_key') as client:
            with patch.object(client.trips, 'start', return_value=mock_trip):
                trip = await client.trips.start(vehicle_id='vehicle_123')
                assert trip.trip_id == 'trip_123'

Integration Testing

# tests/test_integration.py
import pytest
from bookovia import Client
from bookovia.models import Location

@pytest.mark.integration
class TestIntegration:
    """End-to-end trip lifecycle against a test environment.

    NOTE(review): uses `datetime`/`timezone` without importing them in this
    snippet — add `from datetime import datetime, timezone`.
    """

    @pytest.fixture(scope='class')
    def client(self):
        # Use test API key for integration tests
        return Client('bkv_test_integration_key')
    
    def test_full_trip_lifecycle(self, client):
        # Start trip
        trip = client.trips.start(
            vehicle_id='test_vehicle_001',
            start_location=Location(latitude=40.7128, longitude=-74.0060)
        )
        
        assert trip.trip_id is not None
        # TripStatus is a str-valued enum, so comparing against the raw
        # string works here.
        assert trip.status == 'active'
        
        # Upload location
        client.locations.upload(
            trip_id=trip.trip_id,
            latitude=40.7130,
            longitude=-74.0058,
            timestamp=datetime.now(timezone.utc),
            speed=35,
            heading=180
        )
        
        # Get trip details
        updated_trip = client.trips.get(trip.trip_id)
        assert updated_trip.analytics.locations_count >= 1
        
        # Stop trip
        final_trip = client.trips.stop(trip.trip_id)
        assert final_trip.status == 'completed'

Advanced Examples

Data Pipeline for Analytics

class TelematicsDataPipeline:
    """Daily ETL over Bookovia trip and safety-event data.

    Pulls completed trips and harsh events for one date, aggregates a
    daily summary dict, and optionally persists everything through
    store_to_database() when a database URL was supplied.
    """

    def __init__(self, api_key, database_url=None):
        # database_url is optional; without it run_daily_pipeline only
        # computes and returns the summary.
        self.client = bookovia.Client(api_key)
        self.db_url = database_url

    def extract_daily_metrics(self, target_date):
        """Return a DataFrame with one row per completed trip on target_date."""
        # NOTE(review): the TripFilters model defined on this page caps
        # `limit` at 100 (Field(25, ge=1, le=100)); limit=1000 would fail
        # validation there — confirm the real SDK's bounds.
        trips = self.client.trips.list(
            filters=TripFilters(
                start_date=target_date,
                end_date=target_date,
                status=TripStatus.COMPLETED,
                limit=1000
            )
        )

        metrics = [
            {
                'date': target_date,
                'trip_id': trip.trip_id,
                'vehicle_id': trip.vehicle_id,
                'driver_id': trip.driver_id,
                'distance_km': trip.analytics.distance_km,
                'duration_minutes': trip.analytics.duration_minutes,
                'safety_score': trip.analytics.safety_score,
                'eco_score': trip.analytics.eco_score,
                'events_count': trip.analytics.events_count
            }
            for trip in trips.trips
        ]

        return pd.DataFrame(metrics)

    def process_safety_events(self, target_date):
        """Return a DataFrame with one row per harsh event on target_date."""
        events = self.client.safety.get_harsh_events(
            start_date=target_date,
            end_date=target_date,
            limit=10000
        )

        event_data = [
            {
                'date': target_date,
                'event_id': event.event_id,
                'trip_id': event.trip_id,
                'event_type': event.event_type,
                'severity': event.severity,
                'latitude': event.location.latitude,
                'longitude': event.location.longitude,
                'timestamp': event.timestamp
            }
            for event in events.events
        ]

        return pd.DataFrame(event_data)

    def store_to_database(self, trip_metrics, safety_events, daily_summary):
        """Persist the pipeline's output to the configured database.

        Fix: run_daily_pipeline called this method but it was never
        defined, so any run with a database_url raised AttributeError.
        Replace this placeholder with your storage backend.
        """
        raise NotImplementedError(
            'store_to_database must be implemented for your storage backend'
        )

    def run_daily_pipeline(self, target_date):
        """Run the complete daily pipeline and return the summary dict."""
        print(f"Processing data for {target_date}")

        # Extract metrics
        trip_metrics = self.extract_daily_metrics(target_date)
        safety_events = self.process_safety_events(target_date)

        # Calculate aggregations
        daily_summary = {
            'date': target_date,
            'total_trips': len(trip_metrics),
            'total_distance_km': trip_metrics['distance_km'].sum(),
            'avg_safety_score': trip_metrics['safety_score'].mean(),
            'total_events': len(safety_events),
            'unique_vehicles': trip_metrics['vehicle_id'].nunique(),
            'unique_drivers': trip_metrics['driver_id'].nunique()
        }

        # Store results only when a destination was configured.
        if self.db_url:
            self.store_to_database(trip_metrics, safety_events, daily_summary)

        return daily_summary

# Usage
pipeline = TelematicsDataPipeline('bkv_live_your_api_key')
summary = pipeline.run_daily_pipeline(date.today())
print(summary)

Support


Ready for data-driven insights? Check out our quickstart guide and API reference for more examples.