
Gate perception → policy pipelines (PyTorch / JAX) on Intended Authority Tokens. Same SDK as the reference cobot.

Integrating Intended in Python ML Pipelines (DOC-P3)#

Audience: ML engineers who run perception / planning pipelines for physical agents in Python — typically PyTorch / JAX models that consume sensor streams and emit structured commands.

Prereqs: Python 3.10+, an Intended tenant API key.

Status: SDK shipped (intended>=0.2.0), cloud round-trip works end-to-end.

Where Intended fits in an ML pipeline#

sensors ──▶ perception model ──▶ planner / policy ──▶ structured command ──▶ controller
                                                          │
                                                          ▼
                                               classify + authorize
                                                  (Intended SDK)
                                                          │
                                                          ▼
                                               Authority Token + safe-default
                                                          │
                                                          ▼
                                                  controller dispatch
                                                  (gated on token)

The pattern: every time your model produces a command that will become real-world motion, you classify the command, snapshot the relevant state, and ask for an Authority Token. The token is the credential the controller checks. Without a valid token, the controller falls back to the safe-default action declared on the DAG node.
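
In controller pseudocode, the gate is nothing more than this. A minimal sketch — verify_token, execute, and dispatch are hypothetical stand-ins for your controller's internals, not SDK calls:

python
def dispatch(command, token, dag_node):
    # Hypothetical controller-side gate: no valid token, no motion.
    if token is None or not verify_token(token):
        execute(dag_node.safe_default)
        return
    execute(command)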

This is the same pattern as the ROS2 guide, just without the ROS2 plumbing — useful for:

  • Standalone perception → command pipelines (autonomous vehicles running outside of ROS2, drone autopilots wrapping PX4 / ArduPilot)
  • Surgical robotics platforms where the planner is a Python service separate from the real-time motion stack
  • Agriculture / mining / construction ML stacks where ROS is not the dominant integration

Install#

bash
pip install "intended>=0.2.0"

Basic usage#

python
import os
from intended.physical import (
    IntendedPhysicalClient,
    PhysicalDagNode,
    PhysicalSdkConfig,
    StructuredGoal,
    now_ms,
)

config = PhysicalSdkConfig(
    api_base_url=os.environ["INTENDED_API_URL"],
    api_key=os.environ["INTENDED_API_KEY"],
)

with IntendedPhysicalClient(config) as client:
    # snapshot_state(), actuate_safe_default(), and controller are your
    # pipeline's own hooks, not part of the SDK.

    # 1. Build the structured goal from your planner's output.
    goal = StructuredGoal(
        schema="px4:command/MAV_CMD_NAV_WAYPOINT",
        verb="navigate-to-waypoint",
        object="waypoint-7",
        parameters={"lat": 37.7749, "lon": -122.4194, "alt_m": 80},
        actor_kind="drone",
        actor_identifier="drone-fleet-3",
        source_timestamp_ms=now_ms(),
    )

    # 2. Classify into OIL.
    classification = client.classify_structured_goal(goal, deadline_ms=200)
    if classification.fail_closed:
        # Novel goal — fall back to declared safe default.
        actuate_safe_default("hold-position")
    else:
        # 3. Issue a token bound to this drone + this DAG node.
        dag = PhysicalDagNode(
            node_id="nav-step-1",
            oil_code=classification.oil_code,
            deadline_ms=500,
            safe_default="hold-position",
            real_time_tier="rt-soft",
        )
        token = client.issue_authority_token(
            intent_oil_code=classification.oil_code,
            structured_goal=goal,
            dag_node=dag,
            actor_identity="drone-fleet-3",
            physical_state=snapshot_state(),
            safety_citations=classification.safety_citations,
        )

        # 4. Hand the token off to the controller.
        controller.dispatch_with_token(token["token"])
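
The token value is dict-like: token["token"] is the credential string the controller checks, and token["expiresAtMs"] is the expiry used by the caching pattern below.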

Pattern: per-frame perception → policy gate#

For perception-heavy pipelines (autonomous driving, agricultural row following, surgical autonomy), run the classifier every frame but re-issue tokens only when the classified OIL category changes. This cuts token-issuance QPS against the cloud without sacrificing correctness:

python
class GatedPolicyHead:
    def __init__(self, client, *, dag, actor_identity):
        self._client = client
        self._dag = dag
        self._actor = actor_identity
        self._cached_token = None
        self._cached_oil = None

    def authorize(self, goal, physical_state):
        classification = self._client.classify_structured_goal(goal)
        if classification.fail_closed:
            return None

        # Reuse the token if (a) OIL category is unchanged and
        # (b) it's still alive for at least the deadline window.
        if (
            self._cached_token
            and classification.oil_code == self._cached_oil
            and self._cached_token["expiresAtMs"] - now_ms() > self._dag.deadline_ms
        ):
            return self._cached_token["token"]

        token = self._client.issue_authority_token(
            intent_oil_code=classification.oil_code,
            structured_goal=goal,
            dag_node=self._dag,
            actor_identity=self._actor,
            physical_state=physical_state,
            safety_citations=classification.safety_citations,
        )
        self._cached_token = token
        self._cached_oil = classification.oil_code
        return token["token"]

When the classifier shifts the OIL category — e.g. from OIL-2002 (lane keeping) to OIL-2005 (lane change) — a fresh token is required. This is by design: the new category may carry a different policy, a different safe default, or a different operator-approval requirement.
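
Wiring this into a frame loop is a few lines. A minimal sketch — frame_source, run_policy, actuate_safe_default, and controller are hypothetical hooks from your own pipeline, and the DAG values are illustrative:

python
gate = GatedPolicyHead(
    client,
    dag=PhysicalDagNode(
        node_id="lane-keep-loop",
        oil_code="OIL-2002",
        deadline_ms=100,
        safe_default="hold-lane",
        real_time_tier="rt-soft",
    ),
    actor_identity="av-unit-42",
)

for frame in frame_source:              # your sensor / perception input
    goal, state = run_policy(frame)     # your planner's structured output
    token = gate.authorize(goal, state)
    if token is None:
        # Fail-closed classification: take the declared safe default.
        actuate_safe_default("hold-lane")
    else:
        controller.dispatch_with_token(token)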

Pattern: streaming classifier (vision-grounded)#

For agents whose intent is perception-determined (the model output IS the intent), pass a fingerprint of the relevant tensor as a parameter:

python
goal = StructuredGoal(
    schema="custom:perception/agricultural-row-follow",
    verb="follow-row",
    parameters={
        "row_id": detected_row.id,
        "perception_confidence": float(detection.confidence),
        "perception_fingerprint": tensor_hash(latest_frame),  # for audit replay
    },
    actor_kind="agv",
    actor_identifier="tractor-12",
)

The fingerprint goes into the audit chain so you can replay the decision later — critical for safety reviews of ML-driven autonomy.
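
tensor_hash is not part of the SDK. A minimal sketch, assuming PyTorch tensors, is a SHA-256 over the raw bytes of a detached CPU copy:

python
import hashlib

import torch

def tensor_hash(t: torch.Tensor) -> str:
    # Detach and move to CPU so the fingerprint is deterministic for
    # identical inputs regardless of device placement or autograd state.
    data = t.detach().cpu().contiguous().numpy().tobytes()
    return hashlib.sha256(data).hexdigest()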

LIM-P4 (vision-grounded classification) — accepting a vision tensor as direct classifier input — is on the roadmap; until it ships, fingerprint + structured goal is the documented pattern.

Snapshotting state#

If your perception stack already publishes structured world-state observations, wrap them in a PhysicalStateProvider:

python
from intended.physical import PhysicalStateProvider, PhysicalStateValue, now_ms

class PerceptionStateProvider(PhysicalStateProvider):
    """Surfaces the perception stack's latest world-model observations
    as Intended predicates. Customers extend with their own predicates."""

    def __init__(self, world_model):
        self._wm = world_model

    def get_physical_state(self, predicate, deadline_ms=None, tenant_id=None):
        if predicate == "perception/operator_in_cabin":
            obs = self._wm.latest_cabin_occupancy()
            return PhysicalStateValue(
                kind="boolean",
                value=obs.has_human,
                as_of_timestamp_ms=obs.timestamp_ms,
                channel="cabin-cam-stack",
                safety_rated=False,   # vision is not a safety-rated bus
                protocol="untrusted",
            )
        return PhysicalStateValue(
            kind="unavailable",
            as_of_timestamp_ms=now_ms(),
            reason=f"unknown predicate: {predicate}",
        )

Note safety_rated=False for perception-derived predicates. This is honest: a camera-based human detector is not a safety-rated input in the IEC 61508 sense. Your policy can either (a) require a redundant safety-rated channel via the consensus operator (if 2_of_3(human_present) then deny) or (b) downgrade the action to a non-safety-critical OIL category. Don't claim a safety rating you don't have.
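
If you also want a belt-and-braces check on the client before the cloud policy runs, a 2-of-3 vote over independent presence channels is short to write. A sketch, with hypothetical channel sources; the authoritative consensus still lives in the policy:

python
def human_present_2_of_3(channels) -> bool:
    # channels: PhysicalStateValue readings from independent sources,
    # e.g. cabin camera, seat pressure sensor, safety-rated light curtain.
    votes = sum(1 for c in channels if c.kind == "boolean" and c.value)
    return votes >= 2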

Real-time considerations#

IntendedPhysicalClient is synchronous and uses httpx. Cloud round-trip median is ~80–120ms; p99 is ~250ms. For pipelines running at ≥30Hz, do not issue a token per frame — use the cached pattern above and re-issue only on OIL transitions.
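
Because the client is httpx-based, a blown latency budget plausibly surfaces as an httpx timeout — an assumption here; check your SDK version's exception surface. A fail-closed wrapper is then a few lines:

python
import httpx

def classify_or_none(client, goal, deadline_ms=200):
    # Treat a missed deadline exactly like a fail-closed classification:
    # the caller falls back to the declared safe default.
    try:
        return client.classify_structured_goal(goal, deadline_ms=deadline_ms)
    except httpx.TimeoutException:  # assumed timeout type, see note above
        return None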

For genuinely real-time loops (≤10ms decision budgets), Python is the wrong language; you should be on the C++ or Rust SDK path (DOC-P2 / DOC-P4) with the edge verifier (TOK-P1). Until those ship, the bridge is: a Python service mints tokens, and the C++ RT controller verifies them against the cloud-cached JWKS.

Async usage#

The async client lands in v0.3 (IntendedAsyncPhysicalClient). For asyncio-based pipelines today, run the sync client in a worker thread:

python
import asyncio
import functools

async def authorize_async(client, classification, goal, dag, state, actor_identity):
    # issue_authority_token is blocking; run it on the default thread-pool
    # executor so it doesn't stall the event loop. Note the OIL code comes
    # from the classification result, not from the goal itself.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            client.issue_authority_token,
            intent_oil_code=classification.oil_code,
            structured_goal=goal,
            dag_node=dag,
            actor_identity=actor_identity,
            physical_state=state,
        ),
    )
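
Call it from your pipeline coroutine; classification here is the result of classify_structured_goal, which you can offload the same way:

python
token = await authorize_async(client, classification, goal, dag, state, "drone-fleet-3")
controller.dispatch_with_token(token["token"])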

Testing#

  • Unit tests against the cloud sandbox: set INTENDED_API_URL to the sandbox base. Tokens issued there are clearly tagged and rejected by any production verifier.
  • Replay tests: capture historical perception traces + state snapshots, feed them through your gating pipeline, and assert the decision shape; a sketch follows. DAG-P5 (replay simulator) automates this once shipped.
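
A minimal shape for such a replay test, with imports as in the earlier snippets. sandbox_client and recorded_trace are hypothetical fixtures that provide a sandbox-pointed client and captured (goal, state) pairs:

python
def test_replay_decisions_fail_closed_or_tokened(sandbox_client, recorded_trace):
    gate = GatedPolicyHead(
        sandbox_client,
        dag=PhysicalDagNode(
            node_id="replay-node",
            oil_code="OIL-2002",
            deadline_ms=500,
            safe_default="hold-position",
            real_time_tier="rt-soft",
        ),
        actor_identity="replay-test",
    )
    for goal, state in recorded_trace:
        token = gate.authorize(goal, state)
        # Decision shape: either a token string or None (fail-closed).
        assert token is None or isinstance(token, str)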
