Topics

Topics implement a publish-subscribe communication pattern between nodes. A node emits a topic to publish messages, and other nodes consume that topic to receive them.

Use topics for continuous, unidirectional data streams such as sensor readings, camera frames, state updates, or any information that flows over time. Multiple consumers can listen to the same topic simultaneously.

A node that publishes messages declares its topics under interfaces.topics.emits in its peppy.json5. Each topic defines a name, a qos_profile, and a message_format:

{
  peppy_schema: "node_v1",
  manifest: {
    name: "uvc_camera",
    tag: "v1",
  },
  interfaces: {
    topics: {
      emits: [
        {
          name: "video_stream",
          qos_profile: "sensor_data",
          message_format: {
            header: {
              $type: "object",
              stamp: "time",
              frame_id: "u32"
            },
            encoding: "string",
            width: "u32",
            height: "u32",
            frame: {
              $type: "array",
              $items: "u8"
            }
          }
        }
      ],
    },
  },
  execution: {
    language: "rust",
    build_cmd: ["cargo", "build", "--release"],
    run_cmd: ["./target/release/uvc_camera"]
  },
}

The qos_profile controls delivery guarantees. Available profiles are:

  • sensor_data: optimised for high-frequency data where occasional drops are acceptable.
  • standard: balanced defaults suitable for most use cases. This is the default when qos_profile is omitted.
  • reliable: guarantees delivery at the cost of higher latency.
  • critical: strongest delivery guarantees for safety-critical data.
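
The profile list and its default can be modelled as a small enum. This is a minimal sketch, not part of the generated peppygen API: the QosProfile type and its resolve function are hypothetical names, and rejecting unknown profile names is an assumption about how a manifest loader might behave.

```rust
/// Hypothetical mirror of the four `qos_profile` values listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QosProfile {
    SensorData,
    /// Used when `qos_profile` is omitted from the topic definition.
    #[default]
    Standard,
    Reliable,
    Critical,
}

impl QosProfile {
    /// Resolve the optional `qos_profile` string of a topic.
    /// `None` means the field was omitted, which falls back to `standard`.
    /// Assumption: unknown names are treated as an error.
    pub fn resolve(raw: Option<&str>) -> Result<Self, String> {
        match raw {
            None => Ok(Self::default()),
            Some("sensor_data") => Ok(Self::SensorData),
            Some("standard") => Ok(Self::Standard),
            Some("reliable") => Ok(Self::Reliable),
            Some("critical") => Ok(Self::Critical),
            Some(other) => Err(format!("unknown qos_profile: {other}")),
        }
    }
}
```

Calling QosProfile::resolve(None) yields the standard profile, matching the documented default.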

After running peppy node sync, the code generator creates a module for each emitted topic under peppygen::emitted_topics. Use emit to publish a message:

use peppygen::emitted_topics::video_stream;
use peppygen::{NodeBuilder, Parameters, Result};
use std::time::Duration;

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        // Publish from a background task so the run closure can return.
        let node_runner_clone = node_runner.clone();
        tokio::spawn(async move {
            let mut frame_id = 0u32;
            loop {
                // Errors from emit are ignored in this example.
                let _ = video_stream::emit(
                    &node_runner_clone,
                    video_stream::MessageHeader {
                        stamp: std::time::SystemTime::now(),
                        frame_id,
                    },
                    "rgb8".to_owned(),
                    640,
                    480,
                    vec![1, 2, 3],
                )
                .await;
                frame_id = frame_id.wrapping_add(1);
                // Roughly ten frames per second.
                tokio::time::sleep(Duration::from_secs_f64(0.1)).await;
            }
        });
        Ok(())
    })
}

The emit function takes the node_runner reference followed by each field from the message_format in order. Nested objects become generated structs (e.g. video_stream::MessageHeader) whose fields match the object definition.

A producer publishes a single stream regardless of how many consumers are listening, and it never knows about the bindings consumers use to address it. Starting the producer before any consumer (or after them) is equally valid; consumers attach when they show up.

For a topic with a simple message format:

{
  name: "message_stream",
  qos_profile: "sensor_data",
  message_format: {
    message: "string"
  }
}

the emit call takes the node_runner reference followed by a single String argument:

use peppygen::emitted_topics::message_stream;
message_stream::emit(&node_runner, "hello world".to_owned()).await?;

A node that receives messages declares what it consumes under interfaces.topics.consumes. Dependencies are declared in manifest.depends_on and referenced by link_id in the interface:

{
  peppy_schema: "node_v1",
  manifest: {
    name: "web_video_stream",
    tag: "v1",
    depends_on: {
      nodes: [
        { name: "uvc_camera", tag: "v1", link_id: "uvc_camera" },
      ]
    },
  },
  interfaces: {
    topics: {
      consumes: [
        {
          link_id: "uvc_camera", // References depends_on.nodes[].link_id
          name: "video_stream",  // Topic name on that node
        },
      ],
    },
  },
  execution: {
    language: "rust",
    build_cmd: ["cargo", "build", "--release"],
    run_cmd: ["./target/release/web_video_stream"]
  },
}

The code generator creates a module for each consumed topic under peppygen::consumed_topics. The module name is <link_id>_<topic_name>, built from the consumed entry's link_id and topic name; in this case, uvc_camera_video_stream. Use on_next_message_received to wait for the next message:

use peppygen::consumed_topics::uvc_camera_video_stream;
use peppygen::{NodeBuilder, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
            &node_runner,
            None, // from_core_node (None = any)
        )
        .await?;
        println!(
            "got {}x{} frame encoded as {} from {}",
            frame.width, frame.height, frame.encoding, instance_id
        );
        Ok(())
    })
}

The on_next_message_received function takes:

  • node_runner: the node runner reference.
  • from_core_node: optionally filter messages from a specific core node, or None for any.

It returns a tuple of:

  • instance_id: the producer instance that published the message, read straight from the message context.
  • message: the deserialized message, with fields matching the topic’s message_format.

There is no call-site instance-targeting argument. To pin a from_any slot to one specific producer, use a launcher binding (see Bindings and routing below).

on_next_message_received processes a single message and returns. To receive messages continuously, call it in a loop. In the example below, each message is handed to a separate task so that the receive loop is not blocked:

use std::sync::Arc;
use peppygen::consumed_topics::uvc_camera_video_stream;
use peppygen::{NodeBuilder, NodeRunner, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        tokio::spawn(receive_frames(node_runner));
        Ok(())
    })
}

async fn receive_frames(node_runner: Arc<NodeRunner>) {
    loop {
        let result = uvc_camera_video_stream::on_next_message_received(
            &node_runner,
            None,
        )
        .await;
        match result {
            Ok((instance_id, frame)) => {
                // Handle the frame in its own task so the loop can keep receiving.
                tokio::spawn(async move {
                    println!("got {}x{} frame from {}", frame.width, frame.height, instance_id);
                });
            }
            Err(e) => {
                // Stop receiving on the first error.
                eprintln!("Error receiving frame: {e}");
                break;
            }
        }
    }
}

Routing is entirely consumer-side. Producers publish unconditionally and are binding-agnostic; only the consumer’s depends_on slots and the launcher bindings that fill them decide which producer instance reaches which slot.

A binding takes the form KEY: VALUE in a launcher config (KEY@VALUE with --bind on the command line), where KEY is a label local to the consumer (typically a link_id from depends_on) and VALUE is the target producer instance_id. Each binding creates a private one-way channel from that producer instance to one of the consumer’s declared slots.

In a launcher / stack config:

{
  source: { local: "./consumer" },
  instances: [{
    instance_id: "my_consumer",
    bindings: { uvc_camera: "my-camera-instance" },
  }],
}

or, when launching a single node during development:

peppy node run --bind uvc_camera@my-camera-instance .
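
To make the KEY@VALUE shape concrete, here is a minimal sketch of how such an argument could be split into its two parts. The Binding struct and parse_bind function are hypothetical and are not peppy's own parser; splitting at the first '@' and rejecting empty halves are assumptions.

```rust
/// A parsed binding: the consumer-local KEY and the target producer instance_id.
/// Hypothetical type for illustration only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Binding {
    pub key: String,
    pub instance_id: String,
}

/// Parse a `KEY@VALUE` argument such as `uvc_camera@my-camera-instance`.
/// Assumption: the first '@' separates KEY from VALUE, and neither may be empty.
pub fn parse_bind(arg: &str) -> Result<Binding, String> {
    match arg.split_once('@') {
        Some((key, value)) if !key.is_empty() && !value.is_empty() => Ok(Binding {
            key: key.to_owned(),
            instance_id: value.to_owned(),
        }),
        _ => Err(format!("expected KEY@VALUE, got {arg:?}")),
    }
}
```

For the command above, parse_bind("uvc_camera@my-camera-instance") gives the same KEY and VALUE pair as the bindings entry in the launcher config.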

For every message a producer publishes, each consumer instance applies this rule on its own slots:

  1. Pinned-bound wins. If one or more pinned slots’ bindings name the producer, the message is delivered to every such pinned slot.
  2. Else from_any for the producer’s (name, tag). A from_any: true slot for that producer kind receives the message if one of its bindings names this producer or, when the slot has no explicit bindings at all, as the wildcard fallback for unclaimed producers.
  3. Else drop. No slot claims the message; the consumer ignores it.

Two consequences worth remembering:

  • A pinned-bound slot preempts the from_any slot for the same (name, tag) pair: as long as the producer is named by a pinned binding, only the pinned slot fires.
  • An explicit from_any binding replaces the wildcard fallback for that producer: once a from_any slot lists specific producer instance IDs, unlisted producers no longer flow into it.

The following consumer wires two specific depth cameras to dedicated wrist slots and lets a third slot pick up any other depth camera:

openarm01_backbone/peppy.json5
{
  manifest: {
    name: "openarm01_backbone",
    tag: "v1",
    depends_on: {
      interfaces: [
        { name: "depth_camera", tag: "v1", link_id: "wrist_left_camera" },
        { name: "depth_camera", tag: "v1", link_id: "wrist_right_camera" },
        { name: "depth_camera", tag: "v1", link_id: "extra_cam", from_any: true },
      ],
    },
  },
  // ...
}

And the launcher that binds it:

peppy_launcher.json5
{
  deployments: [
    { source: { name: "depth_camera:v1" }, instances: [
      { instance_id: "left_cam" },
      { instance_id: "right_cam" },
      { instance_id: "ceiling_cam" },
    ]},
    { source: { name: "openarm01_backbone:v1" }, instances: [
      { instance_id: "backbone_inst_1", bindings: {
        wrist_left_camera: "left_cam",
        wrist_right_camera: "right_cam",
      }},
    ]},
  ],
}

Four contract statements follow from this manifest and launcher:

  1. A frame from left_cam arrives only on the wrist_left_camera_video_stream slot.
  2. A frame from right_cam arrives only on the wrist_right_camera_video_stream slot.
  3. A frame from ceiling_cam (no binding names it) flows into the extra_cam_video_stream slot via the wildcard fallback.
  4. If extra_cam had been bound explicitly (e.g. extra_cam: "ceiling_cam"), it would still receive ceiling_cam frames but would stop receiving frames from any depth camera not named by a binding.
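
The three-step delivery rule and the four statements above can be checked against a small in-memory model. This is a sketch under stated assumptions, not peppy's routing code: Slot, Producer, Bindings, and route are hypothetical names, bindings are assumed to be already resolved to the link_id of the slot they fill, and the function returns link_ids rather than generated module names.

```rust
use std::collections::HashMap;

/// (name, tag) of a producer kind, e.g. ("depth_camera", "v1").
pub type Kind = (&'static str, &'static str);

/// Slot link_id -> producer instance_ids bound to that slot by the launcher.
pub type Bindings = HashMap<&'static str, Vec<&'static str>>;

/// One depends_on slot of the consumer (hypothetical model).
#[derive(Debug, Clone, Copy)]
pub struct Slot {
    pub link_id: &'static str,
    pub kind: Kind,
    pub from_any: bool,
}

/// A producer instance as seen by the consumer (hypothetical model).
#[derive(Debug, Clone, Copy)]
pub struct Producer {
    pub instance_id: &'static str,
    pub kind: Kind,
}

/// True if one of the slot's bindings names the given producer instance.
fn is_bound_to(bindings: &Bindings, slot: &Slot, instance_id: &str) -> bool {
    bindings
        .get(slot.link_id)
        .map_or(false, |ids| ids.iter().any(|id| *id == instance_id))
}

/// Return the link_ids of the slots that receive a message from `producer`.
/// An empty result means the consumer drops the message.
pub fn route(slots: &[Slot], bindings: &Bindings, producer: &Producer) -> Vec<&'static str> {
    // Step 1: pinned-bound wins; every pinned slot naming the producer fires.
    let pinned: Vec<&'static str> = slots
        .iter()
        .filter(|s| !s.from_any && is_bound_to(bindings, s, producer.instance_id))
        .map(|s| s.link_id)
        .collect();
    if !pinned.is_empty() {
        return pinned;
    }

    // Step 2: from_any slots of the producer's kind. A slot with explicit
    // bindings only accepts the producers it lists; a slot without any
    // binding acts as the wildcard fallback.
    // Step 3 (drop) is the empty vector when nothing matches.
    slots
        .iter()
        .filter(|s| s.from_any && s.kind == producer.kind)
        .filter(|s| {
            let has_explicit = bindings.get(s.link_id).map_or(false, |ids| !ids.is_empty());
            !has_explicit || is_bound_to(bindings, s, producer.instance_id)
        })
        .map(|s| s.link_id)
        .collect()
}
```

With the three slots and two bindings from the example, routing a left_cam frame returns only wrist_left_camera, and a ceiling_cam frame returns extra_cam. Adding an explicit extra_cam binding for ceiling_cam keeps that result while any other unbound depth camera is dropped.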

The launcher validator runs these checks before the stack starts:

  1. Pinned-unbound is a hard error. Every pinned (from_any absent or false) slot in depends_on must have a --bind KEY@VALUE whose KEY equals the slot’s link_id.
  2. A KEY that matches a pinned slot’s link_id binds that slot to VALUE.
  3. Free-form KEYs are allowed for from_any slots. A --bind KEY@VALUE whose KEY doesn’t match a pinned link_id is accepted as long as some from_any slot accepts VALUE as a satisfying producer. For node slots, the producer’s (node_name, node_tag) must equal the slot’s declared pair. For interface slots, the producer’s interfaces.conforms_to must include the slot’s (name, tag); see Interface conformance for the conformance contract. Multiple such bindings on the same from_any slot accumulate.
  4. A KEY that matches neither a pinned link_id nor a from_any slot satisfied by VALUE is a dead key and is rejected.
  5. A pinned binding whose target instance_id deploys a different node than the slot expects (for node deps), or a node whose interfaces.conforms_to does not include the requested interface (for interface deps), is rejected. Node-dep mismatches surface as BindingTargetMismatch; interface-dep mismatches surface as BindingInterfaceNotConformed.
  6. KEY uniqueness within a single invocation is enforced (no two bindings may share the same KEY).
  7. Stack-wide instance_id uniqueness. Every instance_id must be unique across the entire stack, not just within a (node_name, node_tag) group. Bindings address producers by instance_id, so reusing one across two different node kinds would make the binding ambiguous.
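
Several of these checks can be sketched as one validation pass. The model below is illustrative only and covers node slots, not interface conformance: NodeSlot, validate, and every error variant except BindingTargetMismatch are hypothetical names, the order in which checks run is an assumption, and an undeployed binding target is treated as a mismatch or dead key for simplicity.

```rust
use std::collections::{HashMap, HashSet};

/// Validation failures. Only BindingTargetMismatch is named in the text above;
/// the other variant names are hypothetical.
#[derive(Debug, PartialEq, Eq)]
pub enum BindingError {
    DuplicateInstanceId(String),
    DuplicateKey(String),
    PinnedUnbound(String),
    BindingTargetMismatch(String),
    DeadKey(String),
}

/// A node slot from depends_on.nodes (hypothetical model).
pub struct NodeSlot {
    pub link_id: &'static str,
    pub kind: (&'static str, &'static str), // (node_name, node_tag)
    pub from_any: bool,
}

/// Validate the (KEY, VALUE) bindings of one consumer instance.
/// `deployed` lists every instance in the stack as (instance_id, (name, tag)).
pub fn validate(
    slots: &[NodeSlot],
    bindings: &[(&str, &str)],
    deployed: &[(&str, (&str, &str))],
) -> Result<(), BindingError> {
    // Check 7: instance_id must be unique across the whole stack.
    let mut kinds: HashMap<&str, (&str, &str)> = HashMap::new();
    for &(id, kind) in deployed {
        if kinds.insert(id, kind).is_some() {
            return Err(BindingError::DuplicateInstanceId(id.to_owned()));
        }
    }
    // Check 6: no two bindings may share a KEY.
    let mut keys = HashSet::new();
    for &(key, _) in bindings {
        if !keys.insert(key) {
            return Err(BindingError::DuplicateKey(key.to_owned()));
        }
    }
    // Check 1: every pinned slot needs a binding whose KEY is its link_id.
    for slot in slots.iter().filter(|s| !s.from_any) {
        if !keys.contains(slot.link_id) {
            return Err(BindingError::PinnedUnbound(slot.link_id.to_owned()));
        }
    }
    // Checks 2 to 5: resolve each KEY to a pinned slot or a from_any slot.
    for &(key, value) in bindings {
        let target = kinds.get(value).copied();
        match slots.iter().find(|s| !s.from_any && s.link_id == key) {
            // Pinned slot, but the target deploys a different node kind.
            Some(slot) if target != Some(slot.kind) => {
                return Err(BindingError::BindingTargetMismatch(key.to_owned()));
            }
            Some(_) => {}
            // Free-form KEY: some from_any slot must accept the target's kind.
            None => {
                let accepted = slots.iter().any(|s| s.from_any && Some(s.kind) == target);
                if !accepted {
                    return Err(BindingError::DeadKey(key.to_owned()));
                }
            }
        }
    }
    Ok(())
}
```

Returning on the first failure keeps the sketch short; a real validator could equally collect all failures before reporting.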

The message_format supports the following field types:

Type      Rust type
"bool"    bool
"u8"      u8
"u16"     u16
"u32"     u32
"u64"     u64
"i8"      i8
"i16"     i16
"i32"     i32
"i64"     i64
"f32"     f32
"f64"     f64
"string"  String
"bytes"   Vec<u8>
"time"    std::time::SystemTime

Fields can also use complex types:

  • Object: a nested struct:
    header: {
      $type: "object",
      stamp: "time",
      frame_id: "u32"
    }
  • Array: a variable-length list:
    frame: {
      $type: "array",
      $items: "u8"
    }
  • Fixed-length array: an array with a known size:
    position: {
      $type: "array",
      $items: "f32",
      $length: 3
    }
  • Optional: a field that may be absent:
    error_msg: {
      $type: "string",
      $optional: true
    }
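
As a rough picture of how these complex types could look on the Rust side, the sketch below combines the four examples into one message. The ExampleMessage struct is hypothetical. The nested struct and the Vec<u8> for a u8 array match the emit example earlier on this page; the fixed-size array for $length and Option for $optional are assumptions about the generator's output, not confirmed mappings.

```rust
use std::time::SystemTime;

/// `$type: "object"`: a nested struct whose fields match the object definition.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageHeader {
    pub stamp: SystemTime, // "time"
    pub frame_id: u32,     // "u32"
}

/// Hypothetical message combining the four complex-type examples above.
#[derive(Debug, Clone, PartialEq)]
pub struct ExampleMessage {
    pub header: MessageHeader,     // object
    pub frame: Vec<u8>,            // array of "u8"
    pub position: [f32; 3],        // assumed mapping for $length: 3
    pub error_msg: Option<String>, // assumed mapping for $optional: true
}
```

Under these assumptions, an absent error_msg is represented as None and a position always carries exactly three values.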