Bidirectional communication
In a standard PeppyOS setup, a node subscribes to topics from its declared dependencies using local_node_id.
This creates a directed graph: if arm_controller depends on robot_arm, it can expect topics from robot_arm — but not the other way around, since that would create a circular dependency.
External consumed topics solve this by letting a node subscribe to a topic without declaring a dependency on the publisher.
The subscriber defines the message_format inline and receives messages from any node that emits a matching topic.
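To make the cycle concrete, the sketch below models depends_on as a plain directed graph and checks it with Python's standard graphlib module. It is an illustration only: the two dictionaries and the has_cycle helper are hypothetical and not part of PeppyOS, which performs its own dependency resolution.

```python
from graphlib import CycleError, TopologicalSorter


def has_cycle(deps: dict[str, set[str]]) -> bool:
    """Return True if the graph (node -> nodes it depends on) contains a cycle."""
    try:
        # static_order() raises CycleError when no valid ordering exists.
        list(TopologicalSorter(deps).static_order())
    except CycleError:
        return True
    return False


# Linked topics in both directions: each node would have to depend on the other.
both_linked = {
    "arm_controller": {"robot_arm"},
    "robot_arm": {"arm_controller"},
}

# joint_commands consumed as an external topic: robot_arm declares no dependency.
with_external = {
    "arm_controller": {"robot_arm"},
    "robot_arm": set(),
}

print(has_cycle(both_linked))    # True
print(has_cycle(with_external))  # False
```

The external topic removes one edge from the graph, so the remaining dependencies can still be ordered.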
Example: robot arm control loop
Consider a robot arm that needs bidirectional communication between two nodes:
- arm_controller: plans trajectories and sends joint commands.
- robot_arm: drives the physical joints and reports their state.

```
arm_controller                                   robot_arm
      │                                              │
      │─── emits: joint_commands ───────────────────>│  consumes: joint_commands (external)
      │                                              │
      │  consumes: joint_states (linked) <───────────│  emits: joint_states
      │                                              │
```

arm_controller depends on robot_arm to consume its joint_states topic; that is a standard linked consumed topic.
robot_arm needs to receive joint_commands, but it cannot depend on arm_controller (circular dependency).
Instead, it declares joint_commands as an external consumed topic.
Configuring external consumed topics
An external consumed topic has a name and an inline message_format, but no local_node_id:
Python node:

```json5
{
  schema_version: 1,
  manifest: {
    name: "robot_arm",
    tag: "0.1.0",
    language: "python",
    // No depends_on: robot_arm does not depend on arm_controller
  },
  process: {
    add_cmd: ["uv", "sync"],
    start_cmd: ["uv", "run", "robot_arm"]
  },
  interfaces: {
    topics: {
      emits: [
        {
          name: "joint_states",
          qos_profile: "sensor_data",
          message_format: {
            positions: { $type: "array", $items: "f64", $length: 3 },
            velocities: { $type: "array", $items: "f64", $length: 3 },
            timestamp: "time"
          }
        }
      ],
      consumes: [
        {
          // No local_node_id: this is an external consumed topic
          name: "joint_commands",
          message_format: {
            target_positions: { $type: "array", $items: "f64", $length: 3 },
            max_velocity: "f64"
          }
        }
      ],
    },
  }
}
```

Rust node:

```json5
{
  schema_version: 1,
  manifest: {
    name: "robot_arm",
    tag: "0.1.0",
    language: "rust",
    // No depends_on: robot_arm does not depend on arm_controller
  },
  process: {
    add_cmd: ["cargo", "build", "--release"],
    start_cmd: ["./target/release/robot_arm"]
  },
  interfaces: {
    topics: {
      emits: [
        {
          name: "joint_states",
          qos_profile: "sensor_data",
          message_format: {
            positions: { $type: "array", $items: "f64", $length: 3 },
            velocities: { $type: "array", $items: "f64", $length: 3 },
            timestamp: "time"
          }
        }
      ],
      consumes: [
        {
          // No local_node_id: this is an external consumed topic
          name: "joint_commands",
          message_format: {
            target_positions: { $type: "array", $items: "f64", $length: 3 },
            max_velocity: "f64"
          }
        }
      ],
    },
  }
}
```

Compare this with the arm_controller node, which uses a standard linked consumed topic:
Python node:

```json5
{
  schema_version: 1,
  manifest: {
    name: "arm_controller",
    tag: "0.1.0",
    language: "python",
    depends_on: {
      nodes: [
        { name: "robot_arm", tag: "0.1.0", local_id: "robot_arm" },
      ]
    },
  },
  process: {
    add_cmd: ["uv", "sync"],
    start_cmd: ["uv", "run", "arm_controller"]
  },
  interfaces: {
    topics: {
      emits: [
        {
          name: "joint_commands",
          qos_profile: "reliable",
          message_format: {
            target_positions: { $type: "array", $items: "f64", $length: 3 },
            max_velocity: "f64"
          }
        }
      ],
      consumes: [
        {
          local_node_id: "robot_arm", // Linked: references depends_on
          name: "joint_states",
        },
      ],
    },
  }
}
```

Rust node:

```json5
{
  schema_version: 1,
  manifest: {
    name: "arm_controller",
    tag: "0.1.0",
    language: "rust",
    depends_on: {
      nodes: [
        { name: "robot_arm", tag: "0.1.0", local_id: "robot_arm" },
      ]
    },
  },
  process: {
    add_cmd: ["cargo", "build", "--release"],
    start_cmd: ["./target/release/arm_controller"]
  },
  interfaces: {
    topics: {
      emits: [
        {
          name: "joint_commands",
          qos_profile: "reliable",
          message_format: {
            target_positions: { $type: "array", $items: "f64", $length: 3 },
            max_velocity: "f64"
          }
        }
      ],
      consumes: [
        {
          local_node_id: "robot_arm", // Linked: references depends_on
          name: "joint_states",
        },
      ],
    },
  }
}
```

Receiving external messages
After running peppy node sync, the code generator creates a module for each external consumed topic under peppygen.consumed_topics (Python) / peppygen::consumed_topics (Rust).
The module name is the topic name only (e.g. joint_commands), since there is no source node to prefix it with.
Python:

```python
import asyncio
import time

from peppygen import NodeBuilder, NodeRunner
from peppygen.parameters import Parameters
from peppygen.consumed_topics import joint_commands
from peppygen.emitted_topics import joint_states


async def handle_commands(node_runner: NodeRunner):
    while True:
        instance_id, command = await joint_commands.on_next_message_received(
            node_runner,
        )

        print(
            f"received from {instance_id}: "
            f"target={command.target_positions} max_vel={command.max_velocity}"
        )

        # Drive the joints, then report state
        await joint_states.emit(
            node_runner,
            command.target_positions,
            [0.0, 0.0, 0.0],
            time.time(),
        )


async def setup(_params: Parameters, node_runner: NodeRunner) -> list[asyncio.Task]:
    return [asyncio.create_task(handle_commands(node_runner))]


def main():
    NodeBuilder().run(setup)


if __name__ == "__main__":
    main()
```

Rust:

```rust
use peppygen::consumed_topics::joint_commands;
use peppygen::emitted_topics::joint_states;
use peppygen::{NodeBuilder, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let node_runner_clone = node_runner.clone();
        tokio::spawn(async move {
            loop {
                let (instance_id, command) = joint_commands::on_next_message_received(
                    &node_runner_clone,
                    None,
                    None,
                )
                .await
                .expect("failed to receive joint command");

                println!(
                    "received from {}: target={:?} max_vel={}",
                    instance_id, command.target_positions, command.max_velocity
                );

                // Drive the joints, then report state
                let _ = joint_states::emit(
                    &node_runner_clone,
                    command.target_positions,
                    [0.0, 0.0, 0.0],
                    std::time::SystemTime::now(),
                )
                .await;
            }
        });

        Ok(())
    })
}
```

The API is identical to linked consumed topics. The only difference is where the message_format comes from (inline vs. resolved from the dependency).
On the other side, the arm_controller uses a standard linked topic. Notice the module name includes the node prefix (robot_arm_joint_states):
Python:

```python
import asyncio

from peppygen import NodeBuilder, NodeRunner
from peppygen.parameters import Parameters
from peppygen.consumed_topics import robot_arm_joint_states
from peppygen.emitted_topics import joint_commands


def compute_next_target(current: list[float]) -> list[float]:
    # Trajectory planning logic
    return [current[0] + 0.1, current[1], current[2]]


async def control_loop(node_runner: NodeRunner):
    while True:
        # Linked topic: module name is <local_node_id>_<topic_name>
        instance_id, state = await robot_arm_joint_states.on_next_message_received(
            node_runner,
        )

        # Compute next target based on current state
        target = compute_next_target(state.positions)

        await joint_commands.emit(
            node_runner,
            target,
            1.0,  # max_velocity
        )


async def setup(_params: Parameters, node_runner: NodeRunner) -> list[asyncio.Task]:
    return [asyncio.create_task(control_loop(node_runner))]


def main():
    NodeBuilder().run(setup)


if __name__ == "__main__":
    main()
```

Rust:

```rust
use peppygen::consumed_topics::robot_arm_joint_states;
use peppygen::emitted_topics::joint_commands;
use peppygen::{NodeBuilder, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let node_runner_clone = node_runner.clone();
        tokio::spawn(async move {
            loop {
                // Linked topic: module name is <local_node_id>_<topic_name>
                let (instance_id, state) = robot_arm_joint_states::on_next_message_received(
                    &node_runner_clone,
                    None,
                    None,
                )
                .await
                .expect("failed to receive joint state");

                // Compute next target based on current state
                let target = compute_next_target(&state.positions);

                let _ = joint_commands::emit(
                    &node_runner_clone,
                    target,
                    1.0, // max_velocity
                )
                .await;
            }
        });

        Ok(())
    })
}

fn compute_next_target(current: &[f64; 3]) -> [f64; 3] {
    // Trajectory planning logic
    [current[0] + 0.1, current[1], current[2]]
}
```

Linked vs. external consumed topics
| | Linked | External |
|---|---|---|
| Config | local_node_id + name | name + message_format |
| Dependency | Required in depends_on | None |
| Message format | Resolved from the dependency | Defined inline by the subscriber |
| Module name | <local_node_id>_<topic_name> | <topic_name> |
| Subscribes to | A specific publisher node | Any node emitting that topic |
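
The Config and Module name rows can be read as a small decision rule. The sketch below is a hypothetical helper, not the PeppyOS code generator: it classifies a consumes entry by the keys it carries and derives the module name the table describes. Treating an entry that has both local_node_id and message_format (or neither) as an error is an assumption of this sketch.

```python
def classify_consumed_topic(entry: dict) -> str:
    """Classify a consumes entry as 'linked' or 'external' by its keys."""
    if "name" not in entry:
        raise ValueError("a consumed topic needs a name")
    has_node = "local_node_id" in entry
    has_format = "message_format" in entry
    if has_node and not has_format:
        return "linked"
    if has_format and not has_node:
        return "external"
    # Assumption: the two forms are mutually exclusive.
    raise ValueError("expected exactly one of local_node_id or message_format")


def module_name(entry: dict) -> str:
    """Derive the generated module name following the table's naming rule."""
    if classify_consumed_topic(entry) == "linked":
        return f"{entry['local_node_id']}_{entry['name']}"
    return entry["name"]


linked = {"local_node_id": "robot_arm", "name": "joint_states"}
external = {
    "name": "joint_commands",
    "message_format": {"max_velocity": "f64"},
}

print(module_name(linked))    # robot_arm_joint_states
print(module_name(external))  # joint_commands
```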
Why only topics?
PeppyOS has three communication patterns (topics, services, and actions), but only topics support external (dependency-free) subscriptions. The reason is how each pattern flows data:
| | Topics | Services | Actions |
|---|---|---|---|
| Pattern | Publish-subscribe | Request-response | Goal-feedback-result |
| Data flow | One-way: publisher → subscriber | Two-way: request then response | Multi-step: goal, feedback, result |
| Initiator | Publisher pushes; subscriber listens passively | Consumer actively calls the exposer | Consumer orchestrates the full lifecycle |
| Coupling | None — subscriber does not call the publisher | Inherent — consumer targets a specific service | Inherent — built on services, same constraint |
| External variant? | Yes — passive listening needs no dependency | No — calling requires a target node | No — same as services |
A topic subscriber is passive: it declares a message format and waits for data to arrive, without knowing or addressing the publisher.
That is why removing depends_on works — there is nothing to call.
Services and actions are caller-driven: the consumer actively invokes a specific service on a specific node. Even without a config dependency, the consumer would still need to target that node at runtime, reintroducing the coupling that external consumed topics are designed to avoid.
In the robot arm example, robot_arm passively listens for joint_commands from any source — it never calls arm_controller.
If it needed to call a service on arm_controller instead, it would need depends_on, creating the circular dependency that external consumed topics exist to prevent.
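The passive nature of a subscriber can be shown with a toy in-memory bus. This is a conceptual sketch only: ToyBus, its methods, and the teleop publisher name are hypothetical and do not reflect the PeppyOS runtime or its transport. The point is that the subscriber names a topic and never names a publisher.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[str, Any], None]


class ToyBus:
    """In-memory stand-in for a topic transport (hypothetical)."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        # The subscriber names only the topic; it never names a publisher.
        self._handlers[topic].append(handler)

    def publish(self, sender: str, topic: str, message: Any) -> None:
        # The publisher pushes; every handler registered for the topic runs.
        for handler in self._handlers[topic]:
            handler(sender, message)


received: list[tuple[str, dict]] = []
bus = ToyBus()
bus.subscribe("joint_commands", lambda sender, msg: received.append((sender, msg)))

# Two unrelated publishers emit the same topic; the subscriber hears both.
bus.publish("arm_controller", "joint_commands", {"max_velocity": 1.0})
bus.publish("teleop", "joint_commands", {"max_velocity": 0.2})

print([sender for sender, _ in received])  # ['arm_controller', 'teleop']
```

A service call has no equivalent in this model, because the caller would have to address a specific node to get a response.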
When to use external consumed topics
Use external consumed topics when:
- Bidirectional communication: two nodes need to exchange data and a direct dependency would create a cycle.
- Loose coupling: the subscriber should not care which specific node publishes the data. For instance, a robot_arm node that accepts commands from any controller (teleoperation, autonomous planner, calibration script).
- Non-DAG communication: the publisher is not part of the node stack, or the relationship between publisher and subscriber is not a natural dependency.
For all other cases, prefer linked consumed topics — they enforce explicit dependencies and let PeppyOS resolve message formats automatically.