Handling Truck Capacity Constraints in Python for Waste Management Route Optimization
Production patterns for binding capacity dimensions in OR-Tools without solver divergence.
Municipal waste fleets operate under strict volumetric and mass thresholds that dictate route viability. Exceeding these boundaries triggers regulatory penalties, unplanned depot returns, and accelerated suspension wear. Python implementations for VRP Route Optimization Algorithms must enforce hard boundaries while absorbing high-frequency telemetry variance. This reference details production-ready patterns for load-cell stabilization, axle compliance mapping, and memory-efficient constraint validation.
Telemetry Stabilization & Drift Compensation
Municipal scale sensors produce noisy weight streams with measurable calibration drift and compaction-induced spikes. Unfiltered telemetry causes premature route termination in the solver. Recursive exponential moving averages (EMA) provide O(1) memory overhead per asset and suppress transient noise without introducing phase lag.
import numpy as np
from typing import Dict, Optional
class LoadCellFilter:
"""Stateful EMA filter for streaming load-cell telemetry."""
def __init__(self, alpha: float = 0.35, min_samples: int = 5):
self.alpha = alpha
self.min_samples = min_samples
self._state: Dict[str, Dict[str, float]] = {}
def update(self, truck_id: str, raw_kg: float) -> Optional[float]:
if truck_id not in self._state:
self._state[truck_id] = {"ema": raw_kg, "count": 0}
state = self._state[truck_id]
state["count"] += 1
if state["count"] < self.min_samples:
state["ema"] = (state["ema"] * (state["count"] - 1) + raw_kg) / state["count"]
return None # Warm-up phase
state["ema"] = self.alpha * raw_kg + (1 - self.alpha) * state["ema"]
return round(state["ema"], 1)
def reset(self, truck_id: str) -> None:
self._state.pop(truck_id, None)
Hard Constraint Enforcement & Memory-Efficient Validation
Capacity dimensions act as immutable state transitions within the routing graph. Standard list comprehensions for pre-validation cause rapid heap expansion when processing municipal-scale node graphs. Generator-based validation preserves RAM during solver initialization and enables lazy evaluation of stop sequences.
from typing import Iterator, Tuple
def validate_remaining_capacity(
remaining_stops: Iterator[float],
current_load_kg: float,
max_capacity_kg: float,
buffer_pct: float = 0.98
) -> Tuple[bool, str, float]:
"""Lazily evaluates stop sequence against hard volumetric/mass limits."""
projected_load = current_load_kg
threshold = max_capacity_kg * buffer_pct
for stop_weight in remaining_stops:
projected_load += stop_weight
if projected_load > threshold:
return False, "CAPACITY_THRESHOLD_EXCEEDED", round(projected_load, 1)
return True, "VALID", round(projected_load, 1)
Axle Load Compliance Mapping
Regulatory frameworks mandate exact mapping of axle weight distributions, not just gross vehicle weight. Python routing engines must maintain separate counters for front and rear axle loads. Municipal codes frequently cap single-axle loads at 9,072 kg and tandem configurations at 15,422 kg. Violations require immediate route branching.
def validate_axle_compliance(
front_load_kg: float,
rear_load_kg: float,
config: dict
) -> Tuple[bool, str]:
max_single = config.get("axle_limit_kg", 9072)
max_tandem = config.get("tandem_limit_kg", 15422)
gross = front_load_kg + rear_load_kg
if front_load_kg > max_single:
return False, "FRONT_AXLE_EXCEEDED"
if rear_load_kg > max_tandem:
return False, "REAR_AXLE_EXCEEDED"
if gross > config.get("gvwr_kg", 26308):
return False, "GVWR_EXCEEDED"
return True, "COMPLIANT"
Production Workflow: Constraint Enforcement & Structured Logging
The following workflow integrates telemetry filtering, hard capacity validation, axle compliance, and fallback routing. It demonstrates a single constraint enforcement cycle with realistic mock payloads and structured JSON logging aligned with Capacity & Weight Limits standards.
import logging
import json
from dataclasses import dataclass, asdict
from typing import List, Dict, Any
# Structured logging configuration
class JSONFormatter(logging.Formatter):
def format(self, record):
log_record = {
"ts": self.formatTime(record),
"level": record.levelname,
"msg": record.getMessage(),
"truck_id": getattr(record, "truck_id", None),
"event_type": getattr(record, "event_type", None),
"payload": getattr(record, "payload", None),
"compliance": getattr(record, "compliance", None)
}
return json.dumps(log_record)
logger = logging.getLogger("capacity_engine")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
@dataclass
class StopPayload:
truck_id: str
raw_kg: float
front_axle_kg: float
rear_axle_kg: float
stop_id: str
timestamp_ms: int
def process_collection_cycle(
payload: StopPayload,
filter_engine: LoadCellFilter,
route_state: Dict[str, Any],
compliance_config: Dict[str, int]
) -> Dict[str, Any]:
"""End-to-end constraint validation with fallback routing."""
# 1. Stabilize telemetry
filtered_kg = filter_engine.update(payload.truck_id, payload.raw_kg)
if filtered_kg is None:
return {"status": "WARMUP", "action": "DEFER"}
route_state["current_load"] = filtered_kg
# 2. Validate axle compliance
axle_ok, axle_msg = validate_axle_compliance(
payload.front_axle_kg, payload.rear_axle_kg, compliance_config
)
# 3. Enforce capacity threshold
threshold_exceeded = route_state["current_load"] >= route_state["max_capacity"] * 0.98
extra = {
"truck_id": payload.truck_id,
"event_type": "CAPACITY_CHECK",
"payload": asdict(payload),
"compliance": {"axle_status": axle_msg, "threshold_hit": threshold_exceeded}
}
if not axle_ok:
logger.warning("Axle violation detected", extra=extra)
return {"status": "COMPLIANCE_FAIL", "action": "DEPOT_REDIRECT", "reason": axle_msg}
if threshold_exceeded:
logger.info("Capacity threshold reached", extra=extra)
route_state["fallback_triggered"] = True
return {
"status": "THRESHOLD_EXCEEDED",
"action": "FORCED_DEPOT_RETURN",
"remaining_stops": route_state.get("unassigned_stops", [])
}
logger.debug("Constraint validated", extra=extra)
return {"status": "VALID", "action": "CONTINUE_ROUTE"}
# Mock execution
if __name__ == "__main__":
mock_payload = StopPayload(
truck_id="WM-8842",
raw_kg=14250.0,
front_axle_kg=4100.0,
rear_axle_kg=10200.0,
stop_id="STOP-7741",
timestamp_ms=1709482100000
)
engine = LoadCellFilter(alpha=0.35)
state = {"current_load": 13800.0, "max_capacity": 14500.0, "unassigned_stops": ["STOP-7742", "STOP-7743"]}
config = {"axle_limit_kg": 9072, "tandem_limit_kg": 15422, "gvwr_kg": 26308}
result = process_collection_cycle(mock_payload, engine, state, config)
print(json.dumps(result, indent=2))
Integration Notes
This constraint engine plugs directly into solver initialization phases. When integrating with OR-Tools or custom metaheuristics, pass validate_remaining_capacity as a lazy evaluator during node insertion. The structured logging schema aligns with municipal telemetry pipelines and enables downstream compliance auditing. For dynamic threshold tuning based on seasonal waste density, implement a sliding-window percentile calculator that adjusts buffer_pct before peak collection windows.
References
- Python
loggingmodule documentation: https://docs.python.org/3/library/logging.html - FHWA Truck Size and Weight overview: https://ops.fhwa.dot.gov/freight/sw/overview/index.htm
- Google OR-Tools Vehicle Routing Problem: https://developers.google.com/optimization/routing/vrp