Topics

Topics implement a publish-subscribe communication pattern between nodes. A node exposes a topic to publish messages, and other nodes subscribe to that topic to receive them.

Use topics for continuous, unidirectional data streams such as sensor readings, camera frames, state updates, or any information that flows over time. Multiple subscribers can listen to the same topic simultaneously.

A node that publishes messages declares its topics under interfaces.exposes.topics in its peppy.json5. Each topic defines a name, a qos_profile, and a message_format:

{
  schema_version: 1,
  manifest: {
    name: "uvc_camera",
    tag: "0.1.0",
    language: "rust",
  },
  process: {
    add_cmd: ["cargo", "build", "--release"],
    start_cmd: ["./target/release/uvc_camera"]
  },
  interfaces: {
    exposes: {
      topics: [
        {
          name: "video_stream",
          qos_profile: "sensor_data",
          message_format: {
            header: {
              $type: "object",
              stamp: "time",
              frame_id: "u32"
            },
            encoding: "string",
            width: "u32",
            height: "u32",
            frame: {
              $type: "array",
              $items: "u8"
            }
          }
        }
      ],
      services: [],
      actions: [],
    },
    subscribes_to: {
      topics: [],
      services: [],
      actions: [],
    },
  }
}

The qos_profile controls delivery guarantees. Available profiles are:

  • sensor_data — optimised for high-frequency data where occasional drops are acceptable.
  • reliable — guarantees delivery at the cost of higher latency.
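
The trade-off between the two profiles can be illustrated without Peppy at all. The sketch below is not how Peppy implements QoS; it is a conceptual analogy using only the Rust standard library, and the function names publish_best_effort and publish_blocking are hypothetical. A bounded channel stands in for the transport: the "sensor_data"-like publisher drops a message when the queue is full, while the "reliable"-like publisher waits until there is room.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// "sensor_data"-like behaviour (analogy only): never block the publisher,
/// drop a message when the bounded queue is full.
/// Returns the delivered messages and the number of dropped ones.
fn publish_best_effort(messages: &[u32], capacity: usize) -> (Vec<u32>, usize) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;
    for &m in messages {
        match tx.try_send(m) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the receiving iterator terminates
    (rx.iter().collect(), dropped)
}

/// "reliable"-like behaviour (analogy only): the publisher blocks until the
/// queue has room, so every message arrives but a slow consumer adds latency.
fn publish_blocking(messages: Vec<u32>, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for m in messages {
            if tx.send(m).is_err() {
                break; // receiver went away
            }
        }
        // tx is dropped here, which ends the receiver's iterator
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

With a queue of capacity 2 and no consumer draining it, the best-effort publisher keeps only the first two of five messages, whereas the blocking publisher delivers all five in order.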

After running peppy node sync, the code generator creates a module for each exposed topic under peppygen::exposed_topics. Use emit to publish a message:

use peppygen::exposed_topics::video_stream;
use peppygen::{NodeBuilder, Parameters, Result};
use std::time::Duration;

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let node_runner_clone = node_runner.clone();
        tokio::spawn(async move {
            let mut frame_id = 0u32;
            loop {
                let _ = video_stream::emit(
                    &node_runner_clone,
                    video_stream::MessageHeader {
                        stamp: std::time::SystemTime::now(),
                        frame_id,
                    },
                    "rgb8".to_owned(),
                    640,
                    480,
                    vec![1, 2, 3],
                )
                .await;
                frame_id = frame_id.wrapping_add(1);
                tokio::time::sleep(Duration::from_secs_f64(0.1)).await;
            }
        });
        Ok(())
    })
}

The emit function takes the node_runner reference followed by each field from the message_format in order. Nested objects become generated structs (e.g. video_stream::MessageHeader) whose fields match the object definition.
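
To make the mapping from message_format to Rust concrete, here is a rough sketch of the kind of types the generator could produce for video_stream. Only MessageHeader and the field names come from the examples on this page; the Message struct name, the derives, and the build_message helper are assumptions for illustration, and the real generated code may differ.

```rust
use std::time::SystemTime;

// Stand-in for the struct generated from the nested `header` object.
// The derives are an assumption, not taken from the generator.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageHeader {
    pub stamp: SystemTime, // "time"
    pub frame_id: u32,     // "u32"
}

// Hypothetical message type: one field per message_format entry.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub header: MessageHeader, // $type: "object"
    pub encoding: String,      // "string"
    pub width: u32,            // "u32"
    pub height: u32,           // "u32"
    pub frame: Vec<u8>,        // $type: "array", $items: "u8"
}

// Hypothetical helper mirroring the argument order of `emit`:
// one parameter per field, in the order the fields are declared.
pub fn build_message(
    header: MessageHeader,
    encoding: String,
    width: u32,
    height: u32,
    frame: Vec<u8>,
) -> Message {
    Message { header, encoding, width, height, frame }
}
```

Reordering fields in message_format would therefore reorder the emit arguments, so it is worth regenerating and recompiling after any manifest change.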

For a topic with a simple message format:

{
  name: "message_stream",
  qos_profile: "sensor_data",
  message_format: {
    message: "string"
  }
}

the emit call takes a single String argument:

use peppygen::exposed_topics::message_stream;
message_stream::emit(&node_runner, "hello world".to_owned()).await?;

A node that receives messages declares its subscriptions under interfaces.subscribes_to.topics:

{
  schema_version: 1,
  manifest: {
    name: "web_video_stream",
    tag: "0.1.0",
    language: "rust",
  },
  process: {
    add_cmd: ["cargo", "build", "--release"],
    start_cmd: ["./target/release/web_video_stream"]
  },
  interfaces: {
    subscribes_to: {
      topics: [
        {
          id: "camera_frame",   // Your chosen identifier for this subscription
          node: "uvc_camera",   // Target node name
          name: "video_stream", // Topic name on that node
          tag: "0.1.0",         // Target node tag/version
        },
      ],
      services: [],
      actions: [],
    },
  }
}

The code generator creates a module for each subscribed topic under peppygen::subscribed_topics. The module name follows the pattern <node>_<topic_name>, taken from the subscription's node and name fields, which gives uvc_camera_video_stream in this case. Use on_next_message_received to wait for the next message:

use peppygen::subscribed_topics::uvc_camera_video_stream;
use peppygen::{NodeBuilder, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
            &node_runner,
            None, // target_core_node (None = any)
            None, // target_instance_id (None = any)
        )
        .await?;
        println!(
            "got {}x{} frame encoded as {} from {}",
            frame.width, frame.height, frame.encoding, instance_id
        );
        Ok(())
    })
}

The on_next_message_received function takes:

  • node_runner — the node runner reference.
  • target_core_node — optionally filter messages from a specific core node, or None for any.
  • target_instance_id — optionally filter messages from a specific instance, or None for any.

It returns a tuple of:

  • instance_id — the instance that published the message.
  • message — the deserialized message, with fields matching the topic’s message_format.

on_next_message_received waits for a single message and returns. To receive messages continuously, call it in a loop. In the example below, each message is handed to a separately spawned task so that slow processing does not block the receive loop:

use std::sync::Arc;
use peppygen::subscribed_topics::uvc_camera_video_stream;
use peppygen::{NodeBuilder, NodeRunner, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        tokio::spawn(receive_frames(node_runner));
        Ok(())
    })
}

async fn receive_frames(node_runner: Arc<NodeRunner>) {
    loop {
        let result = uvc_camera_video_stream::on_next_message_received(
            &node_runner,
            None,
            None,
        )
        .await;
        match result {
            Ok((instance_id, frame)) => {
                tokio::spawn(async move {
                    println!("got {}x{} frame from {}", frame.width, frame.height, instance_id);
                });
            }
            Err(e) => {
                eprintln!("Error receiving frame: {e}");
                break;
            }
        }
    }
}

When multiple instances of the same node are publishing, you can control which publisher you receive from:

  • None (default) — messages from all instances are received.
  • Some("instance_id") — only messages from a specific instance are received.

// Only receive from a specific camera instance
let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
    &node_runner,
    None,
    Some("my-camera-instance"),
)
.await?;
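
The filter semantics described above can be modelled in a few lines of plain Rust. This is only a sketch of the behaviour as documented here, not Peppy's implementation; passes_filter and select are hypothetical helper names, and the instance ids are made up.

```rust
/// Returns true when a message published by `publisher` passes the filter.
/// `None` accepts every instance; `Some(id)` accepts only that instance.
fn passes_filter(target_instance_id: Option<&str>, publisher: &str) -> bool {
    match target_instance_id {
        None => true,
        Some(wanted) => wanted == publisher,
    }
}

/// Keeps only the (instance_id, payload) pairs that pass the filter,
/// preserving their arrival order.
fn select<'a>(target: Option<&str>, incoming: &[(&'a str, u32)]) -> Vec<(&'a str, u32)> {
    incoming
        .iter()
        .copied()
        .filter(|&(id, _)| passes_filter(target, id))
        .collect()
}
```

Given messages from "cam-a" and "cam-b", select(None, ..) returns all of them, while select(Some("cam-a"), ..) returns only those published by "cam-a".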

The message_format supports the following field types:

Type        Rust type
"bool"      bool
"u8"        u8
"u16"       u16
"u32"       u32
"u64"       u64
"i8"        i8
"i16"       i16
"i32"       i32
"i64"       i64
"f32"       f32
"f64"       f64
"string"    String
"time"      std::time::SystemTime

Fields can also use complex types:

  • Object — a nested struct:
    header: {
      $type: "object",
      stamp: "time",
      frame_id: "u32"
    }
  • Array — a variable-length list:
    frame: {
      $type: "array",
      $items: "u8"
    }
  • Fixed-length array — an array with a known size:
    position: {
      $type: "array",
      $items: "f32",
      $length: 3
    }
  • Optional — a field that may be absent:
    error_msg: {
      $type: "string",
      $optional: true
    }
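
As a rough guide to how these complex types might look on the Rust side, the sketch below combines the four examples into one hypothetical struct. The examples earlier on this page show that objects become structs and that a variable-length array is passed as a Vec; mapping a fixed-length array to [T; N] and an optional field to Option<T> is an assumption here, so check the generated code for the actual types. Header, Sample, and describe_error are illustrative names only.

```rust
use std::time::SystemTime;

// Object: a nested struct with one field per key.
#[derive(Debug, Clone, PartialEq)]
pub struct Header {
    pub stamp: SystemTime,
    pub frame_id: u32,
}

// Hypothetical message combining the four complex types.
#[derive(Debug, Clone, PartialEq)]
pub struct Sample {
    pub header: Header,            // $type: "object"
    pub frame: Vec<u8>,            // $type: "array", $items: "u8"
    pub position: [f32; 3],        // $length: 3 (assumed mapping)
    pub error_msg: Option<String>, // $optional: true (assumed mapping)
}

// Reads the optional field, falling back to a default when it is absent.
pub fn describe_error(sample: &Sample) -> &str {
    sample.error_msg.as_deref().unwrap_or("no error")
}
```

If optional fields do map to Option, subscribers need to handle the absent case explicitly, as describe_error does.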