使用 Java Management Extension (JMX) 监控 Kafka Connector

本主题介绍如何使用 Java Management Extensions (JMX) 监控 Snowflake Connector for Kafka。Kafka Connect 提供预配置的 JMX 指标,这些指标提供了有关 Kafka Connector 的信息。Snowflake Connector for Kafka 提供了多个 Managed Beans (MBeans),可用于引入有关 Kafka 环境的指标。您可以将此信息加载到第三方工具(包括 Prometheus 和 Grafana)中。

默认在连接器中启用 JMX 功能。要禁用 JMX,请将 jmx 属性设置为 false

重要

Snowpipe 支持 Kafka Connector 版本 1.6.0 及更高版本。

Snowpipe Streaming 支持 Kafka Connector 版本 2.1.2 及更高版本。

本主题内容:

在 Kafka Connector 中配置 JMX

默认在 Snowflake Kafka Connector 中启用 JMX。要在 Kafka 中启用 JMX,请执行以下操作:

  1. 启用 JMX 以连接到 Kafka 安装:

    • 要使 JMX 连接到远程服务器上运行的 Kafka 安装,请在 Kafka Connect 启动脚本中设置 KAFKA_JMX_OPTS 环境变量:

      export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
          -Dcom.sun.management.jmxremote.authenticate=false
          -Dcom.sun.management.jmxremote.ssl=false
          -Djava.rmi.server.hostname=<ip_address>
          -Dcom.sun.management.jmxremote.port=<jmx_port>"
      
      Copy

      其中:

      • ip_address:指定 Kafka Connect安装的 IP 地址。

      • jmx_port:指定 Kafka Connect 侦听 JMX 连接的 JMX 端口。

    • 要使 JMX 连接到相同服务器上运行的 Kafka,请在 Kafka 启动脚本中设置 JMX_PORT 环境变量:

      export JMX_PORT=<port_number>
      
      Copy

      其中, port_number 是 Kafka 安装的 JMX 端口。

  2. 重新启动 Kafka Connector。

使用 Snowflake Kafka Connector Managed Beans (MBeans)

JMX 使用 MBeans 表示 Kafka 中可以监控的对象(例如线程数、CPU 负载等)。Snowflake Kafka Connector 提供 MBeans,用于访问连接器管理的对象。可以使用这些 MBeans 创建监控仪表板。

Kafka Connector MBean 对象名称的一般格式如下所示:

snowflake.kafka.connector:connector=connector_name,pipe=pipe_name,category=category_name,name=metric_name

其中:

  • connector=connector_name 指定 Kafka 配置文件中定义的连接器的名称。

  • pipe=pipe_name 指定用于引入数据的 Snowpipe 对象。Kafka Connector 为每个分区定义 Snowpipe 对象。

  • category=category_name 指定 MBean 的类别。每个类别都包含一组指标。

  • name=metric_name 指定指标的名称。

以下部分列出了 Snowflake Kafka Connector 提供的类别和指标的名称。

类别:file-counts

此指标类别仅适用于基于 Snowpipe 的 Kafka Connector,不适用于 Snowpipe Streaming。

指标名称

数据类型

描述

file-count-on-stage

长整型

当前处于内部暂存区的文件数量。清除文件的过程开始后,该值会递减。此属性提供当前内部暂存区上有多少文件的估计。

file-count-on-ingestion

长整型

通过调用 insertFiles REST API 确定 Snowpipe 中的文件数量。目前通过单个 REST API 请求发送的文件有 5k 的限制。文件数量和 REST API 调用数量之间没有一对一的关系。对 insertFiles REST API 的调用次数可以大于此值。如果没有更多要引入的文件,则此属性的值为 0

file-count-table-stage-ingestion-fail

长整型

表暂存区上引入失败的文件数。

file-count-table-stage-broken-record

长整型

表暂存区上存在的与损坏的偏移量相对应的文件数。

file-count-purged

长整型

确定引入状态后从内部暂存区清除的文件数量。

类别:offsets

offsetPersistedInSnowflakelatestConsumerOffset 指标适用于基于 Snowpipe Streaming 的 Kafka Connector。此类别的其余部分仅适用于基于 Snowpipe 的 Kafka Connector。

指标名称

数据类型

描述

processed-offset

长整型

引用发送到内存缓冲区的最新记录的偏移量。

flushed-offset

长整型

指达到缓冲区阈值后在内部暂存区刷新的记录的偏移量。缓冲区可能会根据时间、记录数或大小达到其阈值。

committed-offset

长整型

一个偏移量,指的是调用了预提交 API 并调用了 Snowpipe insertFiles REST API 的记录。

purged-offset

长整型

指的是从内部暂存区清除的记录的偏移量。该数字是从内部暂存区清除的最近偏移量的最大值。

offsetPersistedInSnowflake

长整型

此偏移涉及到在 Snowflake 中含有最新持久数据的记录。此偏移由 insertRows API 调用决定。

latestConsumerOffset

长整型

此偏移涉及到发送至内存中缓冲区的最新记录。它仅用于在通道偏移令牌为 NULL 时重新发送偏移。

类别:buffer

此指标类别仅适用于基于 Snowpipe 的 Kafka Connector。

指标名称

数据类型

描述

buffer-size-bytes

长整型

根据缓冲区阈值,返回将缓冲区刷新到内部暂存区之前的缓冲区大小(以字节为单位)。该值可能与文件大小不同,因为在加载到内部暂存区时会压缩文件。

buffer-record-count

长整型

根据缓冲区阈值,返回将缓冲区刷新到内部暂存区之前已缓冲到内存中的 Kafka 记录数。

类别:latencies

此指标类别仅适用于基于 Snowpipe 的 Kafka Connector。

指标名称

数据类型

描述

kafka-lag

长整型

将记录放入 Kafka 的时间与将记录提取到Kafka Connect 的时间之间的差异(以秒为单位)。请注意,如果未在记录内设置该值,则该值可以为 null。

commit-lag

长整型

文件上传到内部暂存区的时间与调用 insertFiles REST API 的时间之间的差异(以秒为单位)。

ingestion-lag

长整型

文件上传到内部暂存区的时间与通过 insertReportloadHistoryScan API 报告文件引入状态的时间之间的差异(以秒为单位)。

语言: 中文