Skip to content

guides

Intended Documentation

ROS2 Integration

Layer Intended Authority Tokens onto a ROS2 agent — cobot, AMR, drone, surgical — without writing HTTP plumbing.

Integrating Intended on ROS2 (DOC-P1)#

Audience: robotics engineers shipping ROS2-based agents (cobots, AMRs, drones, surgical robots) who want to gate state-changing actions on an Intended Authority Token.

Prereqs: ROS2 Humble or newer, Python 3.10+, an Intended tenant API key.

Status: SDK skeleton (v0.2). Cloud round-trip works end-to-end. Sub-50ms hot-path verification ships with the Rust edge verifier (TOK-P1, deferred — see the roadmap).

Why this exists#

ROS2 already has good primitives for what a robot is doing. It does not have a primitive for whether the robot is allowed to do it. The moment an LLM, planner, or operator command becomes the source of an action, you need:

  • An auditable record of which OIL category the action falls under.
  • A short-lived signed credential the controller checks before commanding motors.
  • A safe-default fallback if the credential is denied or expires.

Intended is that layer. This guide shows how to wire it into a ROS2 agent without writing HTTP plumbing.

Architecture#

                       ┌──────────────────────┐
                       │   Intended Cloud     │
                       │  classifier + signer │
                       └──────────▲───────────┘
                                  │ HTTPS (sub-second)
┌────────────────────────────────────────────────────────────────┐
│  Your robot                                                    │
│                                                                │
│  ┌─────────────┐    ┌────────────────────────┐   ┌─────────┐   │
│  │ Your agent  │───▶│ intended_authority_node│──▶│ Verifier│──▶│ motors
│  │  (BT, FSM,  │    │   (this package)       │   │ (cloud  │   │
│  │   LLM, …)   │    │                        │   │  today; │   │
│  └─────────────┘    └────────────────────────┘   │  edge   │   │
│        │                       │                 │  TOK-P1)│   │
│        ▼                       ▼                 └─────────┘   │
│  ┌──────────────────────────────────┐                          │
│  │ Your PhysicalStateProvider       │                          │
│  │  (FSoE / PROFIsafe / DDS / topic │                          │
│  │   reads — you implement)         │                          │
│  └──────────────────────────────────┘                          │
└────────────────────────────────────────────────────────────────┘

Intended runs as a ROS2 node in your robot's compute. Your agent never talks to our cloud directly — the node mediates. Your safety bus data never leaves your network — you implement a PhysicalStateProvider that surfaces it as structured predicates.

Install#

bash
# 1. Clone the package into your colcon workspace.
cd ~/colcon_ws/src
git clone https://github.com/intended-so/intended.git
ln -s intended/packages/intended-ros2 intended_ros2

# 2. Install the Python SDK from PyPI.
pip install intended>=0.2.0

# 3. Build.
cd ~/colcon_ws
colcon build --packages-select intended_ros2
source install/setup.bash

Configure#

The node reads its config from environment variables, so it bootstrappable from systemd / launch files / Docker without ROS2 parameter wiring.

bash
export INTENDED_API_URL=https://api.intended.so
export INTENDED_API_KEY=mrt_…
export INTENDED_ACTOR_IDENTITY=cobot-east-3   # use your DevID
export INTENDED_SAFE_DEFAULT=hold-position    # or stop, request-operator
export INTENDED_DEADLINE_MS=200

INTENDED_ACTOR_IDENTITY SHOULD be the IEEE 802.1AR DevID of your robot once TOK-P5 ships. Until then any unique stable identifier works.

Launch#

bash
ros2 launch intended_ros2 intended_authority_node.launch.py

That gives you a node exposing ~/classify_and_authorize as a std_srvs/srv/Trigger-shaped service (typed srv lands in v0.3).

Calling from your agent#

The v0.2 service contract is JSON-in / JSON-out, encoded through ROS2 parameters. From a Python lifecycle node:

python
import json
import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger

class PickPlacePlanner(Node):
    def __init__(self):
        super().__init__("pick_place_planner")
        self._cli = self.create_client(Trigger, "/intended_authority_node/classify_and_authorize")
        self._cli.wait_for_service(timeout_sec=5.0)

    def request_authority_for_pick(self, target_pose):
        # 1. Build the structured goal — a ROS2 action shape, not English.
        request_payload = {
            "goal": {
                "schema": "ros2:action:moveit_msgs/Pick",
                "verb": "pick-workpiece",
                "object": "workpiece-A4-7",
                "parameters": {"target_pose": target_pose},
                "actor": {"kind": "cobot", "identifier": "cobot-east-3"},
            },
            "dag_node": {
                "node_id": "pick-step-1",
                "oil_code": "OIL-1501",  # Intended classifier may override
                "deadline_ms": 200,
                "safe_default": "hold-position",
                "real_time_tier": "rt-soft",
            },
            "physical_state": self._snapshot_required_predicates(),
        }

        # 2. Encode the request via the pending_request param (v0.2 placeholder).
        self.set_parameters([
            rclpy.Parameter(
                "pending_request",
                rclpy.Parameter.Type.STRING,
                json.dumps(request_payload),
            )
        ])

        # 3. Call the service.
        future = self._cli.call_async(Trigger.Request())
        rclpy.spin_until_future_complete(self, future)
        response = future.result()

        # 4. Branch on decision.
        if not response.success:
            denial = json.loads(response.message)
            self.get_logger().error(f"DENIED → falling back to {denial['safe_default']}")
            self._actuate_safe_default(denial["safe_default"])
            return None

        decision = json.loads(response.message)
        return decision["token"]   # opaque; your trajectory dispatcher verifies it

    def _snapshot_required_predicates(self):
        # Read from your safety bus / DDS topics here. Structure each as:
        #   {"kind": "boolean", "value": <bool>, "as_of_timestamp_ms": <int>,
        #    "channel": "<bus>", "safety_rated": <bool>, "protocol": "<protocol>"}
        # See DOC-P5 for the exact contract.
        ...

Implementing PhysicalStateProvider#

Intended never reads your safety bus. You bridge it. The minimum you need is a node (Python or C++) that, given a predicate name, returns its current value with attestation metadata.

python
from intended.physical import PhysicalStateValue, now_ms

class FsoeBridgeProvider:
    """Reads predicates from a Beckhoff FSoE master via your existing
    DDS topic bridge. Replace with your bus shape."""

    def __init__(self, dds_subscriber):
        self._dds = dds_subscriber

    def get_physical_state(self, predicate, deadline_ms=None, tenant_id=None):
        msg = self._dds.read_with_timeout(predicate, timeout_ms=deadline_ms)
        if msg is None:
            return PhysicalStateValue(
                kind="unavailable",
                as_of_timestamp_ms=now_ms(),
                reason=f"timeout reading {predicate}",
            )
        return PhysicalStateValue(
            kind="boolean",
            value=msg.value,
            as_of_timestamp_ms=msg.publish_timestamp_ms,
            channel=msg.topic,
            safety_rated=True,
            protocol="fsoe",
        )

safety_rated=True and protocol="fsoe" are the load-bearing claims here. If your policy uses safety-rated predicates and the bus drops to an untrusted channel, the cloud will deny on attestation grounds (clause require-attested-safety-bus in the reference policy).

Verifying the token#

Today: the cloud verifies on each issuance — no edge verification yet. Your trajectory dispatcher trusts the token if decision == "ALLOW" and expires_at_ms > now().

When TOK-P1 ships: drop in the Rust edge verifier binary and your dispatcher links against it. The wire format and JWT claims do not change.

Common patterns#

Behavior-tree (BehaviorTree.CPP)#

Wrap the service call in a RetryNode over a ConditionNode so an ESCALATE decision pauses the tree until an operator approves. The operator approval ticket comes back in decision.escalation_ticket_id.

Real-time controllers (ros2_control)#

Don't call the service from the controller's update loop — the cloud round-trip will starve your real-time deadline. Pre-issue tokens from a non-RT planner and pass them down via the controller's command interface. Edge verifier (TOK-P1) collapses this to a single local check.

Multi-robot fleets#

Run one intended_authority_node per robot. Tokens bind to the robot's actor_identity, so a token issued for cobot-east-3 cannot be replayed on cobot-east-4 once TOK-P5 ships.

Troubleshooting#

SymptomLikely causeFix
Service times outAPI key empty or wrong tenantcheck INTENDED_API_KEY, see node logs
Every request returns DENY with attestation reasonpredicates marked protocol: untrusted but policy requires safety-ratedwire your safety bus, set safety_rated=True
ESCALATE on every novel goalclassifier confidence below your policy floorextend the LIM corpus (LIM-P2) or lower the floor in policy
Token expires before motion startsdeadline_ms too tight for your trajectory lengthrequest fresh token from controller, not planner

See also#

ROS2 Integration | Intended