跳转到主要内容
调度流水线按时间窗读取源数据流,依次应用一组 VRL 转换,并把结果写入目标数据流(标准 ingest)——还可把同一批事件扇出到外部 connector(S3、Kafka)。流水线按 cron 调度运行,也可对历史 窗口按需做一次回填(backfill)
调度流水线在数据落库之后作为周期任务处理。若想在 ingest 热路径上即时转换事件,请改为在 ingest 步骤挂一个流水线函数
调度流水线

在图形编排器里搭建

打开 流水线 → 新建,在画布上排布 来源 → 转换 → 目标。节点可拖动,可在端点之间连线,编排器会随 时校验图(缺来源/目标、转换缺名称或脚本、流名重复、来源与目标同名等)。第一个来源和第一个数据流目标会 保存为流水线的 source_streamtarget_stream
节点含义
来源每次运行读取的数据流。
转换一个 VRL 步骤。多个步骤自上而下按序执行。
目标目标数据流(标准 ingest)或外部 connector。

转换

每个转换是一段 VRL 脚本。转换检视面板的「复用」下拉里,可选用 内置预设或任意已保存的函数——选中即把脚本灌入该步骤,之后 可内联编辑。 内置预设随每个实例提供,且只读:
预设作用
normalize-logs解析 JSON message,补充环境/集群字段,统一 level 小写。
route-by-service归一化 service 并派生每服务目标流名(logs_<service>)。
parse-key-valuelogfmt / key=value 形式的 message 解析为字段后合并回事件。
redact-emailmessage 中的邮箱地址替换为占位符。
add-ingest-time附加摄取时间戳,便于排查端到端延迟。
预设与已保存函数共用一个目录——它们都出现在 函数 页。内置预设带标识且只读;用「复制脚本」可把它 派生成你自己的函数。

扩展表

扩展表是一张 key → 记录的查找表,可在转换里 join。在 函数 → 扩展表 新建,添加行(一个 key 加若 干命名字段),再在 VRL 里查找:
# 用所属团队丰富每条事件
.team = lookup("service_meta", .service).team
查找在内存中完成,不会给运行增加查询开销。

目标与 connector egress

目标可以是目标数据流(默认——事件经标准 ingest 写入并可被查询),也可以是外部 connector
Connector投递负载
S3批量 JSON Lines PUTs3://<bucket>/<prefix><id>.jsonl
Kafka每条事件产生一条 JSON 记录到配置的 topic。
流水线 → Connectors 添加 connector,再在图形编排器里把它选作目标。一条流水线可在同一次运行里既写 数据流又向 connector egress。

调度与回看窗口

字段说明
cron间隔简写:every:30severy:5mevery:1h
lookback_secs每次运行从源回看多久(默认 300)。
enabled不删除流水线即可开关调度。
每次触发,runner 读取源的 [now - lookback_secs, now],应用转换链并写出结果。每次运行都会记录——在流 水线的 运行 标签页查看,或通过 GET /api/v1/scheduled_pipelines/{id}/runsstatescanned_rowserror)。
cron runner 是单例:分布式集群下只在 alert-manager(或 standalone)节点运行,因此启用的流水线 每个间隔只触发一次,而非每节点一次。

回填(Backfill)

要按需处理历史窗口,提交一次回填:
curl -X POST http://localhost:5080/api/v1/scheduled_pipelines/$PIPELINE_ID/backfill \
  -H "authorization: Bearer $MS_JWT" \
  -H 'content-type: application/json' \
  -d '{"start_micros": 1717200000000000, "end_micros": 1717286400000000}'
窗口必须 ≤ 31 天。请求返回 202job_id 和监控 URL;回填走异步 search-job worker——读取源窗口、应用同一条 转换链、写目标流并 egress。

流水线 API

通过 HTTP API 创建、更新、列出与回填调度流水线。