跳转到主要内容
Connector 是与外部平台的托管集成。它有两个方向:
  • 接入源 —— 平台把数据推进 MoleSignal(Kinesis Firehose、Cloudflare Logpush、Heroku log drain)。
  • 出向 sink —— 流水线把产出投递出去到 S3 或 Kafka。

配置一个 connector

Pipelines → Connectors 下创建 connector,或通过 API。每个 connector 有一个 kind 和一个 config_json,后者的结构随 kind 而定。敏感字段(push_tokenaccess_keysecret_key …)在 API 响应中被掩码。
curl -X POST http://localhost:5080/api/v1/connectors \
  -H "authorization: Bearer $MS_JWT" \
  -H 'content-type: application/json' \
  -d '{
        "name": "cf-prod",
        "kind": "cloudflare_logpush",
        "config_json": { "push_token": "ms_cf_9f3c…", "target_stream": "cloudflare" }
      }'

接入源(推送)

三种推送源共用一套模型:你创建一个带 push_tokentarget_stream 的 connector,然后把平台 指向对应端点。这些端点不使用你的 JWT——请求用 connector token 鉴权,按平台允许的方式传入:
方式适用平台
X-Connector-Token可配置自定义头的平台(如 Cloudflare)
X-Amz-Firehose-Access-KeyKinesis Firehose(自动发送)
?token= 查询参数无法设置自定义头的平台(如 Heroku)
Content-Encoding: gzip 的 body 会被自动解压。事件走与其它接入相同的路径——schema-on-write、 流水线脱敏一并生效。
kind: "aws_kinesis_firehose" 创建 connector,然后给你的 Firehose 投递流添加一个 HTTP endpoint 目标:
  • URL —— https://<your-host>/api/v1/_kinesis_firehose
  • Access key —— connector 的 push_token(Firehose 以 X-Amz-Firehose-Access-Key 发送)
MoleSignal 对每条 record base64 解码、按换行切分,并把每行解析为 JSON(纯文本则回退到 message 字段),并返回 Firehose 所要求的 200 ACK。
target_stream 在首次投递时创建、其 schema 随新字段出现而演化——与任意其它流一致。省略它则回退到 按源区分的默认流名(kinesiscloudflareheroku)。

拉取源

CloudWatch Logs 是拉取源:MoleSignal 按周期主动轮询 AWS API,而非被动接收推送。创建一个 connector 后,MoleSignal 周期调用 FilterLogEvents(SigV4 签名,不引 AWS SDK),并按 connector 维度推进 checkpoint,使每轮只读新事件。
curl -X POST http://localhost:5080/api/v1/connectors \
  -H "authorization: Bearer $MS_JWT" \
  -H 'content-type: application/json' \
  -d '{
        "name": "cw-prod",
        "kind": "aws_cloudwatch_logs",
        "config_json": {
          "region": "us-east-1",
          "log_group": "/aws/lambda/checkout",
          "access_key": "AKIA…",
          "secret_key": "…",
          "target_stream": "cloudwatch"
        }
      }'
字段说明
region / log_group必填——要读取的 CloudWatch 区域与 log group。
access_key / secret_key具备 logs:FilterLogEvents 权限的 IAM 凭据。
session_token可选,用于 STS 临时凭据。
target_stream目标 stream(默认 default)。
轮询器以单例运行(alert-manager / standalone 节点),无论集群规模都只有一个节点轮询。每条事件成为 一条带 messagelog_streamlog_group 的日志。

出向 sink

S3 与 Kafka connector 是 sink:在流水线中选中它们, 即可在流水线运行时把变换后的事件投递出去。
Kind载荷
s3批量 JSON Lines PUTs3://<bucket>/<prefix><id>.jsonl
kafka每条事件作为一条 JSON 记录 produce 到指定 topic。

接入 API

Kinesis、Cloudflare、Heroku 推送接收端点的细节。