Datastore
Topics and services move data between nodes that are up at the same time. Sometimes you instead want a place to leave a value so another node can pick it up later, or so a node can read back what it wrote before it restarted. The datastore is that place: a small in-memory key/value store that lives in the core node and is shared by every node in the stack.
peppylib::datastore exposes four helpers:
- store: upsert a value under a key.
- get: read the value back, or None if the key was never set.
- list: list every key’s metadata (encoding and last writer), without the value bytes.
- remove: delete a key, reporting whether it existed.
Each value is a pair of raw bytes and an encoding tag (a short string such as "text/plain", "application/json", or "application/octet-stream"). This mirrors Zenoh’s (payload, encoding) value model: the store keeps your bytes verbatim and hands back the tag you chose, so any value type round-trips faithfully. The store never inspects either field.
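As a rough sketch of what round-tripping looks like from the caller's side, the snippet below packs a Python value into a (bytes, tag) pair and unpacks it again by branching on the tag. The helper names encode_value and decode_value are hypothetical and not part of peppylib; the tags are the plain strings named above.

```python
import json
from typing import Any

# Hypothetical helpers for illustration; peppylib does not provide them.
TEXT_PLAIN = "text/plain"
APPLICATION_JSON = "application/json"
APPLICATION_OCTET_STREAM = "application/octet-stream"


def encode_value(obj: Any) -> tuple[bytes, str]:
    """Turn a Python value into the (payload, encoding) pair a store call takes."""
    if isinstance(obj, (bytes, bytearray)):
        return bytes(obj), APPLICATION_OCTET_STREAM
    if isinstance(obj, str):
        return obj.encode("utf-8"), TEXT_PLAIN
    # Everything else goes through JSON; raises TypeError if not serializable.
    return json.dumps(obj).encode("utf-8"), APPLICATION_JSON


def decode_value(payload: bytes, encoding: str) -> Any:
    """Reverse encode_value by branching on the tag; unknown tags stay raw bytes."""
    if encoding == TEXT_PLAIN:
        return payload.decode("utf-8")
    if encoding == APPLICATION_JSON:
        return json.loads(payload)
    return payload


payload, tag = encode_value({"x": 0.1, "y": -0.4})
restored = decode_value(payload, tag)
```

Because the store keeps both fields verbatim, the reader only needs the tag to pick the right decoder; a tag it does not recognize can simply be passed through as raw bytes.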
The store also records, for each key, the instance_id of the node that last wrote it. Both get (on the returned StoredValue) and list surface this as last_modified_by, so you can tell which node owns a value.
peppylib::datastore ships an Encoding helper with constants for the common tags: Encoding::TEXT_PLAIN, Encoding::APPLICATION_JSON, Encoding::APPLICATION_OCTET_STREAM in Rust, and the same members on Python’s Encoding (a StrEnum). Like Zenoh’s own encoding, the set is open: the constants are a convenience, but any string is a valid tag, so you are never boxed in to the listed ones.
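To illustrate the open set, here is a minimal stand-in for the Python Encoding class. It is not the real class from peppylib.datastore: it uses a str-mixin Enum so it runs on older Python versions, which behaves the same as a StrEnum for equality checks. The custom tag is a hypothetical example.

```python
from enum import Enum


class Encoding(str, Enum):
    """Stand-in for illustration only; the real class lives in peppylib.datastore."""

    TEXT_PLAIN = "text/plain"
    APPLICATION_JSON = "application/json"
    APPLICATION_OCTET_STREAM = "application/octet-stream"


def is_json(tag: str) -> bool:
    # Works whether the tag is an Encoding member or a plain string.
    return tag == Encoding.APPLICATION_JSON


# Hypothetical tag outside the listed constants; any string is allowed.
custom_tag = "application/x-wrist-offset"
```

A reader can therefore compare against the constants for the common cases and fall back to plain string comparison for project-specific tags.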
The helpers and their types are available from Rust as peppylib::datastore::{store, get, list, remove, StoredValue, DatastoreEntry, Encoding}, and from Python as from peppylib.datastore import store, get, list, remove, StoredValue, DatastoreEntry, Encoding. Like the other core node helpers, they are not re-exported by peppygen; import them from peppylib::datastore directly.
Each takes a NodeRunner, the same handle your node receives from NodeBuilder::new().run(...) (see the services guide for a typical setup), and talks to the node’s bound core node.
When to use this
- A shared blackboard. One node computes a value (a calibration result, a chosen target, a mode flag) and another reads it later. Unlike a topic, the reader does not have to be subscribed at the moment the value is produced; it can ask for the key whenever it needs it.
- Last-known-value handoff. A node that restarts can read back the value it stored before, as long as the core node stayed up, instead of recomputing from scratch.
- Small cross-node state that does not warrant a service contract. When defining a request/response service is more ceremony than the data is worth, a well-known key is often enough.
It is not a database. The store is in-memory and process-local to the core node (see the notes below), so reach for topics when you need streaming data, for services when you need a node to act on a request, and for a real persistence layer when you need durability.
store
store writes value (arbitrary bytes) under key, tagged with encoding, on the node’s bound core node. It returns once the core node acknowledges the write. Storing a key that already exists overwrites the previous value and encoding.
The final argument is a response timeout. Rust accepts anything that converts to Option<Duration>; Python accepts a float in seconds. Pass None (or omit it in Python) to use the default of 10 seconds.
Python:

    from peppygen import NodeBuilder, NodeRunner
    from peppygen.parameters import Parameters
    from peppylib.datastore import store, Encoding


    async def setup(_params: Parameters, node_runner: NodeRunner) -> None:
        await store(
            node_runner,
            "calibration/wrist_offset",
            b'{"x": 0.1, "y": -0.4}',
            Encoding.APPLICATION_JSON,
            3.0,  # response timeout in seconds
        )
        print("stored wrist offset")


    def main():
        NodeBuilder().run(setup)


    if __name__ == "__main__":
        main()

Rust:

    use std::time::Duration;

    use peppygen::{NodeBuilder, Parameters, Result};
    use peppylib::datastore::{store, Encoding};

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            store(
                &node_runner,
                "calibration/wrist_offset",
                br#"{"x": 0.1, "y": -0.4}"#.to_vec(),
                Encoding::APPLICATION_JSON,
                Duration::from_secs(3), // response timeout
            )
            .await?;
            println!("stored wrist offset");
            Ok(())
        })
    }

get
get reads the value stored under key from the node’s bound core node. A key that has never been stored reads as None rather than an error: the helper folds the wire response’s found flag into Option::None (Rust) / None (Python), so you never get back an empty StoredValue you have to second-guess.
When the key is present you get a StoredValue with three fields:
- value: the raw bytes, exactly as stored (Vec<u8> in Rust, bytes in Python).
- encoding: the encoding tag they were stored with.
- last_modified_by: the instance_id of the node that last wrote this key.
The final argument is the same response timeout as above.
Python:

    from peppygen import NodeBuilder, NodeRunner
    from peppygen.parameters import Parameters
    from peppylib.datastore import get


    async def setup(_params: Parameters, node_runner: NodeRunner) -> None:
        stored = await get(node_runner, "calibration/wrist_offset", 3.0)
        if stored is None:
            print("no wrist offset stored yet")
        else:
            print(f"wrist offset ({stored.encoding}): {stored.value!r}")


    def main():
        NodeBuilder().run(setup)


    if __name__ == "__main__":
        main()

Rust:

    use std::time::Duration;

    use peppygen::{NodeBuilder, Parameters, Result};
    use peppylib::datastore::get;

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            let key = "calibration/wrist_offset";
            match get(&node_runner, key, Duration::from_secs(3)).await? {
                Some(stored) => println!("wrist offset ({}): {:?}", stored.encoding, stored.value),
                None => println!("no wrist offset stored yet"),
            }
            Ok(())
        })
    }

list
list returns the metadata of every key currently in the store: one entry per key carrying the key, its encoding tag, and last_modified_by (the instance_id of the node that last wrote it). The value bytes are deliberately not included, so a list stays cheap no matter how large your values are; fetch the bytes for a specific key with get. The order is unspecified, and the result is a point-in-time snapshot (another node may store or remove keys immediately after).
The only argument besides the NodeRunner is the same response timeout as above.
Python:

    from peppygen import NodeBuilder, NodeRunner
    from peppygen.parameters import Parameters
    from peppylib.datastore import list  # shadows the built-in list in this module


    async def setup(_params: Parameters, node_runner: NodeRunner) -> None:
        entries = await list(node_runner, 3.0)
        for entry in entries:
            print(f"{entry.key} ({entry.encoding}) last written by {entry.last_modified_by}")


    def main():
        NodeBuilder().run(setup)


    if __name__ == "__main__":
        main()

Rust:

    use std::time::Duration;

    use peppygen::{NodeBuilder, Parameters, Result};
    use peppylib::datastore::list;

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            for entry in list(&node_runner, Duration::from_secs(3)).await? {
                println!(
                    "{} ({}) last written by {}",
                    entry.key, entry.encoding, entry.last_modified_by
                );
            }
            Ok(())
        })
    }

remove
remove deletes (unsets) key from the store. It returns a boolean: true if the key existed and was removed, false if it was already absent, so removing a missing key is a no-op, not an error.
The final argument is the same response timeout as above.
Python:

    from peppygen import NodeBuilder, NodeRunner
    from peppygen.parameters import Parameters
    from peppylib.datastore import remove


    async def setup(_params: Parameters, node_runner: NodeRunner) -> None:
        removed = await remove(node_runner, "calibration/wrist_offset", 3.0)
        print("removed" if removed else "key was already absent")


    def main():
        NodeBuilder().run(setup)


    if __name__ == "__main__":
        main()

Rust:

    use std::time::Duration;

    use peppygen::{NodeBuilder, Parameters, Result};
    use peppylib::datastore::remove;

    fn main() -> Result<()> {
        NodeBuilder::new().run(|_args: Parameters, node_runner| async move {
            let key = "calibration/wrist_offset";
            if remove(&node_runner, key, Duration::from_secs(3)).await? {
                println!("removed {key}");
            } else {
                println!("{key} was already absent");
            }
            Ok(())
        })
    }

Behavior notes
- In-memory, never written to disk. The store lives inside the core node process. Values last exactly as long as the core node does; restarting the core node starts from an empty store. Treat it as fast shared scratch space, not durable storage.
- Shared across the whole stack, with no namespacing. Every node bound to the same core node reads and writes one flat keyspace. Coordinate on key names (a prefix convention like calibration/... works well) so two nodes do not clobber each other by accident.
- Keys are arbitrary strings. A key rides inside the request payload, not as a Zenoh keyexpr, so any character is allowed: slashes, spaces, *, {}, anything. "robot/state**{1}" is a perfectly valid key.
- Store is an upsert; last writer wins. A later store under an existing key overwrites the value, the encoding, and the recorded writer. The store records the storing node’s instance_id on every write and surfaces it as last_modified_by on get and list. There is no atomic read-modify-write, so two nodes that both do get-then-store on the same key can race; design keys so a single writer owns each one when that matters.
- list is a metadata snapshot. It returns every key’s encoding and last_modified_by but never the value bytes (fetch those with get), in unspecified order, reflecting the store at the moment it was answered.
- The encoding tag is yours to interpret. The store treats it as an opaque label and returns it unchanged. Reach for the Encoding constants for the common tags and pass any string for the rest (the Zenoh-style MIME-like tags are a good default), then have readers branch on it when you store more than one value type. A tag read back from StoredValue.encoding compares equal to its Encoding member, so stored.encoding == Encoding.APPLICATION_JSON works.
- Default timeout is 10 seconds. Pass an explicit timeout from a latency-sensitive path so a slow or unreachable core node does not stall your node.
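The semantics in these notes can be summarized with a small in-process model. This is only a sketch for reasoning about behavior, not the core node's implementation: the ToyDatastore class, its method names, and the instance ids "node-a" and "node-b" are hypothetical, and the two dataclasses merely mirror the field names described above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StoredValue:
    value: bytes
    encoding: str
    last_modified_by: str


@dataclass(frozen=True)
class DatastoreEntry:
    key: str
    encoding: str
    last_modified_by: str


class ToyDatastore:
    """In-process model of the documented semantics; not the core node's code."""

    def __init__(self) -> None:
        self._entries: dict[str, StoredValue] = {}

    def store(self, instance_id: str, key: str, value: bytes, encoding: str) -> None:
        # Upsert: value, encoding, and recorded writer are all replaced.
        self._entries[key] = StoredValue(bytes(value), encoding, instance_id)

    def get(self, key: str) -> Optional[StoredValue]:
        # A missing key reads as None, not as an error.
        return self._entries.get(key)

    def list_entries(self) -> list[DatastoreEntry]:
        # Metadata only: the value bytes are left out.
        return [
            DatastoreEntry(key, stored.encoding, stored.last_modified_by)
            for key, stored in self._entries.items()
        ]

    def remove(self, key: str) -> bool:
        # True if the key existed, False if it was already absent.
        return self._entries.pop(key, None) is not None


ds = ToyDatastore()
key = "robot/state**{1}"  # any string is a valid key
ds.store("node-a", key, b"idle", "text/plain")
ds.store("node-b", key, b'{"mode": "run"}', "application/json")  # last writer wins
latest = ds.get(key)
```

After the second store, latest carries node-b's value, encoding, and instance id; nothing from node-a's write survives, which is why a single writer per key is the safer design when ownership matters.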