Skip to content

Actions

Actions are for long-running tasks that need feedback during execution and support cancellation. A client sends a goal to an action node, which can provide periodic feedback while working and delivers a final result upon completion.

Use actions for tasks like navigation, arm movement, or any operation that runs over time and benefits from progress updates.

A node can drive multiple goals concurrently for the same action. Each accepted goal yields its own GoalContext (owning that goal’s feedback stream, cancel signal, and result), so one server can route by a discriminator in the request (e.g. arm_id) and drive several independent resources in parallel. The framework routes each client’s cancel and result requests to the right goal by goal_id; deciding whether to accept a second concurrent goal is up to your goal handler.

An action consists of three communication channels built on top of services and topics:

  1. Goal (service): the client sends a goal request; the server accepts or rejects it.
  2. Feedback (topic): the server publishes progress updates while working on the goal.
  3. Result (service): the client requests the final result once the server finishes.

Additionally, the client can issue a cancel request at any time to abort an active goal.

Client Server
│ │
│──── fire_goal (request) ──────────>│
│<─── GoalResponse (accepted) ───────│
│ │
│<─── feedback ──────────────────────│ (repeated)
│<─── feedback ──────────────────────│
│ │
│──── get_result (request) ─────────>│
│<─── ResultResponse ────────────────│

A node that handles action goals declares its actions under interfaces.actions.exposes in its peppy.json5. Each action defines a goal_service, a feedback_topic, and a result_service:

{
peppy_schema: "node_v1",
manifest: {
name: "brain",
tag: "v1",
},
interfaces: {
actions: {
exposes: [
{
name: "move_arm",
goal_service: {
request_message_format: {
arm_id: "u16",
desired_position: {
$type: "array",
$items: "i32",
$length: 3
}
},
response_message_format: {
accepted: "bool"
}
},
feedback_topic: {
qos_profile: "sensor_data",
message_format: {
new_position: {
$type: "array",
$items: "i32",
$length: 3
}
}
},
result_service: {
response_message_format: {
success: "bool",
error_msg: {
$type: "string",
$optional: true
},
final_position: {
$type: "array",
$items: "i32",
$length: 3
}
}
}
}
],
},
},
execution: {
language: "rust",
build_cmd: ["cargo", "build", "--release"],
run_cmd: ["./target/release/brain"]
},
}

Every payload on goal_service and result_service is optional: any of goal_service.request_message_format, goal_service.response_message_format, result_service.request_message_format, and result_service.response_message_format can be omitted (or set to {}) when the corresponding payload is empty. For example, a calibrate action can omit goal_service.request_message_format because it has no goal parameters, and a result_service that only needs to signal completion can omit request_message_format.

The feedback_topic block is optional: actions that don’t stream progress can omit it entirely. When you do declare feedback_topic, however, its message_format is required and cannot be empty: every feedback message must carry a non-empty payload, since empty payloads are reserved as the framework’s per-goal end-of-stream signal (see Receiving feedback). Code generation will fail if feedback_topic is declared without a message_format.

After running peppy node sync, the code generator creates a module for each exposed action under peppygen::exposed_actions. Use ActionHandle::expose to set up the action, then loop accepting goals. handle_goal_next_request returns the next accepted goal as a GoalContext; rejected goals are answered and skipped for you, so the accept loop ends only when the goal stream closes (the node is shutting down). Spawn a worker per context so goals run concurrently; each context owns that goal’s feedback, cancel signal, and result, so nothing crosses between goals:

use peppygen::exposed_actions::move_arm;
use peppygen::{NodeBuilder, Parameters, Result};
fn main() -> Result<()> {
NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
let mut action = move_arm::ActionHandle::expose(&node_runner).await?;
// Spawn the accept loop so this setup closure returns and the node
// starts serving. The loop runs for the life of the node.
tokio::spawn(async move {
// Each call returns the next accepted goal; rejected goals are
// answered and skipped, so the loop ends only when the stream
// closes (None) or errors.
while let Ok(Some(ctx)) = action
.handle_goal_next_request(|request| -> Result<move_arm::GoalResponse> {
println!(
"goal from {}: arm_id={} desired={:?}",
request.instance_id, request.data.arm_id, request.data.desired_position
);
// The decider sets the concurrency policy: e.g. reject a
// goal for a busy `arm_id`.
Ok(move_arm::GoalResponse::accept())
})
.await
{
// Drive this goal concurrently with any others already running.
tokio::spawn(async move {
// Feedback goes through this goal's context, not a shared slot.
ctx.publish_feedback([7, 31, 43]).await.ok();
// Deliver the result for this specific goal; the client's
// get_result(handle) is routed back here by goal_id.
ctx.complete(true, None, [98, 4, 26]).await.ok();
});
}
});
Ok(())
})
}

The decider returns a GoalResponse, the framework acknowledgement (accepted plus an optional rejection reason):

  • GoalResponse::accept() (GoalResponse.accept() in Python) admits the goal, replies to the client, and yields a GoalContext.
  • GoalResponse::reject(reason) (GoalResponse.reject(reason) in Python) declines it: the client receives the response with accepted == false and the reason in error_message, no context is created, and the accept loop transparently moves on to the next goal. This is where you enforce per-resource concurrency limits.

The GoalRequest passed to the decider contains:

  • instance_id: the client instance that sent the goal. The producer is binding-agnostic; it doesn’t know which slot on the client this goal is heading to.
  • core_node: the core node of the caller.
  • data: the deserialized goal parameters (only present when request_message_format is defined).

The GoalContext (ctx) is the only handle you need to drive the goal:

  • ctx.request(): the decoded GoalRequest.
  • ctx.goal_id(): this goal’s correlation id.
  • ctx.publish_feedback(...): publish a feedback message on this goal’s stream.
  • ctx.cancel_signal() / ctx.is_cancelled(): observe cancellation (see below).
  • ctx.complete(...) / ctx.complete_cancelled(...): deliver the final result.

A producer exposes its action once and serves any client that fires a goal at it. Starting the producer before or after the client is equally valid.

To enforce a concurrency limit (say, one in-flight goal per arm), reject a goal whose arm is already busy, and release the arm when the goal finishes. The decider and the per-goal workers share the busy set:

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
// Arms currently driving a goal. Shared by the decider and the workers.
let busy: Arc<Mutex<HashSet<u16>>> = Arc::new(Mutex::new(HashSet::new()));
while let Ok(Some(ctx)) = action
.handle_goal_next_request({
let busy = Arc::clone(&busy);
move |request| -> Result<move_arm::GoalResponse> {
// `HashSet::insert` returns false when the arm is already busy.
if busy.lock().unwrap().insert(request.data.arm_id) {
Ok(move_arm::GoalResponse::accept())
} else {
// The reason rides back to the client in `error_message`.
Ok(move_arm::GoalResponse::reject(format!(
"arm {} is already moving",
request.data.arm_id
)))
}
}
})
.await
{
let busy = Arc::clone(&busy);
tokio::spawn(async move {
let arm_id = ctx.request().data.arm_id;
ctx.complete(true, None, [98, 4, 26]).await.ok();
// Release the arm so future goals for it are accepted again.
busy.lock().unwrap().remove(&arm_id);
});
}

A worker reacts to a cancel for its goal via ctx.cancel_signal(), which resolves when a cancel request arrives for that goal_id. Pair it with the goal’s work and report the outcome with complete_cancelled:

tokio::spawn(async move {
tokio::select! {
outcome = run_arm(ctx.request().data.arm_id) => {
for position in outcome.steps {
ctx.publish_feedback(position).await.ok();
}
ctx.complete(true, None, outcome.final_position).await.ok();
}
_ = ctx.cancel_signal() => {
// A cancel arrived for this goal; wind down and report it.
ctx.complete_cancelled(false, Some("cancelled".to_owned()), last_known_position)
.await
.ok();
}
}
});

Cancellation is auto-acknowledged by the framework: the client’s cancel_goal returns a typed CancelState: Signalled when a goal with that goal_id is in flight (the signal was delivered), AlreadyTerminal when it had already finished, or Unknown. Signalled means delivered, not will stop; a worker is free to ignore the signal and keep running. The worker decides the goal’s fate: calling complete_cancelled (or complete) is what a subsequent get_result returns. Whichever completion runs first wins; the framework closes this goal’s feedback stream on completion, so the client’s on_next_feedback_message loop ends cleanly. A goal’s cancel never affects other concurrent goals.

A node that sends goals declares what it consumes under interfaces.actions.consumes. Dependencies are declared in manifest.depends_on and referenced by link_id in the interface:

{
peppy_schema: "node_v1",
manifest: {
name: "controller",
tag: "v1",
depends_on: {
nodes: [
{ name: "brain", tag: "v1", link_id: "brain" },
]
},
},
interfaces: {
actions: {
consumes: [
{
link_id: "brain", // References depends_on.nodes[].link_id
name: "move_arm", // Action name on that node
},
],
},
},
execution: {
language: "rust",
build_cmd: ["cargo", "build", "--release"],
run_cmd: ["./target/release/controller"]
},
}

The code generator creates a module for each consumed action under peppygen::consumed_actions. Use fire_goal to send a goal, then listen for feedback and request the result:

use peppygen::consumed_actions::brain_move_arm;
use peppygen::{NodeBuilder, Parameters, QoSProfile, Result};
use std::time::Duration;
fn main() -> Result<()> {
NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
let request = brain_move_arm::GoalRequest {
arm_id: 7,
desired_position: [10, 20, 30],
};
let mut action_handle = brain_move_arm::ActionHandle::fire_goal(
&node_runner,
Duration::from_secs(5), // timeout
request,
QoSProfile::SensorData, // QoS for the feedback topic
)
.await?;
println!("goal accepted={}", action_handle.data.accepted);
Ok(())
})
}

The signature is the same for pinned and from_any: true slots: fire_goal takes no call-site targeting argument. The slot’s launcher binding (or absence of one) decides which producer instance the goal addresses.

fire_goal returns an ActionHandle whose data field holds the goal response (e.g. data.accepted). The handle is what you use for subsequent feedback, result, and cancel calls. You can fire multiple goals concurrently, and each returns its own handle, so the server can drive them in parallel.

Use on_next_feedback_message on the handle to receive feedback. The idiomatic pattern is a loop that drains feedback until the server signals end-of-stream:

loop {
match action_handle.on_next_feedback_message().await {
Ok(feedback) => println!("new_position={:?}", feedback.new_position),
Err(_) => break, // server has completed (or cancelled) this goal
}
}

Each goal has its own feedback stream, addressed by goal_id, so feedback for one goal never reaches another handle, even when several goals run concurrently on the same server. The server closes a goal’s stream when its worker completes the goal (complete or complete_cancelled); the in-flight on_next_feedback_message then resolves with the closed-channel error, so the loop-until-error pattern drains feedback cleanly before you request the result. The stream also closes if the worker abandons the goal without completing it (an early return or a panic), so a client draining feedback always terminates rather than hanging.

Use get_result on the handle to request the final result. The call is routed to this goal by goal_id and parks until the goal reaches a definitive terminal state, then returns a typed outcome:

let result = action_handle.get_result(Duration::from_secs(5)).await?;
match result.outcome {
brain_move_arm::ResultOutcome::Completed(data) => println!(
"completed: success={} error={:?} final_position={:?}",
data.success,
data.error_msg.as_deref(),
data.final_position,
),
brain_move_arm::ResultOutcome::Cancelled(data) => {
println!("cancelled at {:?}", data.final_position)
}
brain_move_arm::ResultOutcome::Abandoned => {
println!("the worker abandoned the goal without producing a result")
}
brain_move_arm::ResultOutcome::Expired => {
println!("the result expired before it was fetched")
}
}

A poll on a still-running goal parks until the goal reaches a terminal state, then returns a typed outcome, so you can call get_result whenever you like; a forwarding/relaying node does not have to time its poll to the worker’s lifecycle. The returned outcome (a Rust ResultOutcome enum, or result.status plus optional result.data in Python) tells you exactly what happened:

  • Completed / Cancelled: the worker delivered a result via complete / complete_cancelled; the payload is in data.
  • Abandoned: the worker dropped the goal without ever delivering a result (an early return or a panic).
  • Expired: the goal finished, but its result was retained only for a bounded window (30 s by default) that has since elapsed, or the result was already evicted.

A terminal result stays fetchable for that retention window, and you can fetch it more than once within it, which makes relaying it reliable. If the poll outlives the caller’s own timeout, you get a normal timeout error like any other request.

Use cancel_goal on the handle to request cancellation of that specific goal. It returns a typed CancelState:

let cancel_response = action_handle.cancel_goal(Duration::from_secs(5)).await?;
match cancel_response.state {
brain_move_arm::CancelState::Signalled => println!("cancel delivered to a live goal"),
brain_move_arm::CancelState::AlreadyTerminal => println!("goal had already finished"),
brain_move_arm::CancelState::Unknown => println!("no goal with that id is known"),
}

The CancelState reports what the cancel found:

  • Signalled: a live goal received the cancel signal. Delivered, not necessarily will stop: the worker may ignore it.
  • AlreadyTerminal: the goal had already reached a terminal state, so there was nothing to cancel (best-effort; observable only while the result is still retained).
  • Unknown: no goal with that goal_id is known (it never existed, or was evicted long ago).

The cancel targets only this goal; other concurrent goals are unaffected. To learn the goal’s final state, call get_result: if the worker reacted with complete_cancelled you get a Cancelled outcome; if it ignored the cancel and finished normally you get Completed.

Routing for actions uses the same consumer-side model as topics and services. A binding KEY: VALUE creates a private channel from producer instance VALUE to one of the client’s declared slots; the action server itself is binding-agnostic.

Each slot in depends_on is either:

  • Pinned (the default, when from_any is absent or false): the slot must be bound. The generated fire_goal resolves the bound producer’s instance_id from the binding, and the goal, cancel, result, and feedback channels all address that one producer. The validator rejects a pinned-unbound slot before the stack starts.
  • from_any: true: when the slot is bound, the generated fire_goal resolves the binding and addresses the bound producer’s instance_id directly, exactly like a pinned slot. When the slot is left unbound, the call falls back to wildcard discovery: a probe goes out to every matching producer and the first responder is pinned for the rest of the goal cycle, so the same producer handles the goal, cancel, result, and feedback. Bindings constrain which producers discovery may consider.

In a launcher / stack config:

{
source: { local: "./consumer" },
instances: [{
instance_id: "my_consumer",
bindings: { brain: "left-arm-1" },
}],
}

or, when launching a single node during development:

Terminal window
peppy node run --bind brain@left-arm-1 .

When several sibling slots of the same (name, tag) exist on a client, the framework applies the same precedence rule used for topics:

  1. If pinned slots’ bindings name the producer the call resolves to → the call lands on that pinned slot.
  2. Otherwise, if a from_any slot exists for the producer’s (name, tag) and is either explicitly bound to that producer or unbound (wildcard fallback) → the call lands on the from_any slot.
  3. Otherwise the call cannot be resolved.

A client that wires two specific depth cameras to dedicated wrist slots and lets a third slot reach any other depth camera:

openarm01_backbone/peppy.json5
{
manifest: {
name: "openarm01_backbone",
tag: "v1",
depends_on: {
interfaces: [
{ name: "depth_camera", tag: "v1", link_id: "wrist_left_camera" },
{ name: "depth_camera", tag: "v1", link_id: "wrist_right_camera" },
{ name: "depth_camera", tag: "v1", link_id: "extra_cam", from_any: true },
],
},
},
// ...
}
peppy_launcher.json5
{
deployments: [
{ source: { name: "depth_camera:v1" }, instances: [
{ instance_id: "left_cam" },
{ instance_id: "right_cam" },
{ instance_id: "ceiling_cam" },
]},
{ source: { name: "openarm01_backbone:v1" }, instances: [
{ instance_id: "backbone_inst_1", bindings: {
wrist_left_camera: "left_cam",
wrist_right_camera: "right_cam",
}},
]},
],
}

Four contract statements follow from this manifest:

  1. wrist_left_camera_<action>::fire_goal(...) reaches left_cam (pinned-bound).
  2. wrist_right_camera_<action>::fire_goal(...) reaches right_cam (pinned-bound).
  3. extra_cam_<action>::fire_goal(...) reaches ceiling_cam via the unbound from_any wildcard; discovery only finds the producer not already claimed by a pinned binding.
  4. If extra_cam had been bound (e.g. extra_cam: "ceiling_cam"), discovery would be restricted to that bound set instead of every unclaimed producer.

When fire_goal is called on a from_any slot with no binding to a single producer, the framework runs a discover-then-pin sequence before sending the real goal:

  1. A lightweight probe is broadcast to every matching producer. The probe is auto-handled by the framework before any user goal handler runs.
  2. The first producer to respond is captured and the wire sender is pinned to its instance_id.
  3. The real goal request is delivered only to that pinned producer. All subsequent cancel_goal, get_result, and feedback calls also target the same producer.

The non-winning producers see only the probe; their goal handler never runs. Without this step every matching producer would execute the goal concurrently, which for state-changing actions (motor commands, file writes, payment dispatches) is a real-world hazard.

Pinned slots (and from_any slots bound to a single producer instance_id) skip discovery entirely because the codegen already addresses exactly one producer.

The cost is one additional round-trip on wildcard fire_goal calls (typically a few milliseconds). If the discovered producer dies between the probe and the real goal, fire_goal surfaces ServiceUnreachable and the caller can retry; the next attempt re-discovers a different producer if one is available. Discovery is capped at the caller’s goal_timeout if that is tighter than the framework’s default probe timeout, so a fast-fail budget still surfaces unreachable producers quickly.

The launcher validator runs these checks before the stack starts:

  1. Pinned-unbound is a hard error. Every pinned slot in depends_on must have a binding whose KEY equals the slot’s link_id.
  2. A KEY that matches a pinned slot’s link_id binds that slot to VALUE.
  3. Free-form KEYs are allowed for from_any slots. A binding whose KEY doesn’t match a pinned link_id is accepted as long as some from_any slot exists for VALUE’s (name, tag); multiple such bindings on the same from_any slot accumulate.
  4. A KEY that matches neither a pinned link_id nor a from_any slot for VALUE’s (name, tag) is a dead key and is rejected.
  5. A pinned binding whose target instance_id deploys a different node than the slot expects is a target mismatch and is rejected.
  6. KEY uniqueness within a single invocation is enforced (no two bindings on the same client may share the same KEY).
  7. Stack-wide instance_id uniqueness. Every instance_id must be unique across the entire stack, not just within a (node_name, node_tag) group. Bindings address producers by instance_id, so a duplicate would make the binding ambiguous.

A single action server can drive many goals at once. The accept loop only waits for the next goal; each accepted goal runs in its own spawned task with its own GoalContext. The framework routes every cancel and result request to the right goal by goal_id, and each goal has its own feedback stream, so goals never interfere with one another.

This makes the “one server, many resources” pattern natural: include a discriminator in the goal request (e.g. arm_id or device_id) and route to a per-resource worker. Your goal handler is responsible for the concurrency policy: accept goals to run them in parallel, or reject a goal (with GoalResponse::reject(reason)) when its target resource is already busy. A goal that is not accepted yields no GoalContext and cannot be cancelled or completed.