Kafka Connector 故障排除

本节介绍如何解决使用 Kafka Connector 引入数据时遇到的问题。

本主题内容:

错误通知

为 Snowpipe 配置错误通知。当 Snowpipe 在加载过程中遇到文件错误时,该功能会将通知推送到配置的云消息传递服务,从而启用数据文件分析。有关更多信息,请参阅 Snowpipe 错误通知

常规故障排除步骤

完成以下步骤,解决使用 Kafka Connector 遇到的加载问题。

第 1 步:查看表的 COPY 历史记录

查询目标表的加载活动历史记录。有关信息,请参阅 COPY_HISTORY 视图。如果 COPY_HISTORY 输出不包含一组预期文件,请查询更早的时间段。如果文件是早期文件的副本,则在尝试加载原始文件时,加载历史记录可能已记录该活动。STATUS 列指明一组特定文件是已加载、部分加载还是加载失败。FIRST_ERROR_MESSAGE 列提供尝试部分加载或加载失败的原因。

Kafka Connector 将其无法加载的文件移动到与目标表关联的暂存区。引用表暂存区的语法为 @[namespace.]%table_name

使用 LIST 列出位于表暂存区的所有文件。

例如:

LIST @mydb.public.%mytable;
Copy

文件名采用以下格式之一。表中描述了生成每种格式的条件:

文件类型

描述

原始字节

这些文件符合以下模式:

<connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz

对于这些文件,无法将 Kafka 记录从原始字节转换为源文件格式(Avro、JSON 或 Protobuf)。

此问题的常见原因是网络故障导致字符从记录中删除。Kafka Connector 无法再解析原始字节,从而导致记录损坏。

源文件格式(Avro、JSON 或 Protobuf)

这些文件符合以下模式:

<connector_name>/<table_name>/<partition>/<start_offset>_<end_offset>_<timestamp>.<file_type>.gz

对于这些文件,在 Kafka Connector 将原始字节转换回源文件格式后,Snowpipe 遇到了错误,无法加载文件。

以下各节提供了解决每种文件类型问题的说明:

原始字节

文件名 <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz 包含未从原始字节转换为源文件格式的记录的确切偏移。要解决问题,请将该记录作为新记录重新发送到 Kafka Connector。

源文件格式(Avro、JSON 或 Protobuf)

如果 Snowpipe 无法从为 Kafka 主题创建的内部暂存区中的文件加载数据,Kafka Connector 会将文件以源文件格式移动到目标表的暂存区。

如果一组文件存在多个问题,则 COPY_HISTORY 输出中的 FIRST_ERROR_MESSAGE 列仅指明遇到的第一个错误。要查看文件中的所有错误,必须从表暂存区检索文件,将其上传到已命名暂存区,然后执行将 VALIDATION_MODE 复制选项设置为 RETURN_ALL_ERRORSCOPY INTO <table> 语句。VALIDATION_MODE 复制选项指示 COPY 语句验证要加载的数据并根据指定的验证选项返回结果。指定此复制选项时,不会加载任何数据。在语句中,引用您尝试使用 Kafka Connector 加载的一组文件。

当数据文件的任何问题得到解决后,您可以使用一个或多个 COPY 语句手动加载数据。

以下示例引用了位于 mydb.public 数据库和架构中 mytable 表的表暂存区的数据文件。

要在表暂存区验证数据文件并解决错误,请执行以下操作:

  1. 使用 LIST 列出位于表暂存区的所有文件。

    例如:

    LIST @mydb.public.%mytable;
    
    Copy

    本节中的示例假定 JSON 是数据文件的源格式。

  2. 使用 GET 将 Kafka Connector 创建的文件下载至本地计算机。

    例如,将文件下载到本地计算机中名为 data 的目录中:

    Linux 或 macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. 使用 CREATE STAGE 创建指定内部暂存区,用于存储与源 Kafka 文件格式相同的数据文件。

    例如,创建一个名为 kafka_json 的内部暂存区来存储 JSON 文件:

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. 使用 PUT 上传您从表暂存区下载的文件。

    例如,将下载的文件上传到本地计算机的 data 目录中:

    Linux 或 macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. 创建一个包含两列变体的临时表进行测试。该表仅用于验证暂存数据文件。表中未加载任何数据。当前用户会话结束时,该表将自动删除:

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. 执行 COPY INTO *table* ...VALIDATION_MODE = 'RETURN_ALL_ERRORS' 语句检索数据文件中遇到的所有错误。该语句会验证指定暂存区中的文件。不会将任何数据加载到表中:

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. 修复本地计算机上数据文件中所有报告的错误。

  8. 使用 PUT 将已修复文件上传到表暂存区或指定内部暂存区。

    以下示例将文件上传到表暂存区,覆盖现有文件:

    Linux 或 macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. 使用 不带 VALIDATION_MODE 选项的 COPY INTO table 将数据加载到目标表中。

    成功加载数据后,您可以选择使用 PURGE = TRUE 复制选项从暂存区中删除数据文件,或者使用 REMOVE 从表暂存区手动删除文件:

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

第 2 步:分析 Kafka Connector 日志文件

如果 COPY_HISTORY 视图没有数据加载记录,则分析 Kafka Connector 的日志文件。连接器将事件写入日志文件。请注意,Snowflake Kafka Connector 与所有 Kafka Connector 插件共享相同的日志文件。此日志文件的名称和位置应位于您的 Kafka Connect 配置文件中。有关更多信息,请参阅为您的 Apache Kafka 软件提供的文档。

在 Kafka Connector 日志文件中搜索与 Snowflake 相关的错误消息。大多数消息都将使用字符串 ERROR 并包含文件名 com.snowflake.kafka.connector...,以便于查找这些消息。

您可能遇到的错误包括:

配置错误:

该错误的可能原因:

  • 连接器没有订阅该主题的适当信息。

  • 连接器没有写入 Snowflake 表的适当信息(例如,用于身份验证的密钥对可能出错)。

请注意,Kafka Connector 会验证其参数。对于每个不兼容的配置参数,连接器都会抛出一个错误。错误消息已写入 Kafka Connect 集群的日志文件中。如果您怀疑存在配置问题,请检查该日志文件中的错误。

读取错误:

连接器可能因以下原因无法从 Kafka 读取数据:

  • Kafka 或 Kafka Connect 可能无法运行。

  • 该消息可能尚未发送。

  • 该消息可能已被删除(过期)。

写入错误(暂存区):

该错误的可能原因:

  • 暂存区权限不足。

  • 暂存区空间不足。

  • 暂存区已删除。

  • 其他用户或进程向暂存区写入意外文件。

写入错误(表):

该错误的可能原因:

  • 表权限不足。

第 3 步:检查 Kafka Connect

如果 Kafka 连接日志文件中未报告错误,请检查 Kafka Connect。有关故障排除说明,请参阅 Apache Kafka 软件供应商提供的文档。

解决特定问题

主题分区和偏移相同的重复行

使用 Kafka Connector 1.4 版本(或更高版本)加载数据时,目标表中主题分区和偏移相同的重复行可能表明加载操作超过 300000 毫秒(300 秒)的默认执行超时值。要验证原因,请检查 Kafka Connect 日志文件中是否存在以下错误:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

要解决该错误,请在 Kafka 配置文件(例如 <kafka_dir>/config/connect-distributed.properties)中更改以下 任一 属性:

consumer.max.poll.interval.ms

将执行超时值延长至 900000 (900 秒)。

consumer.max.poll.records

将每次操作加载的记录数减少至 50

报告问题

联系 Snowflake 支持部门 (https://community.snowflake.com/s/article/How-To-Submit-a-Support-Case-in-Snowflake-Lodge) 寻求帮助时,请提供以下文件:

  • 您的 Kafka Connector 的配置文件。

    重要

    在向 Snowflake 提供文件之前,请删除私钥。

  • Kafka Connector 日志的副本。确保文件 含机密或敏感信息。

  • JDBC 日志文件。

    要生成日志文件,请在运行 Kafka Connector 之前在 Kafka Connect 集群上设置 JDBC_TRACE = true 环境变量。

    有关 JDBC 日志文件的更多信息,请参阅 Snowflake 社区中的 这篇文章 (https://community.snowflake.com/s/article/How-to-generate-log-file-on-Snowflake-connectors)。

  • 连接日志文件。

    要生成日志文件,请编辑该 etc/kafka/connect-log4j.properties 文件。按如下方式设置该 log4j.appender.stdout.layout.ConversionPattern 属性:

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    连接器上下文在 Kafka 2.3 版本及更高版本中可用。

    有关更多信息,请参阅 Confluent 网站上的 日志记录改进 (https://www.confluent.io/blog/kafka-connect-improvements-in-apache-kafka-2-3/) 信息。

语言: 中文