Kafka Connector 故障排除

本节介绍如何解决使用 Kafka Connector 引入数据时遇到的问题。

本主题内容:

错误通知

为 Snowpipe 配置错误通知。当 Snowpipe 在加载过程中遇到文件错误时,该功能会将通知推送到配置的云消息传递服务,从而启用数据文件分析。有关更多信息,请参阅 Snowpipe 错误通知

常规故障排除步骤

完成以下步骤,解决使用 Kafka Connector 遇到的加载问题。

第 1 步:查看表的 COPY 历史记录

查询目标表的加载活动历史记录。有关信息,请参阅 COPY_HISTORY 视图。如果 COPY_HISTORY 输出不包含一组预期文件,请查询更早的时间段。如果文件是早期文件的副本,则在尝试加载原始文件时,加载历史记录可能已记录该活动。STATUS 列指明一组特定文件是已加载、部分加载还是加载失败。FIRST_ERROR_MESSAGE 列提供尝试部分加载或加载失败的原因。

Kafka Connector 将其无法加载的文件移动到与目标表关联的暂存区。引用表暂存区的语法为 @[namespace.]%table_name

使用 LIST 列出位于表暂存区的所有文件。

例如:

LIST @mydb.public.%mytable;
Copy

文件名采用以下格式之一。表中描述了生成每种格式的条件:

文件类型

描述

原始字节

这些文件符合以下模式:

<connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz

对于这些文件,无法将 Kafka 记录从原始字节转换为源文件格式(Avro、JSON 或 Protobuf)。

此问题的常见原因是网络故障导致字符从记录中删除。Kafka Connector 无法再解析原始字节,从而导致记录损坏。

源文件格式(Avro、JSON 或 Protobuf)

这些文件符合以下模式:

<connector_name>/<table_name>/<partition>/<start_offset>_<end_offset>_<timestamp>.<file_type>.gz

对于这些文件,在 Kafka Connector 将原始字节转换回源文件格式后,Snowpipe 遇到了错误,无法加载文件。

以下各节提供了解决每种文件类型问题的说明:

原始字节

文件名 <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz 包含未从原始字节转换为源文件格式的记录的确切偏移。要解决问题,请将该记录作为新记录重新发送到 Kafka Connector。

源文件格式(Avro、JSON 或 Protobuf)

如果 Snowpipe 无法从为 Kafka 主题创建的内部暂存区中的文件加载数据,Kafka Connector 会将文件以源文件格式移动到目标表的暂存区。

如果一组文件存在多个问题,则 COPY_HISTORY 输出中的 FIRST_ERROR_MESSAGE 列仅指明遇到的第一个错误。要查看文件中的所有错误,必须从表暂存区检索文件,将其上传到已命名暂存区,然后执行将 VALIDATION_MODE 复制选项设置为 RETURN_ALL_ERRORSCOPY INTO <table> 语句。VALIDATION_MODE 复制选项指示 COPY 语句验证要加载的数据并根据指定的验证选项返回结果。指定此复制选项时,不会加载任何数据。在语句中,引用您尝试使用 Kafka Connector 加载的一组文件。

当数据文件的任何问题得到解决后,您可以使用一个或多个 COPY 语句手动加载数据。

以下示例引用了位于 mydb.public 数据库和架构中 mytable 表的表暂存区的数据文件。

要在表暂存区验证数据文件并解决错误,请执行以下操作:

  1. 使用 LIST 列出位于表暂存区的所有文件。

    例如:

    LIST @mydb.public.%mytable;
    
    Copy

    本节中的示例假定 JSON 是数据文件的源格式。

  2. 使用 GET 将 Kafka Connector 创建的文件下载至本地计算机。

    例如,将文件下载到本地计算机中名为 data 的目录中:

    Linux 或 macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. 使用 CREATE STAGE 创建指定内部暂存区,用于存储与源 Kafka 文件格式相同的数据文件。

    例如,创建一个名为 kafka_json 的内部暂存区来存储 JSON 文件:

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. 使用 PUT 上传您从表暂存区下载的文件。

    例如,将下载的文件上传到本地计算机的 data 目录中:

    Linux 或 macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. 创建一个包含两列变体的临时表进行测试。该表仅用于验证暂存数据文件。表中未加载任何数据。当前用户会话结束时,该表将自动删除:

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. 执行 COPY INTO *table* ...VALIDATION_MODE = 'RETURN_ALL_ERRORS' 语句检索数据文件中遇到的所有错误。该语句会验证指定暂存区中的文件。不会将任何数据加载到表中:

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. 修复本地计算机上数据文件中所有报告的错误。

  8. 使用 PUT 将已修复文件上传到表暂存区或指定内部暂存区。

    以下示例将文件上传到表暂存区,覆盖现有文件:

    Linux 或 macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. 使用 不带 VALIDATION_MODE 选项的 COPY INTO table 将数据加载到目标表中。

    成功加载数据后,您可以选择使用 PURGE = TRUE 复制选项从暂存区中删除数据文件,或者使用 REMOVE 从表暂存区手动删除文件:

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

第 2 步:分析 Kafka Connector 日志文件

如果 COPY_HISTORY 视图没有数据加载记录,则分析 Kafka Connector 的日志文件。连接器将事件写入日志文件。请注意,Snowflake Kafka Connector 与所有 Kafka Connector 插件共享相同的日志文件。此日志文件的名称和位置应位于您的 Kafka Connect 配置文件中。有关更多信息,请参阅为您的 Apache Kafka 软件提供的文档。

在 Kafka Connector 日志文件中搜索与 Snowflake 相关的错误消息。大多数消息都将使用字符串 ERROR 并包含文件名 com.snowflake.kafka.connector...,以便于查找这些消息。

您可能遇到的错误包括:

配置错误:

该错误的可能原因:

  • 连接器没有订阅该主题的适当信息。

  • 连接器没有写入 Snowflake 表的适当信息(例如,用于身份验证的密钥对可能出错)。

请注意,Kafka Connector 会验证其参数。对于每个不兼容的配置参数,连接器都会抛出一个错误。错误消息已写入 Kafka Connect 集群的日志文件中。如果您怀疑存在配置问题,请检查该日志文件中的错误。

读取错误:

连接器可能因以下原因无法从 Kafka 读取数据:

  • Kafka 或 Kafka Connect 可能无法运行。

  • 该消息可能尚未发送。

  • 该消息可能已被删除(过期)。

写入错误(暂存区):

该错误的可能原因:

  • 暂存区权限不足。

  • 暂存区空间不足。

  • 暂存区已删除。

  • 其他用户或进程向暂存区写入意外文件。

写入错误(表):

该错误的可能原因:

  • 表权限不足。

第 3 步:检查 Kafka Connect

如果 Kafka 连接日志文件中未报告错误,请检查 Kafka Connect。有关故障排除说明,请参阅 Apache Kafka 软件供应商提供的文档。

解决特定问题

主题分区和偏移相同的重复行

使用 Kafka Connector 1.4 版本(或更高版本)加载数据时,目标表中主题分区和偏移相同的重复行可能表明加载操作超过 300000 毫秒(300 秒)的默认执行超时值。要验证原因,请检查 Kafka Connect 日志文件中是否存在以下错误:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

要解决该错误,请在 Kafka 配置文件(例如 <kafka_dir>/config/connect-distributed.properties)中更改以下 任一 属性:

consumer.max.poll.interval.ms

将执行超时值延长至 900000 (900 秒)。

consumer.max.poll.records

将每次操作加载的记录数减少至 50

Failure in Streaming Channel Offset Migration Response 错误代码:5023

连接器升级到 v2.1.0 版本(或更高版本)时,Snowpipe Streaming Channel 名称格式有所更改。因此,检测先前提交的偏移量信息的逻辑将找不到任何先前提交的偏移量信息。这将表现为以下异常:

com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023

Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support

Message: Snowflake experienced a transient exception, please retry the migration request.
Copy

要解决此错误,请在 Kafka 配置文件(例如 <kafka_dir>/config/connect-distributed.properties)中添加以下配置属性:

enable.streaming.channel.offset.migration

通过将其设置为 false 禁用自动偏移迁移。

配置连接器以支持多个主题

我们在单个 Kafka Cconnector 实例支持大量主题(每个主题都有多个分区)时遇到了问题。连接器的配置虽然看似有效,却导致无休止的重新平衡循环,无法将任何数据引入 Snowflake。这个问题是 Snowpipe Streaming 引入模式(snowflake.ingestion.method=SNOWPIPE_STREAMING)所特有的,但准则也适用于 Snowpipe 引入模式(snowflake.ingestion.method=SNOWPIPE)。该问题通过反复记录此日志消息在日志文件中表现出来:

[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed

当您配置连接器以通过 regex 引入主题时通常会发生这种情况。我们建议对 Kafka 配置文件应用以下选项(例如 :file : <kafka_dir>/ config/connect-distributed.properties):

consumer.override.partition.assignment.strategy

将任务的分区分配策略配置为 org.apache.kafka.clients.consumer.CooperativeStickyAssignor – 这可以确保引入通道均匀分配到可用任务,从而降低重新平衡的风险。

tasks.max

每个连接器的实例化任务数量不应超过可用的 CPU 数量 – 底层驱动程序基于可用 CPU 实现节流机制。并发请求的增加会增加系统的内存压力,但也会延长插入处理时间,直接导致连接器新号丢失。

就连接器的超时值而言,有一组配置属性对其有直接影响:

consumer.override.heartbeat.interval.ms

定义监控线程(每个任务都有一个关联线程)向 Kafka 发送信号的频率。默认值为 3000 ms,但如果系统负载较高 – 您可以尝试将其增加到 5000 ms。

consumer.override.session.timeout.ms

定义代理在假定使用者处于无效状态并尝试重新平衡之前将等待多长时间。此设置通常应是信号间隔时间的 3 倍,因此,如果您将信号间隔时间配置为 5000 ms,请将此设置为 15000 ms。

consumer.override.max.poll.interval.ms

定义从底层 Kafka 调用 poll() 的最大时间间隔。轮询间隔的时间基本上映射到处理批量数据的连接器(包括上传到 Snowflake 并提交)。在有多个任务处理数据的场景中,底层 Snowflake Connection 可能会开始限制请求,从而导致处理时间更长。根据您的场景,可以将此值提高至长达 20 分钟 (1200000 ms) – 特别是当您通过用于引入的大量初始记录计数启动连接器时。

consumer.override.rebalance.timeout.ms

当发生重新平衡时,如果每个任务包含大量通道,每个通道都有很多底层逻辑需要确定从哪里恢复处理。此代码是顺序执行的,因此每个任务的通道越多,初始设置所需的时间就越长。将此属性配置为足够大的值,以便每个通道都能完成其初始化。3 分钟(180000 ms)是一个不错的起点。

还需要注意连接器的可用堆内存这在多个连接器同时运行或单个连接器从多个主题引入数据的场景中尤为重要。每个主题的分区映射到单个通道,因此需要内存。

请确保通过 Xmx 设置调整 Kafka Connect 进程内存设置。其中一种方法是定义 KAFKA_OPTS 环境变量并对其进行相应设置(即 KAFKA_OPTS=-Xmx4G)。

文件清理器意外清除文件

如果将 Kafka Connector 与 SNOWPIPE 一起使用,在从多个主题将数据引入到单个表中时您可能会遇到问题。如果您的配置没有 snowflake.topic2table.map 条目,或者主题与表之间存在 1:1 的映射,则此问题不适用。

Kafka Connector 正在生成包含上载到暂存区的记录的文件。这些文件按照以下模式格式化:snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz。问题位于 <partition-id> 中:如果多个主题将数据加载到单个表中,则 partition-id 值很可能出现重复。在正常连接器操作中,这不是问题。然而,如果连接器重新启动或重新平衡,清理进程可能会误地将已加载到暂存区(但尚未引入)的文件与错误的分区关联起来,并决定删除它们,这可能会导致数据丢失事件。

2.5.0 版本的连接器通过在 partition-id 中包含源主题的哈希码来解决此问题,确保文件名唯一且精确匹配单个主题的分区。此修复程序默认启用 – snowflake.snowpipe.stageFileNameExtensionEnabled – 并且仅影响目标表在 snowflake.topic2table.map 中被多次列出的配置。

如果您的配置受到此功能的影响,您可能会将陈旧的文件上传到暂存区。当连接器启动时,将会检查暂存区是否包含此类文件。您需要查找以 NOTE: For table 开头的日志条目,然后是检测到的文件列表。

您也可以手动检查暂存区中是否有受影响的文件:

  1. 查找受影响的暂存区:

    show stages like 'snowflake_kafka_connector%<your table name>';
    
    Copy
  2. 列出暂存区文件:

    list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
    
    Copy

上面的命令列出了所有与表的暂存区匹配并且分区 IDs 处于 0-9999 范围内的文件。这些文件将不再被引入,因此您可以下载或将其删除。

报告问题

联系 Snowflake 支持部门 寻求帮助时,请提供以下文件:

  • 您的 Kafka Connector 的配置文件。

    重要

    在向 Snowflake 提供文件之前,请删除私钥。

  • Kafka Connector 日志的副本。确保文件 含机密或敏感信息。

  • JDBC 日志文件。

    要生成日志文件,请在运行 Kafka Connector 之前在 Kafka Connect 集群上设置 JDBC_TRACE = true 环境变量。

    有关 JDBC 日志文件的更多信息,请参阅 Snowflake 社区中的 这篇文章 (https://community.snowflake.com/s/article/How-to-generate-log-file-on-Snowflake-connectors)。

  • 连接日志文件。

    要生成日志文件,请编辑该 etc/kafka/connect-log4j.properties 文件。按如下方式设置该 log4j.appender.stdout.layout.ConversionPattern 属性:

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    连接器上下文在 Kafka 2.3 版本及更高版本中可用。

    有关更多信息,请参阅 Confluent 网站上的 日志记录改进 (https://www.confluent.io/blog/kafka-connect-improvements-in-apache-kafka-2-3/) 信息。

语言: 中文