调度流水线按时间窗读取源数据流,依次应用一组 VRL 转换,并把结果写入目标数据流(标准
ingest)——还可把同一批事件扇出到外部 connector(S3、Kafka)。流水线按 cron 调度运行,也可对历史
窗口按需做一次回填(backfill)。
调度流水线在数据落库之后作为周期任务处理。若想在 ingest 热路径上即时转换事件,请改为在 ingest
步骤挂一个流水线函数。
在图形编排器里搭建
打开 流水线 → 新建,在画布上排布 来源 → 转换 → 目标。节点可拖动,可在端点之间连线,编排器会随
时校验图(缺来源/目标、转换缺名称或脚本、流名重复、来源与目标同名等)。第一个来源和第一个数据流目标会
保存为流水线的 source_stream 与 target_stream。
| 节点 | 含义 |
|---|
| 来源 | 每次运行读取的数据流。 |
| 转换 | 一个 VRL 步骤。多个步骤自上而下按序执行。 |
| 目标 | 目标数据流(标准 ingest)或外部 connector。 |
每个转换是一段 VRL 脚本。转换检视面板的「复用」下拉里,可选用
内置预设或任意已保存的函数——选中即把脚本灌入该步骤,之后
可内联编辑。
内置预设随每个实例提供,且只读:
| 预设 | 作用 |
|---|
normalize-logs | 解析 JSON message,补充环境/集群字段,统一 level 小写。 |
route-by-service | 归一化 service 并派生每服务目标流名(logs_<service>)。 |
parse-key-value | 把 logfmt / key=value 形式的 message 解析为字段后合并回事件。 |
redact-email | 把 message 中的邮箱地址替换为占位符。 |
add-ingest-time | 附加摄取时间戳,便于排查端到端延迟。 |
预设与已保存函数共用一个目录——它们都出现在 函数 页。内置预设带标识且只读;用「复制脚本」可把它
派生成你自己的函数。
扩展表
扩展表是一张 key → 记录的查找表,可在转换里 join。在 函数 → 扩展表 新建,添加行(一个 key 加若
干命名字段),再在 VRL 里查找:
# 用所属团队丰富每条事件
.team = lookup("service_meta", .service).team
查找在内存中完成,不会给运行增加查询开销。
目标与 connector egress
目标可以是目标数据流(默认——事件经标准 ingest 写入并可被查询),也可以是外部 connector:
| Connector | 投递负载 |
|---|
| S3 | 批量 JSON Lines PUT 到 s3://<bucket>/<prefix><id>.jsonl。 |
| Kafka | 每条事件产生一条 JSON 记录到配置的 topic。 |
在 流水线 → Connectors 添加 connector,再在图形编排器里把它选作目标。一条流水线可在同一次运行里既写
数据流又向 connector egress。
调度与回看窗口
| 字段 | 说明 |
|---|
cron | 间隔简写:every:30s、every:5m、every:1h。 |
lookback_secs | 每次运行从源回看多久(默认 300)。 |
enabled | 不删除流水线即可开关调度。 |
每次触发,runner 读取源的 [now - lookback_secs, now],应用转换链并写出结果。每次运行都会记录——在流
水线的 运行 标签页查看,或通过 GET /api/v1/scheduled_pipelines/{id}/runs(state、scanned_rows、
error)。
cron runner 是单例:分布式集群下只在 alert-manager(或 standalone)节点运行,因此启用的流水线
每个间隔只触发一次,而非每节点一次。
回填(Backfill)
要按需处理历史窗口,提交一次回填:
curl -X POST http://localhost:5080/api/v1/scheduled_pipelines/$PIPELINE_ID/backfill \
-H "authorization: Bearer $MS_JWT" \
-H 'content-type: application/json' \
-d '{"start_micros": 1717200000000000, "end_micros": 1717286400000000}'
窗口必须 ≤ 31 天。请求返回 202 带 job_id 和监控 URL;回填走异步
search-job worker——读取源窗口、应用同一条
转换链、写目标流并 egress。
流水线 API
通过 HTTP API 创建、更新、列出与回填调度流水线。