
Pipelines

Pipelines are Toposync's runtime for real-time automation and media processing. A pipeline is a named directed acyclic graph that receives packets from sources, transforms them through operators, and optionally writes side effects such as notifications, stored images, Home Assistant state, or live stream publications.

This page is a developer overview. It explains the current architecture and the contracts extension authors should respect. It is not a full operator reference.

Where pipelines fit

The origin backend owns pipeline configuration, compilation, orchestration, status, telemetry, and persistence. Operators are provided by the core and by installed extensions.

```text
Config store
  -> Pipeline graph
  -> Compiler
  -> Local runtime
  -> Remote processing server runtime
  -> Projected events back to origin
```

The current app exposes pipeline management through the backend API and the frontend editor. Developers usually interact with:

  • GET /api/pipelines for saved pipelines;
  • GET /api/pipelines/operators for the operator catalog;
  • POST /api/pipelines/compile for graph validation and diagnostics;
  • GET /api/pipelines/runtime/status for active runtime state;
  • GET /api/processing-servers for remote processing targets.
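As a rough illustration of the compile endpoint, the standard-library sketch below builds a POST /api/pipelines/compile request. The base URL `http://localhost:8000`, the lack of authentication, and the assumption that the endpoint takes the bare graph object as its JSON body are all hypothetical; check the backend API for the real request schema.

```python
import json
import urllib.request

# Hypothetical base URL: adjust to wherever the origin backend listens.
BASE_URL = "http://localhost:8000"


def build_compile_request(graph: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /api/pipelines/compile request.

    Assumes the endpoint accepts the graph object directly as the JSON body;
    the real request schema may wrap it differently.
    """
    body = json.dumps(graph).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/pipelines/compile",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def compile_graph(graph: dict, base_url: str = BASE_URL) -> dict:
    """Send the request and decode the JSON diagnostics response."""
    with urllib.request.urlopen(build_compile_request(graph, base_url)) as resp:
        return json.load(resp)
```

Keeping request construction separate from sending makes it easy to inspect or log the payload before it reaches the backend.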

For local development commands, see Development setup. For installation paths that run processing servers, see Choose your installation.

Pipeline model

A saved pipeline has a stable name, an enabled flag, a processing_server_id, an editor mode, optional Python source, and a graph object.

The graph contains:

  • schema_version: required, currently >= 1;
  • nodes: operator instances with id, operator, and config;
  • edges: connections from one node port to another node port;
  • limits: optional runtime limits such as artifact memory budgets;
  • layout and meta: editor and metadata fields that should not affect runtime semantics.

Minimal graph shape:

```json
{
  "schema_version": 1,
  "nodes": [
    {
      "id": "camera",
      "operator": "camera.source",
      "config": {
        "camera_id": "front"
      }
    },
    {
      "id": "motion",
      "operator": "camera.motion_gate",
      "config": {}
    },
    {
      "id": "notify",
      "operator": "core.notify",
      "config": {
        "title": "Motion detected"
      }
    }
  ],
  "edges": [
    {
      "from": { "node": "camera", "port": "out" },
      "to": { "node": "motion", "port": "in" },
      "maxsize": 1,
      "drop_policy": "latest_only"
    },
    {
      "from": { "node": "motion", "port": "out" },
      "to": { "node": "notify", "port": "in" }
    }
  ]
}
```

Pipeline names must be valid Python identifiers. This keeps the JSON, Python DSL, and generated references aligned.
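A quick client-side pre-check for pipeline names can lean on Python itself. This sketch uses `str.isidentifier()`; also rejecting keywords such as `class` is an added assumption, since a keyword cannot be used as a bare name in Python code. The helper name is hypothetical.

```python
import keyword


def is_valid_pipeline_name(name: str) -> bool:
    """Return True if `name` is a usable Python identifier.

    isidentifier() checks identifier syntax; rejecting keywords is an extra,
    assumed rule so that DSL references like `pipelines.<name>` still parse.
    """
    return name.isidentifier() and not keyword.iskeyword(name)
```

For example, `front_door_motion` passes, while `front-door`, `1camera`, and `class` are rejected.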

Operators

An operator is the reusable unit of work inside a graph. The backend registers operators in an OperatorRegistry. Core operators are registered during startup, and extensions can add their own operators during extension setup.

An operator definition declares:

  • id: a stable identifier such as camera.source, vision.detect, or stream.publish_video;
  • input and output ports;
  • a Pydantic config model and normalized defaults;
  • runtime capabilities such as source, sink, realtime, heavy_compute, or origin_only;
  • input and output packet contracts, including payload keys, artifacts, source fields, media fields, and modalities;
  • UI metadata for grouping and ordering in the pipeline editor;
  • optional diagnostics to warn about invalid or risky configurations before runtime.

Use share_strategy="by_signature" for pure, reusable work and share_strategy="never" for side-effectful or stateful operators. The registry then adds the matching capability automatically: pure for by_signature and side_effect for never.
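To make the share_strategy rule concrete, here is a toy registry that derives the pure or side_effect capability at registration time. `OperatorDefinition` and this `OperatorRegistry` are simplified, hypothetical stand-ins: the real definitions also carry Pydantic config models, packet contracts, UI metadata, and diagnostics, none of which are modeled here.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class OperatorDefinition:
    """Hypothetical, heavily simplified operator definition."""

    id: str
    inputs: tuple[str, ...] = ()
    outputs: tuple[str, ...] = ()
    capabilities: frozenset[str] = frozenset()
    share_strategy: str = "never"  # "by_signature" or "never"


class OperatorRegistry:
    """Toy registry that derives pure/side_effect from share_strategy."""

    def __init__(self) -> None:
        self._operators: dict[str, OperatorDefinition] = {}

    def register(self, op: OperatorDefinition) -> OperatorDefinition:
        if op.id in self._operators:
            raise ValueError(f"operator {op.id!r} is already registered")
        if op.share_strategy == "by_signature":
            derived = "pure"
        elif op.share_strategy == "never":
            derived = "side_effect"
        else:
            raise ValueError(f"unknown share_strategy {op.share_strategy!r}")
        # Store a copy with the derived capability added.
        registered = replace(op, capabilities=op.capabilities | {derived})
        self._operators[op.id] = registered
        return registered

    def get(self, operator_id: str) -> OperatorDefinition:
        return self._operators[operator_id]
```

Deriving the capability from share_strategy, rather than asking authors to declare both, keeps the two from contradicting each other.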

Packet contract

Operators exchange Packet objects. A packet carries:

  • packet_id: unique packet identity;
  • stream_id: logical stream identity;
  • lifecycle: open, update, or close;
  • payload: structured JSON-compatible data;
  • artifacts: binary or in-memory objects such as images;
  • metadata: runtime metadata that should not be treated as user-facing payload;
  • parent_packet_id: optional lineage for derived packets.
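The packet fields can be mirrored in a small dataclass. This is a sketch, not the runtime's Packet class: the field types, the `derive` helper, and the use of random UUID hex strings for packet_id are assumptions. `derive` shows how parent_packet_id records lineage for a derived packet.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Literal, Optional

Lifecycle = Literal["open", "update", "close"]


@dataclass
class Packet:
    """Hypothetical mirror of the packet fields listed above."""

    stream_id: str
    lifecycle: Lifecycle
    payload: dict[str, Any] = field(default_factory=dict)    # JSON-compatible data
    artifacts: dict[str, Any] = field(default_factory=dict)  # e.g. frames
    metadata: dict[str, Any] = field(default_factory=dict)   # runtime-only data
    parent_packet_id: Optional[str] = None
    packet_id: str = field(default_factory=lambda: uuid.uuid4().hex)

    def derive(self, **changes: Any) -> "Packet":
        """Create a child packet with a fresh packet_id and lineage to self.

        stream_id and lifecycle are kept unless overridden; dicts are copied
        shallowly so the child can be modified without touching the parent.
        """
        values: dict[str, Any] = {
            "stream_id": self.stream_id,
            "lifecycle": self.lifecycle,
            "payload": dict(self.payload),
            "artifacts": dict(self.artifacts),
            "metadata": dict(self.metadata),
            "parent_packet_id": self.packet_id,
        }
        values.update(changes)
        return Packet(**values)
```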

The lifecycle is important for event-like flows. For example, a motion or tracking operator can open an event, emit updates while it remains active, and close it when the condition ends. Sinks such as notifications can use that lifecycle to update or close existing records instead of creating unrelated messages.
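A lifecycle-aware sink can be sketched as a map from stream_id to a single record, so that update and close packets edit that record instead of producing new messages. The record fields, the class names, and the choice to start a fresh record when an update or close arrives without a prior open are hypothetical.

```python
from dataclasses import dataclass

LIFECYCLES = ("open", "update", "close")


@dataclass
class NotificationRecord:
    stream_id: str
    title: str
    updates: int = 0
    closed: bool = False


class LifecycleNotifier:
    """Toy sink that keeps one notification record per event stream."""

    def __init__(self) -> None:
        self.records: dict[str, NotificationRecord] = {}

    def handle(self, stream_id: str, lifecycle: str, title: str) -> NotificationRecord:
        if lifecycle not in LIFECYCLES:
            raise ValueError(f"unknown lifecycle {lifecycle!r}")
        record = self.records.get(stream_id)
        if lifecycle == "open" or record is None:
            # New event, or an update/close whose open was never seen.
            record = NotificationRecord(stream_id=stream_id, title=title)
            self.records[stream_id] = record
        elif lifecycle == "update":
            # Same event, new information: edit the existing record.
            record.title = title
            record.updates += 1
        if lifecycle == "close":
            # Condition ended: mark the record closed rather than sending a new one.
            record.closed = True
        return record
```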

Image and video operators usually use the main artifact for the primary frame. Operators should declare required and produced artifacts so the UI, diagnostics, and future tooling can reason about graph compatibility.

Compilation

Compilation turns user-authored graph JSON into a normalized runtime plan.

The compiler validates that:

  • the graph schema is valid;
  • node ids are unique;
  • every operator id is registered;
  • every config matches the operator's Pydantic model;
  • required input ports are connected;
  • edge source and target ports exist;
  • each input port has at most one upstream edge;
  • the graph is a DAG, with no cycles.
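Most of these checks are simple graph bookkeeping. The sketch below runs them over a graph dict shaped like the JSON example above. The `operators` port-spec format (`inputs`, `outputs`, `required_inputs`) is a hypothetical simplification of the registry, and config validation against Pydantic models and the cycle check are left out.

```python
from collections import Counter


def validate_graph(graph: dict, operators: dict[str, dict]) -> list[str]:
    """Return human-readable errors; an empty list means the checks passed."""
    errors: list[str] = []
    version = graph.get("schema_version")
    if not isinstance(version, int) or version < 1:
        errors.append("schema_version must be an integer >= 1")

    nodes = graph.get("nodes", [])
    counts = Counter(node["id"] for node in nodes)
    errors += [f"duplicate node id {nid!r}" for nid, n in counts.items() if n > 1]

    # Resolve each node's operator; unknown operators are reported here.
    specs: dict[str, dict] = {}
    for node in nodes:
        spec = operators.get(node["operator"])
        if spec is None:
            errors.append(f"node {node['id']!r}: unknown operator {node['operator']!r}")
        else:
            specs[node["id"]] = spec

    # Check both ends of every edge and count upstream edges per input port.
    upstream: Counter = Counter()
    for edge in graph.get("edges", []):
        src, dst = edge["from"], edge["to"]
        for end, direction in ((src, "outputs"), (dst, "inputs")):
            if end["node"] not in counts:
                errors.append(f"edge references missing node {end['node']!r}")
            elif end["node"] in specs and end["port"] not in specs[end["node"]][direction]:
                errors.append(f"{end['node']!r} has no {direction[:-1]} port {end['port']!r}")
        upstream[(dst["node"], dst["port"])] += 1

    # Each input port may have at most one upstream edge.
    for (nid, port), n in upstream.items():
        if n > 1:
            errors.append(f"input {nid}.{port} has {n} upstream edges")

    # Required inputs must be connected (Counter returns 0 for missing keys).
    for nid, spec in specs.items():
        for port in spec.get("required_inputs", []):
            if upstream[(nid, port)] == 0:
                errors.append(f"required input {nid}.{port} is not connected")
    return errors
```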

Compilation also calculates topological order and stable node signatures. Those signatures let the runtime share equivalent pure nodes across multiple local pipelines when possible.
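Topological order and signatures can be computed together. This sketch uses Kahn's algorithm, which also detects cycles, and derives each node's signature by hashing its operator id, its config serialized as canonical JSON, and its upstream signatures, so node ids do not affect the result. The hashing scheme is an assumption rather than the compiler's actual format; it ignores port names, and in practice only pure nodes should be shared.

```python
import hashlib
import json
from collections import deque


def topological_order(node_ids: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Kahn's algorithm over (src, dst) node pairs; raises ValueError on a cycle."""
    indegree = {nid: 0 for nid in node_ids}
    children: dict[str, list[str]] = {nid: [] for nid in node_ids}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1
    ready = deque(nid for nid in node_ids if indegree[nid] == 0)
    order: list[str] = []
    while ready:
        nid = ready.popleft()
        order.append(nid)
        for child in children[nid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(node_ids):
        raise ValueError("graph contains a cycle")
    return order


def node_signatures(nodes: dict[str, tuple[str, dict]],
                    edges: list[tuple[str, str]]) -> dict[str, str]:
    """Map node id -> signature; `nodes` maps node id -> (operator id, config)."""
    parents: dict[str, list[str]] = {nid: [] for nid in nodes}
    for src, dst in edges:
        parents[dst].append(src)
    sigs: dict[str, str] = {}
    # Topological order guarantees upstream signatures exist before they are used.
    for nid in topological_order(list(nodes), edges):
        operator, config = nodes[nid]
        material = json.dumps(
            {"op": operator, "config": config,
             "inputs": sorted(sigs[p] for p in parents[nid])},
            sort_keys=True, separators=(",", ":"),
        )
        sigs[nid] = hashlib.sha256(material.encode("utf-8")).hexdigest()[:16]
    return sigs
```

Because the node id is not hashed, two pipelines that read the same camera and run the same detector configuration get identical signatures for those nodes, which is what makes sharing them possible.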

The compiler can also reject specific unsafe combinations. For example, a detection operator that emits finite detection events cannot feed tracking directly; tracking needs frame annotations instead.

Runtime execution

At runtime, each node runs as an async task and communicates through bounded channels. Edges can define queue size and drop policy, which matters for real-time camera streams where stale frames are usually worse than dropped frames.

Supported channel policies include:

  • block;
  • drop_updates;
  • drop_oldest;
  • drop_newest;
  • latest_only;
  • keyed_latest_only.
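The drop policies are easiest to compare side by side. This synchronous toy channel models four of them. block and drop_updates are omitted because their exact semantics are not spelled out here, and evicting the oldest key when keyed_latest_only exceeds maxsize is an assumption. The class and its methods are hypothetical.

```python
from collections import OrderedDict, deque
from typing import Any, Callable, Hashable, Optional

SUPPORTED = {"drop_oldest", "drop_newest", "latest_only", "keyed_latest_only"}


class BoundedChannel:
    """Toy, synchronous model of non-blocking edge policies."""

    def __init__(self, maxsize: int, policy: str,
                 key: Optional[Callable[[Any], Hashable]] = None) -> None:
        if policy not in SUPPORTED:
            raise ValueError(f"unsupported policy {policy!r}")
        if maxsize < 1:
            raise ValueError("maxsize must be >= 1")
        if policy == "keyed_latest_only" and key is None:
            raise ValueError("keyed_latest_only needs a key function")
        self.maxsize = maxsize
        self.policy = policy
        self.key = key
        self._items: deque = deque()
        self._keyed: OrderedDict = OrderedDict()

    def put(self, item: Any) -> bool:
        """Offer an item; return False only if the new item itself was dropped."""
        if self.policy == "latest_only":
            self._items.clear()  # a newer item always replaces the queued one
            self._items.append(item)
            return True
        if self.policy == "keyed_latest_only":
            k = self.key(item)
            self._keyed.pop(k, None)  # replace any older item with the same key
            self._keyed[k] = item
            while len(self._keyed) > self.maxsize:
                self._keyed.popitem(last=False)  # assumed: evict the oldest key
            return True
        if len(self._items) >= self.maxsize:
            if self.policy == "drop_newest":
                return False
            self._items.popleft()  # drop_oldest: discard the stalest item
        self._items.append(item)
        return True

    def get(self) -> Any:
        """Pop the oldest queued item; raises IndexError when empty."""
        if self.policy == "keyed_latest_only":
            if not self._keyed:
                raise IndexError("channel is empty")
            return self._keyed.popitem(last=False)[1]
        return self._items.popleft()
```

For camera frames, latest_only trades completeness for freshness, while keyed_latest_only keeps the newest packet per key (for example per track) so one busy key cannot starve the others.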

Operators can run in the event loop, a thread pool, a process pool, or an external runtime depending on their execution mode. Heavy CPU work should not run directly in the event loop.
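Execution-mode dispatch can be sketched with asyncio. The mode names `event_loop`, `thread`, and `process` are hypothetical labels for the modes described above, the external runtime is not modeled, and the caller is assumed to own a long-lived process pool rather than creating one per call.

```python
import asyncio
from concurrent.futures import Executor
from typing import Any, Callable, Optional


async def run_in_mode(mode: str, fn: Callable[..., Any], *args: Any,
                      process_pool: Optional[Executor] = None) -> Any:
    """Run one operator step according to a (hypothetical) execution mode."""
    if mode == "event_loop":
        # Only for short, non-blocking work: this runs on the loop thread.
        return fn(*args)
    if mode == "thread":
        # Blocking I/O or native code that releases the GIL.
        return await asyncio.to_thread(fn, *args)
    if mode == "process":
        # CPU-bound pure-Python work; fn and args must be picklable.
        if process_pool is None:
            raise ValueError("process mode needs a process pool")
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(process_pool, fn, *args)
    raise ValueError(f"unknown execution mode {mode!r}")
```

In both offloaded modes the event loop stays free to move packets through other channels while the heavy step runs.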

Artifact memory is budgeted at packet, pipeline, and global levels. The default runtime keeps the primary artifact when possible and evicts derived in-memory artifact data first when budgets are exceeded.
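A single-level version of the eviction rule might look like this. It models one budget only (not the packet, pipeline, and global hierarchy) and measures artifacts as raw byte lengths. Evicting larger derived artifacts first and the function name are assumptions; the default primary key `main` follows the main-artifact convention described under Packet contract.

```python
def evict_to_budget(artifacts: dict[str, bytes], budget: int,
                    primary: str = "main") -> list[str]:
    """Drop artifacts in place until the total fits `budget`; return evicted names.

    Derived artifacts go first (largest first); the primary artifact is
    evicted only if it alone still exceeds the budget.
    """
    total = sum(len(data) for data in artifacts.values())
    derived = sorted((name for name in artifacts if name != primary),
                     key=lambda name: len(artifacts[name]), reverse=True)
    candidates = derived + ([primary] if primary in artifacts else [])
    evicted: list[str] = []
    for name in candidates:
        if total <= budget:
            break
        total -= len(artifacts.pop(name))
        evicted.append(name)
    return evicted
```

For example, with a 100-byte `main` frame plus 60-byte and 40-byte derived images and a 150-byte budget, only the 60-byte image is evicted.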

Local and distributed execution

Every pipeline has a processing_server_id.

  • local runs the pipeline on the origin backend.
  • Any other id targets the registered HTTP processing server with that id.

The origin still owns the canonical configuration. When a pipeline targets a remote processing server, the orchestrator pushes the pipeline and a settings snapshot to that server. Processing-side events and observability are streamed back to the origin.

Distributed planning is based on operator capabilities:

  • operators with origin_only stay on the origin;
  • other operators can run on the processing server;
  • processing-to-origin edges are supported through projected events;
  • origin-to-processing edges are not supported in the current planner.

This shape is deliberate. It allows heavy camera and vision work to move away from Home Assistant or a Raspberry Pi, while origin-only side effects such as local notifications and local file storage remain under the origin's control.
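The placement rules can be expressed as a small planner. In this sketch every operator without origin_only is placed on the processing server, which is a simplification (the text only says such operators can run there), and node capabilities are passed in as plain sets. The function name and return shape are hypothetical.

```python
def plan_placement(processing_server_id: str, nodes: dict[str, set[str]],
                   edges: list[tuple[str, str]]) -> tuple[dict[str, str], list[tuple[str, str]]]:
    """Assign each node to "origin" or "processing" based on its capabilities.

    Returns (placement, projected_edges), where projected_edges are the
    processing-to-origin edges carried back as projected events.
    Raises ValueError for origin-to-processing edges.
    """
    if processing_server_id == "local":
        return {nid: "origin" for nid in nodes}, []
    placement = {nid: "origin" if "origin_only" in caps else "processing"
                 for nid, caps in nodes.items()}
    projected: list[tuple[str, str]] = []
    for src, dst in edges:
        if placement[src] == "origin" and placement[dst] == "processing":
            raise ValueError(f"edge {src} -> {dst} goes from origin to processing")
        if placement[src] == "processing" and placement[dst] == "origin":
            projected.append((src, dst))
    return placement, projected
```

A camera-detect-notify graph targeting a remote server would place camera and detect on the processing server and keep notify on the origin, with the detect-to-notify edge becoming a projected event.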

Streaming has an additional affinity rule: a pipeline that writes to a hosted transmission with stream.publish_video must run on the same processing_server_id as that transmission's host_server_id. This prevents frames from being published into the wrong streaming engine.
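A pre-flight check for the affinity rule could scan the graph for stream.publish_video nodes. The `transmission_id` config key and the shape of the `transmissions` mapping are hypothetical; only the requirement that the pipeline's processing_server_id equal the transmission's host_server_id comes from the rule above.

```python
def check_stream_affinity(pipeline: dict, transmissions: dict[str, dict]) -> list[str]:
    """Report stream.publish_video nodes whose transmission is hosted elsewhere.

    `transmissions` maps a transmission id to {"host_server_id": ...};
    an unknown transmission is reported as a mismatch.
    """
    errors: list[str] = []
    server = pipeline["processing_server_id"]
    for node in pipeline["graph"]["nodes"]:
        if node["operator"] != "stream.publish_video":
            continue
        tid = node["config"].get("transmission_id")  # hypothetical config key
        host = transmissions.get(tid, {}).get("host_server_id")
        if host != server:
            errors.append(
                f"node {node['id']!r} publishes to {tid!r} hosted on {host!r}, "
                f"but the pipeline runs on {server!r}"
            )
    return errors
```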

Extensions and operator ownership

Extensions should register domain operators rather than adding domain-specific code to the core.

Current examples:

  • toposync-ext-cameras registers camera sources, ONVIF event sources, motion gates, and camera post-processing;
  • toposync-ext-vision registers task-oriented vision operators such as detection, tracking, segmentation, classification, and object cropping;
  • toposync-ext-home-assistant registers Home Assistant state and action operators;
  • toposync-ext-streaming registers stream demand and video publication operators;
  • optional extensions can add their own operators without changing the core graph model.

For extension packaging and runtime loading, see Extension authoring. For the frontend host contract, see Plugin API.

Observability and storage

The origin records runtime status, node metrics, numeric telemetry, image markers, storage layers, and selected step snapshots. The frontend uses this for pipeline status, previews, diagnostics, and troubleshooting.

Important API areas:

  • /api/pipelines/{pipeline_name}/stats;
  • /api/pipelines/{pipeline_name}/telemetry/numeric;
  • /api/pipelines/{pipeline_name}/telemetry/image-markers;
  • /api/pipelines/{pipeline_name}/storage;
  • /api/pipelines/preview/frame.

Remote processing servers can report observability batches back to the origin. Numeric telemetry is applied on the origin; remote image markers are intentionally conservative because the origin may not be able to read files that only exist on the processing host.

Design rules for new operators

New operators should follow these rules:

  • Prefer task-oriented ids such as vision.detect over vendor-specific ids.
  • Use Pydantic config models with strict validation where possible.
  • Declare inputs, outputs, required payload keys, required artifacts, produced fields, and modalities.
  • Mark side-effectful work with share_strategy="never".
  • Mark origin-bound work with the origin_only capability.
  • Put blocking work behind the scheduler with an appropriate execution mode.
  • Preserve packet lifecycle unless the operator intentionally creates a new event stream.
  • Keep payload data JSON-compatible and store large data in artifacts or files.
  • Emit diagnostics for configurations that are technically valid but operationally risky.

What belongs in separate docs

This page is the map, not the full reference. Separate docs should cover:

  • complete operator catalog;
  • Python DSL;
  • pipeline editor UX;
  • processing server operations;
  • streaming-specific pipeline patterns;
  • vision model provisioning;
  • telemetry and storage retention;
  • extension authoring for custom operators.