A scheduled pipeline reads a source stream over a time window, applies an ordered chain of VRL transforms, and writes the output to a target stream (standard ingest) — optionally fanning the same events out to external connectors (S3, Kafka). Pipelines run on a cron schedule, or on demand as a backfill over a historical window.
Scheduled pipelines transform data after it lands, as a recurring job. To transform events on the ingest hot path instead, attach a pipeline function to the ingest step.

Build a pipeline in the graph editor

Open Pipelines → New to lay out the flow on a canvas: source → transform → sink. Nodes are draggable, you can draw connections between handles, and the editor validates the graph as you go (missing source/sink, a transform without a name or script, duplicate stream names, a source that also appears as a sink). The first source and first stream sink are saved as the pipeline’s source_stream and target_stream.
| Node | Meaning |
| --- | --- |
| Source | The stream the pipeline reads from each run. |
| Transform | One VRL step. Steps run in order, top to bottom. |
| Sink | A target stream (standard ingest) or an external connector. |
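The checks the editor is described as running can be pictured with a small sketch. This is not the editor's implementation: the `Graph` and `Transform` shapes, the message wording, and the reading of "duplicate stream names" as duplicates within the same node kind are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Transform:
    name: str
    script: str


@dataclass
class Graph:
    # Hypothetical in-memory shape of a pipeline graph.
    sources: list[str] = field(default_factory=list)      # source stream names
    transforms: list[Transform] = field(default_factory=list)
    sinks: list[str] = field(default_factory=list)        # target stream names


def validate(graph: Graph) -> list[str]:
    """Return human-readable problems; an empty list means the graph passes."""
    problems = []
    if not graph.sources:
        problems.append("missing source")
    if not graph.sinks:
        problems.append("missing sink")
    # Every transform needs both a name and a script.
    for i, t in enumerate(graph.transforms):
        if not t.name.strip() or not t.script.strip():
            problems.append(f"transform {i} needs a name and a script")
    # Assumption: duplicates are checked separately for sources and sinks.
    for kind, names in (("source", graph.sources), ("sink", graph.sinks)):
        for n in sorted({n for n in names if names.count(n) > 1}):
            problems.append(f"duplicate {kind} stream name: {n}")
    # A stream may not be read and written by the same pipeline.
    for n in sorted(set(graph.sources) & set(graph.sinks)):
        problems.append(f"stream is both source and sink: {n}")
    return problems
```

A graph with one source, one named transform with a script, and one distinct sink returns an empty list.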

Transforms

Each transform is a VRL script. The transform inspector lets you reuse a built-in preset or any saved function from the reuse picker — selecting one fills the step’s script, which you can then edit inline. Built-in presets ship with every instance and are read-only:
| Preset | What it does |
| --- | --- |
| normalize-logs | Parse the JSON message, add environment/cluster fields, lowercase level. |
| route-by-service | Normalize service and derive a per-service target stream name (logs_<service>). |
| parse-key-value | Parse a logfmt / key=value message into fields and merge them back. |
| redact-email | Replace email addresses in message with a placeholder. |
| add-ingest-time | Stamp an ingest timestamp to help debug end-to-end latency. |
Presets and saved functions share one catalog — they all appear on the Functions page. Built-in presets are badged and read-only; use Copy script to fork one into your own function.

Extend tables

An extend table is a key → record lookup table you can join against from a transform. Create one under Functions → Extend tables, add rows (a key plus named fields), and look it up from VRL:
# enrich each event with the service's owning team
.team = lookup("service_meta", .service).team
Lookups happen in memory, so they add no query cost to the run.
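To make the key → record model concrete, here is a minimal Python sketch of what the VRL line above does to a single event. The table contents are invented, and the behaviour for a missing key (an empty record, so `team` ends up unset) is an assumption of this sketch rather than documented behaviour.

```python
# Hypothetical contents of an extend table named "service_meta":
# each key maps to a record of named fields.
EXTEND_TABLES = {
    "service_meta": {
        "checkout": {"team": "payments", "tier": "1"},
        "search": {"team": "discovery", "tier": "2"},
    }
}


def lookup(table: str, key: str) -> dict:
    """Return the record stored under `key`.

    Falling back to an empty record for unknown tables or keys is an
    assumption of this sketch.
    """
    return EXTEND_TABLES.get(table, {}).get(key, {})


def enrich(event: dict) -> dict:
    """Mirror `.team = lookup("service_meta", .service).team` on a dict event."""
    enriched = dict(event)  # leave the caller's event untouched
    record = lookup("service_meta", event.get("service", ""))
    enriched["team"] = record.get("team")
    return enriched
```

Because the table is a plain in-memory mapping, each lookup is a dictionary access and never touches the source stream.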

Sinks and connector egress

A sink is either a target stream (the default — events are written through standard ingest and become queryable) or an external connector:
| Connector | Payload |
| --- | --- |
| S3 | Batched JSON Lines PUT to s3://<bucket>/<prefix><id>.jsonl. |
| Kafka | One JSON record produced per event to the configured topic. |
Add connectors under Pipelines → Connectors, then select them as sinks in the graph editor. A pipeline can write to a stream and egress to connectors in the same run.
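The two payload shapes differ only in how events are grouped. The sketch below shows one plausible serialisation of each; the function names, the compact JSON separators, and the way the batch `<id>` is supplied are assumptions, and no S3 or Kafka client is involved.

```python
import json


def s3_object_url(bucket: str, prefix: str, batch_id: str) -> str:
    """Build s3://<bucket>/<prefix><id>.jsonl; the prefix is used verbatim."""
    return f"s3://{bucket}/{prefix}{batch_id}.jsonl"


def s3_body(events: list[dict]) -> bytes:
    """Serialise a batch as JSON Lines: one JSON object per line."""
    lines = (json.dumps(e, separators=(",", ":")) + "\n" for e in events)
    return "".join(lines).encode("utf-8")


def kafka_records(events: list[dict]) -> list[bytes]:
    """Serialise each event as its own JSON record value."""
    return [json.dumps(e, separators=(",", ":")).encode("utf-8") for e in events]
```

For a batch of two events, `s3_body` yields a single two-line object body, while `kafka_records` yields two separate record values.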

Schedule and lookback

| Field | Notes |
| --- | --- |
| cron | Interval shorthand: every:30s, every:5m, every:1h. |
| lookback_secs | How far back each run reads from the source (default 300). |
| enabled | Toggle the schedule without deleting the pipeline. |
On each tick the runner reads [now - lookback_secs, now] from the source, applies the chain, and writes the output. Every run is recorded — view history under the pipeline’s Runs tab, or via GET /api/v1/scheduled_pipelines/{id}/runs (state, scanned_rows, error).
The cron runner is a singleton: in a distributed cluster it runs only on the alert-manager (or standalone) node, so enabled pipelines fire once per interval rather than once per node.
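The interval shorthand and the per-run read window can be sketched as follows. This is an illustration, not the runner's code: the helper names are hypothetical, and only the s, m, and h units shown in the table are accepted because no others are documented here.

```python
import re

# Units taken from the documented examples (every:30s, every:5m, every:1h).
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def interval_seconds(cron: str) -> int:
    """Parse 'every:<n><unit>' into seconds; raise ValueError otherwise."""
    match = re.fullmatch(r"every:(\d+)([smh])", cron)
    if match is None:
        raise ValueError(f"unsupported cron shorthand: {cron!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def run_window(now_secs: int, lookback_secs: int = 300) -> tuple[int, int]:
    """Return the [now - lookback_secs, now] window in epoch seconds."""
    return now_secs - lookback_secs, now_secs
```

With the default lookback_secs of 300 and a schedule of every:5m, consecutive windows sit end to end; a lookback longer than the interval makes consecutive runs read overlapping ranges.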

Backfill

To process a historical window on demand, submit a backfill:
curl -X POST http://localhost:5080/api/v1/scheduled_pipelines/$PIPELINE_ID/backfill \
  -H "authorization: Bearer $MS_JWT" \
  -H 'content-type: application/json' \
  -d '{"start_micros": 1717200000000000, "end_micros": 1717286400000000}'
The window must be ≤ 31 days. The request returns 202 with a job_id and a monitor URL. The backfill runs through the async search-job worker, which reads the source window, applies the same transform chain, and writes to the target stream and any connector sinks.
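The request body takes epoch microseconds, which are easy to get wrong by hand. The following sketch builds the body from timezone-aware datetimes and checks the 31-day limit before anything is sent. The helper names are hypothetical, and rejecting an empty or reversed window is an assumption of this sketch rather than documented server behaviour.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
MAX_WINDOW = timedelta(days=31)  # documented upper bound for a backfill


def to_micros(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer epoch microseconds."""
    if dt.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    # Integer division of timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(microseconds=1)


def backfill_body(start: datetime, end: datetime) -> dict:
    """Build the JSON body for the backfill request."""
    if end <= start:
        # Assumption: an empty or reversed window is never useful.
        raise ValueError("end must be after start")
    if end - start > MAX_WINDOW:
        raise ValueError("backfill window must be at most 31 days")
    return {"start_micros": to_micros(start), "end_micros": to_micros(end)}


# The one-day window used in the curl example: 2024-06-01 to 2024-06-02 UTC.
body = backfill_body(
    datetime(2024, 6, 1, tzinfo=timezone.utc),
    datetime(2024, 6, 2, tzinfo=timezone.utc),
)
```

Here `body` holds the same start_micros and end_micros values as the curl example and can be serialised with `json.dumps` as the request payload.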

Pipeline API

Create, update, list, and backfill scheduled pipelines over the HTTP API.