Services

Services implement a request-response communication pattern between nodes. A node exposes a service to handle incoming requests, and other nodes subscribe to that service to send requests and receive responses.

Use services for operations that need a result, such as querying a node’s state, toggling a feature, or triggering a one-time computation.

A node that handles service requests declares its services under interfaces.exposes.services in its peppy.json5. Each service defines a name, an optional request_message_format, and an optional response_message_format:

{
  schema_version: 1,
  manifest: {
    name: "uvc_camera",
    tag: "0.1.0",
    language: "rust",
  },
  process: {
    add_cmd: ["cargo", "build", "--release"],
    start_cmd: ["./target/release/uvc_camera"]
  },
  interfaces: {
    exposes: {
      topics: [],
      services: [
        {
          name: "enable_camera",
          request_message_format: {
            enable: "bool",
          },
          response_message_format: {
            enabled: "bool",
            error_msg: {
              $type: "string",
              $optional: true
            },
          },
        },
        {
          // A service without a request body: the caller only needs the response.
          name: "get_camera_info",
          response_message_format: {
            card_type: "string",
            size: "string",
            interval: "string"
          },
        },
      ],
      actions: [],
    },
    subscribes_to: {
      topics: [],
      services: [],
      actions: [],
    },
  }
}

Because both request_message_format and response_message_format are optional, a service with no request body acts like a simple getter, and a service with no response body acts like a fire-and-forget trigger.
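To make the manifest above more concrete, here is a minimal sketch of how the two services might map to plain Rust types. The struct and function names are hypothetical stand-ins, not the types that peppy node sync generates. The one detail taken from the examples on this page is that a field marked $optional: true is read as an Option on the Rust side.

```rust
// Hypothetical stand-ins for the message types of the two services.
// These are NOT the generated peppygen types; they only illustrate the mapping.

/// request_message_format of enable_camera: { enable: "bool" }
#[derive(Debug, Clone, PartialEq)]
pub struct EnableCameraRequest {
    pub enable: bool,
}

/// response_message_format of enable_camera.
#[derive(Debug, Clone, PartialEq)]
pub struct EnableCameraResponse {
    pub enabled: bool,
    /// Declared with `$optional: true`, so it may be absent.
    pub error_msg: Option<String>,
}

/// response_message_format of get_camera_info (the service has no request body).
#[derive(Debug, Clone, PartialEq)]
pub struct CameraInfo {
    pub card_type: String,
    pub size: String,
    pub interval: String,
}

/// Handler logic for a service with a request body: it receives a payload.
pub fn handle_enable(request: &EnableCameraRequest) -> EnableCameraResponse {
    EnableCameraResponse {
        enabled: request.enable,
        error_msg: None, // nothing to report on success
    }
}

/// Handler logic for a getter-style service: there is no payload to read.
pub fn handle_get_camera_info() -> CameraInfo {
    CameraInfo {
        card_type: "UVC Webcam".to_owned(),
        size: "1920x1080".to_owned(),
        interval: "30fps".to_owned(),
    }
}
```

Keeping the handler logic in plain functions like these, separate from the transport, makes it testable without a running node.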

After you run peppy node sync, the code generator creates a module for each exposed service under peppygen::exposed_services. Use handle_next_request to process an incoming request. The example below awaits it inside a spawned task so that the main async block is not blocked:

use peppygen::exposed_services::enable_camera;
use peppygen::{NodeBuilder, Parameters, Result};

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        // Handle the request in its own task so the main async block keeps running.
        tokio::spawn(async move {
            enable_camera::handle_next_request(
                &node_runner,
                |request| -> Result<enable_camera::Response> {
                    println!(
                        "enable_camera request from {}: enable = {}",
                        request.instance_id, request.data.enable
                    );
                    Ok(enable_camera::Response::new(
                        request.data.enable,
                        None, // error_msg is optional; nothing to report on success
                    ))
                },
            )
            .await
        });
        Ok(())
    })
}

The request argument contains:

  • instance_id — the instance that sent the request.
  • data — the deserialized request payload (only present when a request_message_format is defined).

handle_next_request processes a single request and returns. To serve requests continuously, call it in a loop inside a spawned task:

tokio::spawn(async move {
    loop {
        let _ = enable_camera::handle_next_request(&node_runner, |request| {
            // ...
        })
        .await;
    }
});
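The shape of such a serving loop can be tried without a running Peppy stack. The sketch below is a standard-library simulation, not the peppygen API: the Request struct, its reply_to channel, and the serve function are all hypothetical. It shows one request handled per iteration, with a handler error sent back to the caller while the loop keeps going.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical request envelope; `reply_to` stands in for the transport
/// that carries the response back to the caller.
pub struct Request {
    pub instance_id: String,
    pub enable: bool,
    pub reply_to: Sender<Result<bool, String>>,
}

/// Serves requests until every sender has been dropped.
/// Returns the number of requests that were handled.
pub fn serve(
    requests: Receiver<Request>,
    handler: impl Fn(&Request) -> Result<bool, String>,
) -> usize {
    let mut handled = 0;
    // recv() returns Err once all senders are gone, which ends the loop.
    while let Ok(request) = requests.recv() {
        // One request per iteration, like one handle_next_request call.
        let outcome = handler(&request);
        // Forward Ok and Err alike; a failed handler does not end the loop.
        // The caller may have stopped waiting, so a send error is ignored.
        let _ = request.reply_to.send(outcome);
        handled += 1;
    }
    handled
}
```

Unlike the loop above, which discards the result with let _, a production loop would likely log the error returned by each iteration.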

For a service without a request body, the handler receives a Request with only the instance_id:

use peppygen::exposed_services::get_camera_info;

tokio::spawn(async move {
    get_camera_info::handle_next_request(
        &node_runner,
        |request| -> Result<get_camera_info::Response> {
            println!("get_camera_info request from {}", request.instance_id);
            Ok(get_camera_info::Response::new(
                "UVC Webcam".to_owned(),
                "1920x1080".to_owned(),
                "30fps".to_owned(),
            ))
        },
    )
    .await
});

A node that calls a service declares its subscriptions under interfaces.subscribes_to.services:

{
  schema_version: 1,
  manifest: {
    name: "robot_brain",
    tag: "0.1.0",
    language: "rust",
  },
  process: {
    add_cmd: ["cargo", "build", "--release"],
    start_cmd: ["./target/release/robot_brain"]
  },
  interfaces: {
    subscribes_to: {
      topics: [],
      services: [
        {
          id: "uvc_camera_enable_camera", // Your chosen identifier for this subscription
          node: "uvc_camera",             // Target node name
          name: "enable_camera",          // Service name on that node
          tag: "0.1.0",                   // Target node tag/version
        },
      ],
      actions: [],
    },
  }
}

The code generator creates a module for each subscribed service under peppygen::subscribed_services. Use poll to send a request and wait for a response:

use peppygen::subscribed_services::uvc_camera_enable_camera;
use peppygen::{NodeBuilder, Parameters, Result};
use std::time::Duration;

fn main() -> Result<()> {
    NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
        let request = uvc_camera_enable_camera::Request::new(true);
        let response = uvc_camera_enable_camera::poll(
            &node_runner,
            Duration::from_secs(5), // timeout
            None,                   // target_core_node (None = any)
            None,                   // target_instance_id (None = any)
            request,
        )
        .await?;
        println!(
            "enable_camera result: instance={} enabled={} error={}",
            response.instance_id,
            response.data.enabled,
            response.data.error_msg.as_deref().unwrap_or("<none>"),
        );
        Ok(())
    })
}

The poll function takes:

  • node_runner — the node runner reference.
  • timeout — maximum time to wait for a response.
  • target_core_node — optionally target a specific core node, or None for any.
  • target_instance_id — optionally target a specific instance, or None for any.
  • request — the request payload.

The response contains:

  • instance_id — the instance that handled the request.
  • data — the deserialized response payload.

For a service without a request body, poll does not take a request argument:

use peppygen::subscribed_services::uvc_camera_get_camera_info;

let response = uvc_camera_get_camera_info::poll(
    &node_runner,
    Duration::from_secs(5),
    None,
    None,
)
.await?;
println!("Camera: {} {}", response.data.card_type, response.data.size);

When multiple instances of the same node are running, you can control which instance handles your service request:

  • None (default) — the request is sent to all instances and the first response wins.
  • Some("instance_id") — the request is sent to a specific instance.

// Target a specific instance
let response = uvc_camera_enable_camera::poll(
    &node_runner,
    Duration::from_secs(5),
    None,
    Some("my-camera-instance"),
    request,
)
.await?;
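The difference between the two targeting modes can be illustrated with a small simulation. This sketch uses only the Rust standard library and does not reflect how Peppy routes requests internally. The call function, the Reply struct, and the per-instance delays are hypothetical; threads and a channel stand in for the running instances.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// A reply tagged with the instance that produced it.
#[derive(Debug, PartialEq)]
pub struct Reply {
    pub instance_id: String,
    pub enabled: bool,
}

/// Simulates a service call against `instances`, given as
/// (instance_id, artificial response delay) pairs.
/// Returns the first reply received within `timeout`, if any.
pub fn call(
    instances: &[(&str, Duration)],
    target_instance_id: Option<&str>,
    timeout: Duration,
) -> Option<Reply> {
    let (tx, rx) = mpsc::channel();
    for (id, delay) in instances {
        // With None every instance gets the request; with Some(id) only
        // the matching instance does.
        if target_instance_id.map_or(true, |target| target == *id) {
            let tx = tx.clone();
            let id = id.to_string();
            let delay = *delay;
            thread::spawn(move || {
                thread::sleep(delay);
                // The caller may already have its answer; ignore send errors.
                let _ = tx.send(Reply { instance_id: id, enabled: true });
            });
        }
    }
    // Drop the original sender so the receiver sees a disconnect
    // immediately when no instance was targeted.
    drop(tx);
    // First reply wins; later replies are discarded with the channel.
    rx.recv_timeout(timeout).ok()
}
```

With two simulated instances where one answers faster, passing None returns the faster instance's reply, while passing Some with the slower instance's id returns that instance's reply regardless of speed.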

Service calls can fail with three error types:

  • ServiceUnreachable — no instance is listening for that service.
  • ServiceTimeout — no response was received within the timeout.
  • ServiceError — the handler returned an error, which is propagated back to the caller.

If the service handler returns an Err, the error is forwarded to the caller instead of leaving the caller to wait for the timeout. A failing handler also does not stop the service from accepting new requests.
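On the caller side, the three failure modes usually call for different reactions. The sketch below retries only on timeouts. The ServiceCallError enum is a hypothetical stand-in that mirrors the three names above; this page does not show the actual shape of the peppygen error type. The call_with_retry helper is likewise an illustration, not part of the library.

```rust
/// Hypothetical stand-in for the three failure modes of a service call.
/// The real peppygen error type may be shaped differently.
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceCallError {
    /// No instance is listening for the service.
    ServiceUnreachable,
    /// No response arrived within the timeout.
    ServiceTimeout,
    /// The handler returned an error, carried here as a message.
    ServiceError(String),
}

/// Runs `attempt` up to `max_attempts` times (at least once),
/// retrying only when the call timed out.
/// The closure receives the 1-based attempt number.
pub fn call_with_retry<T>(
    max_attempts: u32,
    mut attempt: impl FnMut(u32) -> Result<T, ServiceCallError>,
) -> Result<T, ServiceCallError> {
    for n in 1..=max_attempts.max(1) {
        match attempt(n) {
            // A timeout may be transient, so try again.
            Err(ServiceCallError::ServiceTimeout) => continue,
            // Success, an unreachable service, or a handler error:
            // return immediately, since retrying is unlikely to help.
            other => return other,
        }
    }
    // Every attempt timed out.
    Err(ServiceCallError::ServiceTimeout)
}
```

In a real node, the closure would wrap a poll call and translate its error into whatever type the caller matches on.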