Topics
Topics implement a publish-subscribe communication pattern between nodes. A node exposes a topic to publish messages, and other nodes subscribe to that topic to receive them.
Use topics for continuous, unidirectional data streams such as sensor readings, camera frames, state updates, or any information that flows over time. Multiple subscribers can listen to the same topic simultaneously.
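The fan-out behaviour described above can be pictured with a toy, in-process model. This is only a sketch of the publish-subscribe pattern using the Rust standard library; ToyTopic and its methods are hypothetical names invented for illustration and are not part of Peppy, which handles delivery between nodes for you.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy in-process topic: every subscriber gets its own copy of each message.
/// Hypothetical type for illustration only; not a Peppy API.
struct ToyTopic<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> ToyTopic<T> {
    fn new() -> Self {
        ToyTopic { subscribers: Vec::new() }
    }

    /// Register a new subscriber and hand back its receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send a copy of `msg` to every live subscriber and return how many
    /// subscribers remain. Subscribers whose receiver was dropped are removed.
    fn publish(&mut self, msg: T) -> usize {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
        self.subscribers.len()
    }
}

fn main() {
    let mut topic = ToyTopic::new();
    let a = topic.subscribe();
    let b = topic.subscribe();

    // Data flows one way: the publisher never waits for a reply.
    let delivered = topic.publish("frame 0".to_string());
    println!("delivered to {delivered} subscribers");
    println!("a got {:?}, b got {:?}", a.recv().unwrap(), b.recv().unwrap());
}
```

Each subscriber owns an independent queue, so a slow subscriber does not prevent the others from reading their copy.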
Exposing a topic
A node that publishes messages declares its topics under interfaces.exposes.topics in its peppy.json5.
Each topic defines a name, a qos_profile, and a message_format:
    {
      schema_version: 1,
      manifest: {
        name: "uvc_camera",
        tag: "0.1.0",
        language: "rust",
      },
      process: {
        add_cmd: ["cargo", "build", "--release"],
        start_cmd: ["./target/release/uvc_camera"]
      },
      interfaces: {
        exposes: {
          topics: [
            {
              name: "video_stream",
              qos_profile: "sensor_data",
              message_format: {
                header: {
                  $type: "object",
                  stamp: "time",
                  frame_id: "u32"
                },
                encoding: "string",
                width: "u32",
                height: "u32",
                frame: {
                  $type: "array",
                  $items: "u8"
                }
              }
            }
          ],
          services: [],
          actions: [],
        },
        subscribes_to: {
          topics: [],
          services: [],
          actions: [],
        },
      }
    }

The qos_profile controls delivery guarantees. Available profiles are:
- sensor_data: optimised for high-frequency data where occasional drops are acceptable.
- reliable: guarantees delivery at the cost of higher latency.
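The trade-off between the two profiles can be illustrated with a bounded queue from the Rust standard library. This is an analogy only, under the assumption that "drops are acceptable" means a publisher never waits and "guarantees delivery" means it may wait; it does not show how Peppy implements either profile, and send_best_effort and send_reliable are hypothetical helper names.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Best-effort delivery: if the queue is full, drop the message and move on.
/// Nothing drains the queue here, so only `capacity` messages fit.
/// Returns how many of `count` messages were accepted.
fn send_best_effort(count: u32, capacity: usize) -> u32 {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for i in 0..count {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => {} // dropped; the publisher never waits
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Reliable delivery: block until the consumer has room for every message.
/// Returns the messages the consumer received, in order.
fn send_reliable(count: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<u32>>());
    for i in 0..count {
        tx.send(i).expect("consumer hung up"); // waits while the queue is full
    }
    drop(tx); // close the channel so the consumer's iterator ends
    consumer.join().expect("consumer panicked")
}

fn main() {
    println!("best effort accepted {} of 5", send_best_effort(5, 2));
    println!("reliable delivered {:?}", send_reliable(5, 2));
}
```

The best-effort sender finishes immediately but loses whatever does not fit, which suits high-rate sensor data where a newer reading soon replaces the lost one. The blocking sender loses nothing but can stall behind a slow consumer.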
Publishing messages
After running peppy node sync, the code generator creates a module for each exposed topic under peppygen::exposed_topics.
Use emit to publish a message:
    use peppygen::exposed_topics::video_stream;
    use peppygen::{NodeBuilder, Parameters, Result};
    use std::time::Duration;

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            let node_runner_clone = node_runner.clone();
            tokio::spawn(async move {
                let mut frame_id = 0u32;
                loop {
                    let _ = video_stream::emit(
                        &node_runner_clone,
                        video_stream::MessageHeader {
                            stamp: std::time::SystemTime::now(),
                            frame_id,
                        },
                        "rgb8".to_owned(),
                        640,
                        480,
                        vec![1, 2, 3],
                    )
                    .await;

                    frame_id = frame_id.wrapping_add(1);
                    tokio::time::sleep(Duration::from_secs_f64(0.1)).await;
                }
            });

            Ok(())
        })
    }

The emit function takes the node_runner reference followed by each field from the message_format in order.
Nested objects become generated structs (e.g. video_stream::MessageHeader) whose fields match the object definition.
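To make the mapping from message_format to Rust types concrete, here is a sketch of what the types for video_stream might look like. Only the MessageHeader name and its two fields come from the example above; the Message struct name, the derives, and the build_frame helper are assumptions made for illustration, and the real generated code may differ.

```rust
use std::time::SystemTime;

// Hypothetical shape of the generated types; the real generated code may differ.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageHeader {
    pub stamp: SystemTime, // "time"
    pub frame_id: u32,     // "u32"
}

#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub header: MessageHeader, // nested object
    pub encoding: String,      // "string"
    pub width: u32,
    pub height: u32,
    pub frame: Vec<u8>,        // array of "u8"
}

/// Hypothetical helper: fill the fields in the same order as `message_format`.
fn build_frame(frame_id: u32, pixels: Vec<u8>) -> Message {
    Message {
        header: MessageHeader {
            stamp: SystemTime::now(),
            frame_id,
        },
        encoding: "rgb8".to_owned(),
        width: 640,
        height: 480,
        frame: pixels,
    }
}

fn main() {
    let msg = build_frame(0, vec![1, 2, 3]);
    println!(
        "frame {} is {}x{} ({}, {} bytes)",
        msg.header.frame_id,
        msg.width,
        msg.height,
        msg.encoding,
        msg.frame.len()
    );
    // wrapping_add, as used in the publishing loop, rolls over instead of panicking.
    println!("next id after u32::MAX: {}", u32::MAX.wrapping_add(1));
}
```

The field order in the sketch follows the order of the keys in message_format, which is the same order emit expects its arguments.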
For a topic with a simple message format:
    {
      name: "message_stream",
      qos_profile: "sensor_data",
      message_format: {
        message: "string"
      }
    }

the emit call takes a single String argument:
    use peppygen::exposed_topics::message_stream;

    message_stream::emit(&node_runner, "hello world".to_owned()).await?;

Subscribing to a topic
A node that receives messages declares its subscriptions under interfaces.subscribes_to.topics:
    {
      schema_version: 1,
      manifest: {
        name: "web_video_stream",
        tag: "0.1.0",
        language: "rust",
      },
      process: {
        add_cmd: ["cargo", "build", "--release"],
        start_cmd: ["./target/release/web_video_stream"]
      },
      interfaces: {
        subscribes_to: {
          topics: [
            {
              id: "camera_frame",   // Your chosen identifier for this subscription
              node: "uvc_camera",   // Target node name
              name: "video_stream", // Topic name on that node
              tag: "0.1.0",         // Target node tag/version
            },
          ],
          services: [],
          actions: [],
        },
      }
    }

Receiving messages
The code generator creates a module for each subscribed topic under peppygen::subscribed_topics.
The module name takes the form <node>_<topic_name>, in this case uvc_camera_video_stream.
Use on_next_message_received to wait for the next message:
    use peppygen::subscribed_topics::uvc_camera_video_stream;
    use peppygen::{NodeBuilder, Parameters, Result};

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
                &node_runner,
                None, // target_core_node (None = any)
                None, // target_instance_id (None = any)
            )
            .await?;

            println!(
                "got {}x{} frame encoded as {} from {}",
                frame.width, frame.height, frame.encoding, instance_id
            );

            Ok(())
        })
    }

The on_next_message_received function takes:
- node_runner: the node runner reference.
- target_core_node: optionally filter messages from a specific core node, or None for any.
- target_instance_id: optionally filter messages from a specific instance, or None for any.
It returns a tuple of:
- instance_id: the instance that published the message.
- message: the deserialized message, with fields matching the topic's message_format.
Receiving messages continuously
on_next_message_received processes a single message and returns.
To receive messages continuously, call it in a loop.
Each message is processed in a separate task so that the receive loop is not blocked:
    use std::sync::Arc;
    use peppygen::subscribed_topics::uvc_camera_video_stream;
    use peppygen::{NodeBuilder, NodeRunner, Parameters, Result};

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            tokio::spawn(receive_frames(node_runner));
            Ok(())
        })
    }

    async fn receive_frames(node_runner: Arc<NodeRunner>) {
        loop {
            let result = uvc_camera_video_stream::on_next_message_received(
                &node_runner,
                None,
                None,
            )
            .await;

            match result {
                Ok((instance_id, frame)) => {
                    tokio::spawn(async move {
                        println!("got {}x{} frame from {}", frame.width, frame.height, instance_id);
                    });
                }
                Err(e) => {
                    eprintln!("Error receiving frame: {e}");
                    break;
                }
            }
        }
    }

Instance targeting
When multiple instances of the same node are publishing, you can control which publisher you receive from:
- None (default): messages from all instances are received.
- Some("instance_id"): only messages from a specific instance are received.
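The two filter values can be read as a simple predicate over the publishing instance. The sketch below mirrors that documented behaviour with a hypothetical passes_filter helper and made-up instance names; it is not a Peppy API, and the actual filtering happens inside on_next_message_received.

```rust
/// Returns true when a message from `publisher` passes the `target` filter.
/// Hypothetical helper that mirrors the documented semantics; not a Peppy API.
fn passes_filter(target: Option<&str>, publisher: &str) -> bool {
    match target {
        None => true,                        // no filter: accept every instance
        Some(wanted) => wanted == publisher, // accept only the named instance
    }
}

fn main() {
    // Made-up instance ids for illustration.
    let publishers = ["camera-left", "camera-right"];

    for target in [None, Some("camera-left")] {
        let accepted: Vec<&str> = publishers
            .iter()
            .copied()
            .filter(|p| passes_filter(target, *p))
            .collect();
        println!("{target:?} accepts {accepted:?}");
    }
}
```

With no target, both publishers pass; with a target, only the exact match does.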
    // Only receive from a specific camera instance
    let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
        &node_runner,
        None,
        Some("my-camera-instance"),
    ).await?;

Message format types
The message_format supports the following field types:
| Type | Rust type |
|---|---|
| "bool" | bool |
| "u8" | u8 |
| "u16" | u16 |
| "u32" | u32 |
| "u64" | u64 |
| "i8" | i8 |
| "i16" | i16 |
| "i32" | i32 |
| "i64" | i64 |
| "f32" | f32 |
| "f64" | f64 |
| "string" | String |
| "time" | std::time::SystemTime |
Fields can also use complex types:
- Object (a nested struct):

      header: {
        $type: "object",
        stamp: "time",
        frame_id: "u32"
      }

- Array (a variable-length list):

      frame: {
        $type: "array",
        $items: "u8"
      }

- Fixed-length array (an array with a known size):

      position: {
        $type: "array",
        $items: "f32",
        $length: 3
      }

- Optional (a field that may be absent):

      error_msg: {
        $type: "string",
        $optional: true
      }
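As a rough guide to how these complex types could look on the Rust side, the sketch below pairs each one with a plausible Rust type. The object-to-struct and array-to-Vec pairings follow the publishing example earlier on this page; the fixed-size array and Option pairings are assumptions, as are the Header and Example names, so check the generated code for the actual types.

```rust
use std::time::SystemTime;

// Assumed Rust shapes for the complex field types; generated code may differ.
#[derive(Debug, Clone, PartialEq)]
struct Header {
    stamp: SystemTime,
    frame_id: u32,
}

#[derive(Debug, Clone, PartialEq)]
struct Example {
    header: Header,            // $type: "object"
    frame: Vec<u8>,            // $type: "array", $items: "u8"
    position: [f32; 3],        // $type: "array", $items: "f32", $length: 3 (assumed)
    error_msg: Option<String>, // $type: "string", $optional: true (assumed)
}

/// Hypothetical sample value exercising every complex type once.
fn sample() -> Example {
    Example {
        header: Header {
            stamp: SystemTime::now(),
            frame_id: 1,
        },
        frame: vec![0, 255],
        position: [0.0, 1.5, -2.0],
        error_msg: None, // an absent optional field
    }
}

fn main() {
    let msg = sample();
    println!("header: {:?}", msg.header);
    println!("frame has {} bytes", msg.frame.len());
    println!("position has {} components", msg.position.len());
    match &msg.error_msg {
        Some(e) => println!("error: {e}"),
        None => println!("no error reported"),
    }
}
```

A fixed-size array encodes the length in the type, so a message with the wrong number of components cannot be constructed, while a Vec leaves the length to run time.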