监控 Snowflake Connector for Kafka

本主题介绍如何监控 Snowflake Connector for Kafka。

JMX 监控

连接器通过 Java Management Extensions (JMX) MBeans公开指标。JMX 默认处于启用状态 (jmx=true)。

要使用 JMX 监控,请为 Kafka Connect 处理器的 JVM 配置以下系统属性:

-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=<port>
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false

然后,您可以使用任意兼容 JMX 的监控工具(例如,JConsole 或带有 JMX Exporter 的 Prometheus)查看连接器指标。

MBean 域与命名

连接器会在 snowflake.kafka.connector 域下注册 MBeans,并采用以下 ObjectName 模式:

snowflake.kafka.connector:connector=<connectorName>,<scope>=<scopeValue>,category=<category>,name=<metricName>

其中 <scope> 可以是 ``task``(任务级指标)或 ``channel``(每个分区的通道指标)。

任务级指标

这些指标的作用范围为连接器任务,提供对吞吐量和生命周期操作的汇总可见性。

指标

类型

描述

put-records

计量器

任务通过 put() 接收的记录数。使用速率属性(1 分钟、5 分钟、15 分钟)来监控引入吞吐量。

put-duration

计时器

每个 put() 调用的持续时间。值过高可能表明连接器存在瓶颈或下游存在背压。

precommit-duration

计时器

预提交操作的持续时间,包括偏移验证。

assigned-partitions

Gauge

当前分配给此任务的 Kafka 分区数量。这有助于验证各任务之间的分区分配是否均衡。

channel-open-duration

计时器

打开 Snowpipe Streaming 通道所需的时间。值升高可能表明存在连接问题。

channel-open-count

Counter

此任务打开的通道总数。

backpressure-rewind-count

Counter

由于下游背压,导致连接器必须回滚的次数。持续的非零值表明连接器的生成速度快于 Snowflake 的引入速度。

通道级指标

这些指标的作用范围为特定的 Kafka 主题分区通道,对于监控引入延迟和数据持久性至关重要。

指标

类型

描述

processed-offset

Gauge

连接器已缓冲的最新偏移。这是此分区从 Kafka 接收到的最新记录。

persisted-in-snowflake-offset

Gauge

已确认在 Snowflake 中持久提交的最新偏移。通过将其与 processed-offset 进行比较来衡量引入延迟。

latest-consumer-offset

Gauge

Kafka 使用者为此分区提供的最新偏移。通过与 persisted-in-snowflake-offset 进行比较来查看完整的端到端延迟。

channel-recovery-count

Gauge

通道恢复事件的数量。值较高或不断增加表明 Snowpipe Streaming 通道存在不稳定情况。

警报的关键指标

对于生产部署,请考虑针对以下情况设置警报:

  • 引入延迟latest-consumer-offset 减去 persisted-in-snowflake-offset。差距扩大表明连接器处理进度落后。

  • 背压backpressure-rewind-count 随时间推移而增加。

  • 通道恢复channel-recovery-count 增加,这可能表明存在连接或身份验证问题。

  • 写入耗时put-duration 的平均值或 99 分位值超过了您可接受的阈值。

MDC 日志记录

启用 MDC(映射的诊断上下文)日志记录,可在日志消息前附加连接器上下文信息。在运行多个连接器实例且需要关联日志条目时,此功能非常有用:

enable.mdc.logging=true

预估引入延迟

RECORD_METADATA 中的 SnowflakeConnectorPushTime 字段记录连接器缓冲记录以供引入时的时间戳。您可以通过将此值与记录在 Snowflake 中变为可查询状态的时间进行对比,来估算端到端的引入延迟。

例如:

SELECT
  RECORD_METADATA:topic::STRING AS topic,
  RECORD_METADATA:partition::NUMBER AS partition,
  RECORD_METADATA:offset::NUMBER AS offset,
  TIMESTAMPDIFF('millisecond',
    TO_TIMESTAMP(RECORD_METADATA:SnowflakeConnectorPushTime::BIGINT, 3),
    CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())
  ) AS latency_ms
FROM my_table
ORDER BY latency_ms DESC
LIMIT 10;

有关监控 Snowpipe Streaming 引入的更多信息,请参阅 Snowpipe Streaming key concepts