
Topics

Topics implement a publish-subscribe communication pattern between nodes. A node emits a topic to publish messages, and other nodes consume that topic to receive them.

Use topics for continuous, unidirectional data streams such as sensor readings, camera frames, state updates, or any information that flows over time. Multiple consumers can listen to the same topic simultaneously.

A node that publishes messages declares its topics under interfaces.topics.emits in its peppy.json5. Each topic defines a name, a qos_profile, and a message_format:

{
  schema_version: 1,
  manifest: {
    name: "uvc_camera",
    tag: "0.1.0",
  },
  interfaces: {
    topics: {
      emits: [
        {
          name: "video_stream",
          qos_profile: "sensor_data",
          message_format: {
            header: {
              $type: "object",
              stamp: "time",
              frame_id: "u32"
            },
            encoding: "string",
            width: "u32",
            height: "u32",
            frame: {
              $type: "array",
              $items: "u8"
            }
          }
        }
      ],
    },
  },
  execution: {
    language: "rust",
    build_cmd: ["cargo", "build", "--release"],
    run_cmd: ["./target/release/uvc_camera"]
  },
}

The qos_profile controls delivery guarantees. Available profiles are:

  • sensor_data — optimised for high-frequency data where occasional drops are acceptable.
  • standard — balanced defaults suitable for most use cases. This is the default when qos_profile is omitted.
  • reliable — guarantees delivery at the cost of higher latency.
  • critical — strongest delivery guarantees for safety-critical data.
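
The defaulting rule can be sketched in plain Rust. The `QosProfile` enum and the `resolve_qos` function below are hypothetical names used only for illustration and are not part of peppygen. Rejecting unknown profile names is likewise an assumption about how a manifest validator might behave.

```rust
/// Hypothetical mirror of the four documented profile names (not a peppygen type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QosProfile {
    SensorData,
    Standard,
    Reliable,
    Critical,
}

/// Resolve the `qos_profile` value of a topic entry.
/// `None` models an omitted field, which falls back to `standard`.
fn resolve_qos(raw: Option<&str>) -> Result<QosProfile, String> {
    match raw {
        None => Ok(QosProfile::Standard),
        Some("sensor_data") => Ok(QosProfile::SensorData),
        Some("standard") => Ok(QosProfile::Standard),
        Some("reliable") => Ok(QosProfile::Reliable),
        Some("critical") => Ok(QosProfile::Critical),
        // Assumption: anything else is treated as a manifest error.
        Some(other) => Err(format!("unknown qos_profile: {other}")),
    }
}
```

Calling `resolve_qos(None)` yields `QosProfile::Standard`, which matches the documented default.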

After running peppy node sync, the code generator creates a module for each emitted topic under peppygen::emitted_topics. Use emit to publish a message:

use peppygen::emitted_topics::video_stream;
use peppygen::{NodeBuilder, Parameters, Result};
use std::time::Duration;

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        // Publish from a background task so this closure can return.
        let node_runner_clone = node_runner.clone();
        tokio::spawn(async move {
            let mut frame_id = 0u32;
            loop {
                // The result of emit is discarded in this example.
                let _ = video_stream::emit(
                    &node_runner_clone,
                    video_stream::MessageHeader {
                        stamp: std::time::SystemTime::now(),
                        frame_id,
                    },
                    "rgb8".to_owned(),
                    640,
                    480,
                    vec![1, 2, 3], // placeholder pixel data
                )
                .await;
                frame_id = frame_id.wrapping_add(1);
                // Wait 100 ms between frames.
                tokio::time::sleep(Duration::from_secs_f64(0.1)).await;
            }
        });
        Ok(())
    })
}

The emit function takes the node_runner reference followed by each field from the message_format in order. Nested objects become generated structs (e.g. video_stream::MessageHeader) whose fields match the object definition.
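
To make the argument order concrete, here is a minimal stand-alone sketch of what the types for video_stream might look like. It does not use peppygen: `VideoStreamMessage` and `build_message` are hypothetical names, and the real generated code may be shaped differently. Only `MessageHeader` and its two fields come from the example above.

```rust
use std::time::SystemTime;

/// Stand-in for the generated `video_stream::MessageHeader` struct.
#[derive(Debug, Clone, PartialEq)]
struct MessageHeader {
    stamp: SystemTime,
    frame_id: u32,
}

/// Hypothetical container for one `video_stream` message.
#[derive(Debug, Clone, PartialEq)]
struct VideoStreamMessage {
    header: MessageHeader,
    encoding: String,
    width: u32,
    height: u32,
    frame: Vec<u8>,
}

/// Takes the fields in the order they are declared in `message_format`,
/// mirroring the argument order of `emit` without the node runner.
fn build_message(
    header: MessageHeader,
    encoding: String,
    width: u32,
    height: u32,
    frame: Vec<u8>,
) -> VideoStreamMessage {
    VideoStreamMessage { header, encoding, width, height, frame }
}
```

The parameter list reads top to bottom exactly like the message_format block: header, encoding, width, height, frame.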

For a topic with a simple message format:

{
  name: "message_stream",
  qos_profile: "sensor_data",
  message_format: {
    message: "string"
  }
}

the emit call takes a single String argument:

use peppygen::emitted_topics::message_stream;
message_stream::emit(&node_runner, "hello world".to_owned()).await?;

A node that receives messages declares what it consumes under interfaces.topics.consumes. Dependencies are declared in manifest.depends_on and referenced by local_node_id in the interface:

{
  schema_version: 1,
  manifest: {
    name: "web_video_stream",
    tag: "0.1.0",
    depends_on: {
      nodes: [
        { name: "uvc_camera", tag: "0.1.0", local_id: "uvc_camera" },
      ]
    },
  },
  interfaces: {
    topics: {
      consumes: [
        {
          local_node_id: "uvc_camera", // References depends_on.nodes[].local_id
          name: "video_stream", // Topic name on that node
        },
      ],
    },
  },
  execution: {
    language: "rust",
    build_cmd: ["cargo", "build", "--release"],
    run_cmd: ["./target/release/web_video_stream"]
  },
}

The code generator creates a module for each consumed topic under peppygen::consumed_topics. The module name is <local_node_id>_<topic_name> based on the local_node_id field — in this case uvc_camera_video_stream. Use on_next_message_received to wait for the next message:

use peppygen::consumed_topics::uvc_camera_video_stream;
use peppygen::{NodeBuilder, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
            &node_runner,
            None, // target_core_node (None = any)
            None, // target_instance_id (None = any)
        )
        .await?;
        println!(
            "got {}x{} frame encoded as {} from {}",
            frame.width, frame.height, frame.encoding, instance_id
        );
        Ok(())
    })
}
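
The module naming rule for consumed topics can be written as a one-line function. This is only an illustration of the `<local_node_id>_<topic_name>` pattern. `consumed_module_name` is a hypothetical helper, not something peppygen exposes.

```rust
/// Build the consumed-topic module name from the dependency's local id
/// and the topic name, following the `<local_node_id>_<topic_name>` rule.
fn consumed_module_name(local_node_id: &str, topic_name: &str) -> String {
    format!("{local_node_id}_{topic_name}")
}
```

Because the name is derived from local_node_id rather than the node name, a dependency declared with a hypothetical local_id of "front_camera" would produce the module front_camera_video_stream under this rule.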

The on_next_message_received function takes:

  • node_runner — the node runner reference.
  • target_core_node — optionally filter messages from a specific core node, or None for any.
  • target_instance_id — optionally filter messages from a specific instance, or None for any.

It returns a tuple of:

  • instance_id — the instance that published the message.
  • message — the deserialized message, with fields matching the topic’s message_format.

on_next_message_received processes a single message and returns. To receive messages continuously, call it in a loop. In the following example, each message is handed to its own task so that the receive loop is not blocked:

use std::sync::Arc;
use peppygen::consumed_topics::uvc_camera_video_stream;
use peppygen::{NodeBuilder, NodeRunner, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        tokio::spawn(receive_frames(node_runner));
        Ok(())
    })
}

async fn receive_frames(node_runner: Arc<NodeRunner>) {
    loop {
        let result = uvc_camera_video_stream::on_next_message_received(
            &node_runner,
            None,
            None,
        )
        .await;
        match result {
            Ok((instance_id, frame)) => {
                tokio::spawn(async move {
                    println!("got {}x{} frame from {}", frame.width, frame.height, instance_id);
                });
            }
            Err(e) => {
                eprintln!("Error receiving frame: {e}");
                break;
            }
        }
    }
}
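
The same receive-and-hand-off shape can be tried without peppygen or tokio. The sketch below is an analogy only: it uses a standard-library channel in place of a topic and threads in place of tasks. `receive_all` and the `(instance_id, width)` tuple are hypothetical stand-ins.

```rust
use std::sync::mpsc;
use std::thread;

/// Receive messages until the channel closes, handing each one to its own
/// worker thread so the receive loop never waits on processing.
fn receive_all(rx: mpsc::Receiver<(String, u32)>) -> Vec<String> {
    let mut workers = Vec::new();
    // `recv` returns Err once every sender has been dropped, which ends the loop.
    while let Ok((instance_id, width)) = rx.recv() {
        workers.push(thread::spawn(move || format!("{instance_id}:{width}")));
    }
    // Join in spawn order so results line up with the order of arrival.
    workers
        .into_iter()
        .map(|w| w.join().expect("worker thread panicked"))
        .collect()
}
```

As in the async version, an error from the receive call ends the loop, and the per-message work runs outside it.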

When multiple instances of the same node are publishing, you can control which publisher you receive from:

  • None (default) — messages from all instances are received.
  • Some("instance_id") — only messages from a specific instance are received.

// Only receive from a specific camera instance
let (instance_id, frame) = uvc_camera_video_stream::on_next_message_received(
    &node_runner,
    None,
    Some("my-camera-instance"),
)
.await?;
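
The filtering semantics can be expressed as a small predicate. This is a sketch of the rule as described here, not the framework's actual implementation. `accepts` is a hypothetical function name.

```rust
/// Decide whether a message from `publisher_instance_id` passes the filter.
/// `None` accepts every instance; `Some(id)` accepts only an exact match.
fn accepts(target_instance_id: Option<&str>, publisher_instance_id: &str) -> bool {
    match target_instance_id {
        None => true,
        Some(wanted) => wanted == publisher_instance_id,
    }
}
```

With `Some("my-camera-instance")` as the target, a message from any other instance would be skipped under this rule.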

The message_format supports the following field types:

Type        Rust type
"bool"      bool
"u8"        u8
"u16"       u16
"u32"       u32
"u64"       u64
"i8"        i8
"i16"       i16
"i32"       i32
"i64"       i64
"f32"       f32
"f64"       f64
"string"    String
"bytes"     Vec<u8>
"time"      std::time::SystemTime

Fields can also use complex types:

  • Object — a nested struct:
    header: {
      $type: "object",
      stamp: "time",
      frame_id: "u32"
    }
  • Array — a variable-length list:
    frame: {
      $type: "array",
      $items: "u8"
    }
  • Fixed-length array — an array with a known size:
    position: {
      $type: "array",
      $items: "f32",
      $length: 3
    }
  • Optional — a field that may be absent:
    error_msg: {
      $type: "string",
      $optional: true
    }
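
As a rough guide to how the four complex kinds above could surface in Rust, the sketch below declares a hypothetical message that uses all of them. The object-to-struct mapping is stated earlier, and the emit example passes a Vec<u8> for the variable-length array. The `[f32; 3]` and `Option<String>` mappings are assumptions, and `Header`, `Sample`, and `sample_without_error` are illustrative names rather than generated code.

```rust
use std::time::SystemTime;

/// Object: a nested struct.
#[derive(Debug, Clone, PartialEq)]
struct Header {
    stamp: SystemTime,
    frame_id: u32,
}

/// Hypothetical message combining all four complex kinds.
#[derive(Debug, Clone, PartialEq)]
struct Sample {
    header: Header,            // $type: "object"
    frame: Vec<u8>,            // $type: "array", $items: "u8"
    position: [f32; 3],        // $length: 3 (assumed to map to a fixed-size array)
    error_msg: Option<String>, // $optional: true (assumed to map to Option)
}

/// Convenience constructor for a message with no error attached.
fn sample_without_error(frame_id: u32, frame: Vec<u8>, position: [f32; 3]) -> Sample {
    Sample {
        header: Header { stamp: SystemTime::UNIX_EPOCH, frame_id },
        frame,
        position,
        error_msg: None,
    }
}
```

Check the code produced by peppy node sync for the actual field types before relying on these mappings.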