Topics
Topics implement a publish-subscribe communication pattern between nodes. A node emits a topic to publish messages, and other nodes consume that topic to receive them.
Use topics for continuous, unidirectional data streams such as sensor readings, camera frames, state updates, or any information that flows over time. Multiple consumers can listen to the same topic simultaneously.
Emitting a topic
Section titled “Emitting a topic”A node that publishes messages declares its topics under interfaces.topics.emits in its peppy.json5.
Each topic defines a name, a qos_profile, and a message_format:
{ peppy_schema: "node_v1", manifest: { name: "uvc_camera", tag: "v1", }, interfaces: { topics: { emits: [ { name: "video_stream", qos_profile: "sensor_data", message_format: { header: { $type: "object", stamp: "time", frame_id: "u32" }, encoding: "string", width: "u32", height: "u32", frame: { $type: "array", $items: "u8" } } } ], }, }, execution: { language: "rust", build_cmd: ["cargo", "build", "--release"], run_cmd: ["./target/release/uvc_camera"] },}The qos_profile controls delivery guarantees. Available profiles are:
sensor_data: optimised for high-frequency data where occasional drops are acceptable.standard: balanced defaults suitable for most use cases. This is the default whenqos_profileis omitted.reliable: guarantees delivery at the cost of higher latency.critical: strongest delivery guarantees for safety-critical data.
Publishing messages
Section titled “Publishing messages”After running peppy node sync, the code generator creates a module for each emitted topic under peppygen::emitted_topics.
Use emit to publish a message:
use peppygen::emitted_topics::video_stream;use peppygen::{NodeBuilder, Parameters, Result};use std::time::Duration;
fn main() -> Result<()> { NodeBuilder::new().run(|_args: Parameters, node_runner| async move { let node_runner_clone = node_runner.clone(); tokio::spawn(async move { let mut frame_id = 0u32; loop { let _ = video_stream::emit( &node_runner_clone, video_stream::MessageHeader { stamp: std::time::SystemTime::now(), frame_id, }, "rgb8".to_owned(), 640, 480, vec![1, 2, 3], ) .await;
frame_id = frame_id.wrapping_add(1); tokio::time::sleep(Duration::from_secs_f64(0.1)).await; } });
Ok(()) })}The emit function takes the node_runner reference followed by each field from the message_format in order.
Nested objects become generated structs (e.g. video_stream::MessageHeader) whose fields match the object definition.
A producer publishes a single stream regardless of how many consumers are listening, and it never knows about the bindings consumers use to address it. Starting the producer before any consumer (or after them) is equally valid; consumers attach when they show up.
For a topic with a simple message format:
{ name: "message_stream", qos_profile: "sensor_data", message_format: { message: "string" }}the emit call takes a single String argument:
use peppygen::emitted_topics::message_stream;
message_stream::emit(&node_runner, "hello world".to_owned()).await?;Consuming a topic
Section titled “Consuming a topic”A node that receives messages declares what it consumes under interfaces.topics.consumes.
Dependencies are declared in manifest.depends_on and referenced by link_id in the interface:
{ peppy_schema: "node_v1", manifest: { name: "web_video_stream", tag: "v1", depends_on: { nodes: [ { name: "uvc_camera", tag: "v1", link_id: "uvc_camera" }, ] }, }, interfaces: { topics: { consumes: [ { link_id: "uvc_camera", // References depends_on.nodes[].link_id name: "video_stream", // Topic name on that node }, ], }, }, execution: { language: "rust", build_cmd: ["cargo", "build", "--release"], run_cmd: ["./target/release/web_video_stream"] },}Receiving messages
Section titled “Receiving messages”The code generator creates a module for each consumed topic under peppygen::consumed_topics.
The module name is <link_id>_<topic_name> based on the link_id field; in this case uvc_camera_video_stream.
Use on_next_message_received to wait for the next message:
use peppygen::consumed_topics::uvc_camera_video_stream;use peppygen::{NodeBuilder, Parameters, Result};
fn main() -> Result<()> { NodeBuilder::new().run(|_args: Parameters, node_runner| async move { let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received( &node_runner, None, // from_core_node (None = any) ) .await?;
println!( "got {}x{} frame encoded as {} from {}", frame.width, frame.height, frame.encoding, instance_id );
Ok(()) })}The on_next_message_received function takes:
node_runner: the node runner reference.from_core_node: optionally filter messages from a specific core node, orNonefor any.
It returns a tuple of:
instance_id: the producer instance that published the message, read straight from the message context.message: the deserialized message, with fields matching the topic’smessage_format.
There is no call-site instance-targeting argument. To pin a from_any slot to one specific producer, use a launcher binding (see Bindings and routing below).
Receiving messages continuously
Section titled “Receiving messages continuously”on_next_message_received processes a single message and returns.
To receive messages continuously, call it in a loop.
Each message is processed in a separate task so that the receive loop is not blocked:
use std::sync::Arc;use peppygen::consumed_topics::uvc_camera_video_stream;use peppygen::{NodeBuilder, NodeRunner, Parameters, Result};
fn main() -> Result<()> { NodeBuilder::new().run(|_args: Parameters, node_runner| async move { tokio::spawn(receive_frames(node_runner)); Ok(()) })}
async fn receive_frames(node_runner: Arc<NodeRunner>) { loop { let result = uvc_camera_video_stream::on_next_message_received( &node_runner, None, ) .await;
match result { Ok((instance_id, frame)) => { tokio::spawn(async move { println!("got {}x{} frame from {}", frame.width, frame.height, instance_id); }); } Err(e) => { eprintln!("Error receiving frame: {e}"); break; } } }}Bindings and routing
Section titled “Bindings and routing”Routing is entirely consumer-side. Producers publish unconditionally and are binding-agnostic; only the consumer’s depends_on slots and the launcher bindings that fill them decide which producer instance reaches which slot.
A binding takes the form KEY: VALUE, where KEY is a label local to the consumer (typically a link_id from depends_on) and VALUE is the target producer instance_id. Each binding creates a private one-way channel from that producer instance to one of the consumer’s declared slots.
In a launcher / stack config:
{ source: { local: "./consumer" }, instances: [{ instance_id: "my_consumer", bindings: { uvc_camera: "my-camera-instance" }, }],}or, when launching a single node during development:
peppy node run --bind uvc_camera@my-camera-instance .Per-message routing
Section titled “Per-message routing”For every message a producer publishes, each consumer instance applies this rule on its own slots:
- Pinned-bound wins. If one or more pinned slots’ bindings name the producer, the message is delivered to every such pinned slot.
- Else
from_anyfor the producer’s(name, tag). Afrom_any: trueslot for that producer kind receives the message if it is bound to this producer, or (if there is nofrom_anybinding for that producer) as the wildcard fallback for unclaimed producers. - Else drop. No slot claims the message; the consumer ignores it.
Two consequences worth remembering:
- A pinned-bound slot preempts the
from_anyslot for the same(name, tag)pair: as long as the producer is named by a pinned binding, only the pinned slot fires. - An explicit
from_anybinding replaces the wildcard fallback for that producer: once afrom_anyslot lists specific producer instance IDs, unlisted producers no longer flow into it.
Worked example: openarm01_backbone
Section titled “Worked example: openarm01_backbone”A consumer that wires two specific depth cameras to dedicated wrist slots and lets a third slot hoover up any other depth camera:
{ manifest: { name: "openarm01_backbone", tag: "v1", depends_on: { interfaces: [ { name: "depth_camera", tag: "v1", link_id: "wrist_left_camera" }, { name: "depth_camera", tag: "v1", link_id: "wrist_right_camera" }, { name: "depth_camera", tag: "v1", link_id: "extra_cam", from_any: true }, ], }, }, // ...}And the launcher that binds it:
{ deployments: [ { source: { name: "depth_camera:v1" }, instances: [ { instance_id: "left_cam" }, { instance_id: "right_cam" }, { instance_id: "ceiling_cam" }, ]}, { source: { name: "openarm01_backbone:v1" }, instances: [ { instance_id: "backbone_inst_1", bindings: { wrist_left_camera: "left_cam", wrist_right_camera: "right_cam", }}, ]}, ],}Four contract statements follow from this manifest:
- A frame from
left_camarrives only on thewrist_left_camera_video_streamslot. - A frame from
right_camarrives only on thewrist_right_camera_video_streamslot. - A frame from
ceiling_cam(no binding names it) flows into theextra_cam_video_streamslot via the wildcard fallback. - If
extra_camhad been bound explicitly (e.g.extra_cam: "ceiling_cam"), it would still receiveceiling_camframes but would stop receiving frames from any depth camera not named by a binding.
Validator rules
Section titled “Validator rules”The launcher validator runs these checks before the stack starts:
- Pinned-unbound is a hard error. Every pinned (
from_anyabsent orfalse) slot independs_onmust have a--bind KEY@VALUEwhoseKEYequals the slot’slink_id. - A
KEYthat matches a pinned slot’slink_idbinds that slot toVALUE. - Free-form
KEYs are allowed forfrom_anyslots. A--bind KEY@VALUEwhoseKEYdoesn’t match a pinnedlink_idis accepted as long as somefrom_anyslot acceptsVALUEas a satisfying producer. For node slots, the producer’s(node_name, node_tag)must equal the slot’s declared pair. For interface slots, the producer’sinterfaces.conforms_tomust include the slot’s(name, tag); see Interface conformance for the conformance contract. Multiple such bindings on the samefrom_anyslot accumulate. - A
KEYthat matches neither a pinnedlink_idnor afrom_anyslot satisfied byVALUEis a dead key and is rejected. - A pinned binding whose target
instance_iddeploys a different node than the slot expects (for node deps) or a node that does notconforms_tothe requested interface (for interface deps) is rejected. Node-dep mismatches surface asBindingTargetMismatch; interface-dep mismatches surface asBindingInterfaceNotConformed. KEYuniqueness within a single invocation is enforced (no two bindings may share the sameKEY).- Stack-wide
instance_iduniqueness. Everyinstance_idmust be unique across the entire stack, not just within a(node_name, node_tag)group. Bindings address producers byinstance_id, so reusing one across two different node kinds would make the binding ambiguous.
Message format types
Section titled “Message format types”The message_format supports the following field types:
| Type | Rust type |
|---|---|
"bool" | bool |
"u8" | u8 |
"u16" | u16 |
"u32" | u32 |
"u64" | u64 |
"i8" | i8 |
"i16" | i16 |
"i32" | i32 |
"i64" | i64 |
"f32" | f32 |
"f64" | f64 |
"string" | String |
"bytes" | Vec<u8> |
"time" | std::time::SystemTime |
Fields can also use complex types:
- Object: a nested struct:
header: {$type: "object",stamp: "time",frame_id: "u32"}
- Array: a variable-length list:
frame: {$type: "array",$items: "u8"}
- Fixed-length array: an array with a known size:
position: {$type: "array",$items: "f32",$length: 3}
- Optional: a field that may be absent:
error_msg: {$type: "string",$optional: true}