Connector 是与外部平台的托管集成。它有两个方向:
- 接入源 —— 平台把数据推进 MoleSignal(Kinesis Firehose、Cloudflare Logpush、Heroku log
drain)。
- 出向 sink —— 流水线把产出投递出去到 S3 或 Kafka。
配置一个 connector
在 Pipelines → Connectors 下创建 connector,或通过 API。每个 connector 有一个 kind 和一个
config_json,后者的结构随 kind 而定。敏感字段(push_token、access_key、secret_key …)在 API
响应中被掩码。
curl -X POST http://localhost:5080/api/v1/connectors \
-H "authorization: Bearer $MS_JWT" \
-H 'content-type: application/json' \
-d '{
"name": "cf-prod",
"kind": "cloudflare_logpush",
"config_json": { "push_token": "ms_cf_9f3c…", "target_stream": "cloudflare" }
}'
接入源(推送)
三种推送源共用一套模型:你创建一个带 push_token 和 target_stream 的 connector,然后把平台
指向对应端点。这些端点不使用你的 JWT——请求用 connector token 鉴权,按平台允许的方式传入:
| 方式 | 适用平台 |
|---|
X-Connector-Token 头 | 可配置自定义头的平台(如 Cloudflare) |
X-Amz-Firehose-Access-Key 头 | Kinesis Firehose(自动发送) |
?token= 查询参数 | 无法设置自定义头的平台(如 Heroku) |
带 Content-Encoding: gzip 的 body 会被自动解压。事件走与其它接入相同的路径——schema-on-write、
流水线、脱敏一并生效。
Kinesis Firehose
Cloudflare Logpush
Heroku
用 kind: "aws_kinesis_firehose" 创建 connector,然后给你的 Firehose 投递流添加一个 HTTP
endpoint 目标:
- URL ——
https://<your-host>/api/v1/_kinesis_firehose
- Access key —— connector 的
push_token(Firehose 以 X-Amz-Firehose-Access-Key 发送)
MoleSignal 对每条 record base64 解码、按换行切分,并把每行解析为 JSON(纯文本则回退到 message
字段),并返回 Firehose 所要求的 200 ACK。 用 kind: "cloudflare_logpush" 创建 connector,然后创建一个 Logpush job,目标为:
- URL ——
https://<your-host>/api/v1/_cloudflare
- Header ——
X-Connector-Token: <push_token>
Cloudflare 发送 gzip 压缩的 NDJSON,每行成为一条事件。端点回 204 No Content。 用 kind: "heroku_drain" 创建 connector,然后添加 drain——Heroku 设不了自定义头,故 token 放在
URL 里:heroku drains:add "https://<your-host>/api/v1/_heroku?token=<push_token>"
每行日志成为一条带 message 和 source: "heroku" 的事件。端点回 204 No Content。
target_stream 在首次投递时创建、其 schema 随新字段出现而演化——与任意其它流一致。省略它则回退到
按源区分的默认流名(kinesis、cloudflare、heroku)。
拉取源
CloudWatch Logs 是拉取源:MoleSignal 按周期主动轮询 AWS API,而非被动接收推送。创建一个 connector
后,MoleSignal 周期调用 FilterLogEvents(SigV4 签名,不引 AWS SDK),并按 connector 维度推进
checkpoint,使每轮只读新事件。
curl -X POST http://localhost:5080/api/v1/connectors \
-H "authorization: Bearer $MS_JWT" \
-H 'content-type: application/json' \
-d '{
"name": "cw-prod",
"kind": "aws_cloudwatch_logs",
"config_json": {
"region": "us-east-1",
"log_group": "/aws/lambda/checkout",
"access_key": "AKIA…",
"secret_key": "…",
"target_stream": "cloudwatch"
}
}'
| 字段 | 说明 |
|---|
region / log_group | 必填——要读取的 CloudWatch 区域与 log group。 |
access_key / secret_key | 具备 logs:FilterLogEvents 权限的 IAM 凭据。 |
session_token | 可选,用于 STS 临时凭据。 |
target_stream | 目标 stream(默认 default)。 |
轮询器以单例运行(alert-manager / standalone 节点),无论集群规模都只有一个节点轮询。每条事件成为
一条带 message、log_stream、log_group 的日志。
出向 sink
S3 与 Kafka connector 是 sink:在流水线中选中它们,
即可在流水线运行时把变换后的事件投递出去。
| Kind | 载荷 |
|---|
s3 | 批量 JSON Lines PUT 到 s3://<bucket>/<prefix><id>.jsonl。 |
kafka | 每条事件作为一条 JSON 记录 produce 到指定 topic。 |
接入 API
Kinesis、Cloudflare、Heroku 推送接收端点的细节。