Kafka Connector 故障排除¶
本节介绍如何解决使用 Kafka Connector 引入数据时遇到的问题。
本主题内容:
错误通知¶
为 Snowpipe 配置错误通知。当 Snowpipe 在加载过程中遇到文件错误时,该功能会将通知推送到配置的云消息传递服务,从而启用数据文件分析。有关更多信息,请参阅 Snowpipe 错误通知。
常规故障排除步骤¶
完成以下步骤,解决使用 Kafka Connector 遇到的加载问题。
第 1 步:查看表的 COPY 历史记录¶
查询目标表的加载活动历史记录。有关信息,请参阅 COPY_HISTORY 视图。如果 COPY_HISTORY 输出不包含一组预期文件,请查询更早的时间段。如果文件是早期文件的副本,则在尝试加载原始文件时,加载历史记录可能已记录该活动。STATUS
列指明一组特定文件是已加载、部分加载还是加载失败。FIRST_ERROR_MESSAGE
列提供尝试部分加载或加载失败的原因。
Kafka Connector 将其无法加载的文件移动到与目标表关联的暂存区。引用表暂存区的语法为 @[namespace.]%table_name
。
使用 LIST 列出位于表暂存区的所有文件。
例如:
LIST @mydb.public.%mytable;
文件名采用以下格式之一。表中描述了生成每种格式的条件:
文件类型 |
描述 |
---|---|
原始字节 |
这些文件符合以下模式:
对于这些文件,无法将 Kafka 记录从原始字节转换为源文件格式(Avro、JSON 或 Protobuf)。 此问题的常见原因是网络故障导致字符从记录中删除。Kafka Connector 无法再解析原始字节,从而导致记录损坏。 |
源文件格式(Avro、JSON 或 Protobuf) |
这些文件符合以下模式:
对于这些文件,在 Kafka Connector 将原始字节转换回源文件格式后,Snowpipe 遇到了错误,无法加载文件。 |
以下各节提供了解决每种文件类型问题的说明:
原始字节¶
文件名 <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz
包含未从原始字节转换为源文件格式的记录的确切偏移。要解决问题,请将该记录作为新记录重新发送到 Kafka Connector。
源文件格式(Avro、JSON 或 Protobuf)¶
如果 Snowpipe 无法从为 Kafka 主题创建的内部暂存区中的文件加载数据,Kafka Connector 会将文件以源文件格式移动到目标表的暂存区。
如果一组文件存在多个问题,则 COPY_HISTORY 输出中的 FIRST_ERROR_MESSAGE
列仅指明遇到的第一个错误。要查看文件中的所有错误,必须从表暂存区检索文件,将其上传到已命名暂存区,然后执行将 VALIDATION_MODE 复制选项设置为 RETURN_ALL_ERRORS
的 COPY INTO <table> 语句。VALIDATION_MODE 复制选项指示 COPY 语句验证要加载的数据并根据指定的验证选项返回结果。指定此复制选项时,不会加载任何数据。在语句中,引用您尝试使用 Kafka Connector 加载的一组文件。
当数据文件的任何问题得到解决后,您可以使用一个或多个 COPY 语句手动加载数据。
以下示例引用了位于 mydb.public
数据库和架构中 mytable
表的表暂存区的数据文件。
要在表暂存区验证数据文件并解决错误,请执行以下操作:
使用 LIST 列出位于表暂存区的所有文件。
例如:
LIST @mydb.public.%mytable;
本节中的示例假定 JSON 是数据文件的源格式。
使用 GET 将 Kafka Connector 创建的文件下载至本地计算机。
例如,将文件下载到本地计算机中名为
data
的目录中:- Linux 或 macOS:
GET @mydb.public.%mytable file:///data/;
- Microsoft Windows:
GET @mydb.public.%mytable file://C:\data\;
使用 CREATE STAGE 创建指定内部暂存区,用于存储与源 Kafka 文件格式相同的数据文件。
例如,创建一个名为
kafka_json
的内部暂存区来存储 JSON 文件:CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
使用 PUT 上传您从表暂存区下载的文件。
例如,将下载的文件上传到本地计算机的
data
目录中:- Linux 或 macOS:
PUT file:///data/ @mydb.public.kafka_json;
- Microsoft Windows:
PUT file://C:\data\ @mydb.public.kafka_json;
创建一个包含两列变体的临时表进行测试。该表仅用于验证暂存数据文件。表中未加载任何数据。当前用户会话结束时,该表将自动删除:
CREATE TEMPORARY TABLE t1 (col1 variant);
执行 COPY INTO *table* ...VALIDATION_MODE = 'RETURN_ALL_ERRORS' 语句检索数据文件中遇到的所有错误。该语句会验证指定暂存区中的文件。不会将任何数据加载到表中:
COPY INTO mydb.public.t1 FROM @mydb.public.kafka_json FILE_FORMAT = (TYPE = JSON) VALIDATION_MODE = 'RETURN_ALL_ERRORS';
修复本地计算机上数据文件中所有报告的错误。
使用 PUT 将已修复文件上传到表暂存区或指定内部暂存区。
以下示例将文件上传到表暂存区,覆盖现有文件:
- Linux 或 macOS:
PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
- Windows:
PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
使用 不带 VALIDATION_MODE 选项的 COPY INTO table 将数据加载到目标表中。
成功加载数据后,您可以选择使用 PURGE = TRUE 复制选项从暂存区中删除数据文件,或者使用 REMOVE 从表暂存区手动删除文件:
COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT) FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable) FILE_FORMAT = (TYPE = 'JSON') PURGE = TRUE;
第 2 步:分析 Kafka Connector 日志文件¶
如果 COPY_HISTORY 视图没有数据加载记录,则分析 Kafka Connector 的日志文件。连接器将事件写入日志文件。请注意,Snowflake Kafka Connector 与所有 Kafka Connector 插件共享相同的日志文件。此日志文件的名称和位置应位于您的 Kafka Connect 配置文件中。有关更多信息,请参阅为您的 Apache Kafka 软件提供的文档。
在 Kafka Connector 日志文件中搜索与 Snowflake 相关的错误消息。大多数消息都将使用字符串 ERROR
并包含文件名 com.snowflake.kafka.connector...
,以便于查找这些消息。
您可能遇到的错误包括:
- 配置错误:
该错误的可能原因:
连接器没有订阅该主题的适当信息。
连接器没有写入 Snowflake 表的适当信息(例如,用于身份验证的密钥对可能出错)。
请注意,Kafka Connector 会验证其参数。对于每个不兼容的配置参数,连接器都会抛出一个错误。错误消息已写入 Kafka Connect 集群的日志文件中。如果您怀疑存在配置问题,请检查该日志文件中的错误。
- 读取错误:
连接器可能因以下原因无法从 Kafka 读取数据:
Kafka 或 Kafka Connect 可能无法运行。
该消息可能尚未发送。
该消息可能已被删除(过期)。
- 写入错误(暂存区):
该错误的可能原因:
暂存区权限不足。
暂存区空间不足。
暂存区已删除。
其他用户或进程向暂存区写入意外文件。
- 写入错误(表):
该错误的可能原因:
表权限不足。
第 3 步:检查 Kafka Connect¶
如果 Kafka 连接日志文件中未报告错误,请检查 Kafka Connect。有关故障排除说明,请参阅 Apache Kafka 软件供应商提供的文档。
解决特定问题¶
主题分区和偏移相同的重复行¶
使用 Kafka Connector 1.4 版本(或更高版本)加载数据时,目标表中主题分区和偏移相同的重复行可能表明加载操作超过 300000 毫秒(300 秒)的默认执行超时值。要验证原因,请检查 Kafka Connect 日志文件中是否存在以下错误:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
要解决该错误,请在 Kafka 配置文件(例如 <kafka_dir>/config/connect-distributed.properties
)中更改以下 任一 属性:
consumer.max.poll.interval.ms
将执行超时值延长至
900000
(900 秒)。consumer.max.poll.records
将每次操作加载的记录数减少至
50
。
Failure in Streaming Channel Offset Migration Response 错误代码:5023¶
连接器升级到 v2.1.0 版本(或更高版本)时,Snowpipe Streaming Channel 名称格式有所更改。因此,检测先前提交的偏移量信息的逻辑将找不到任何先前提交的偏移量信息。这将表现为以下异常:
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023
Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support
Message: Snowflake experienced a transient exception, please retry the migration request.
要解决此错误,请在 Kafka 配置文件(例如 <kafka_dir>/config/connect-distributed.properties
)中添加以下配置属性:
enable.streaming.channel.offset.migration
通过将其设置为
false
禁用自动偏移迁移。
配置连接器以支持多个主题¶
我们在单个 Kafka Cconnector 实例支持大量主题(每个主题都有多个分区)时遇到了问题。连接器的配置虽然看似有效,却导致无休止的重新平衡循环,无法将任何数据引入 Snowflake。这个问题是 Snowpipe Streaming 引入模式(snowflake.ingestion.method=SNOWPIPE_STREAMING
)所特有的,但准则也适用于 Snowpipe 引入模式(snowflake.ingestion.method=SNOWPIPE
)。该问题通过反复记录此日志消息在日志文件中表现出来:
[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed
当您配置连接器以通过 regex 引入主题时通常会发生这种情况。我们建议对 Kafka 配置文件应用以下选项(例如 :file : <kafka_dir>/ config/connect-distributed.properties
):
consumer.override.partition.assignment.strategy
将任务的分区分配策略配置为
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
– 这可以确保引入通道均匀分配到可用任务,从而降低重新平衡的风险。tasks.max
每个连接器的实例化任务数量不应超过可用的 CPU 数量 – 底层驱动程序基于可用 CPU 实现节流机制。并发请求的增加会增加系统的内存压力,但也会延长插入处理时间,直接导致连接器新号丢失。
就连接器的超时值而言,有一组配置属性对其有直接影响:
consumer.override.heartbeat.interval.ms
定义监控线程(每个任务都有一个关联线程)向 Kafka 发送信号的频率。默认值为
3000
ms,但如果系统负载较高 – 您可以尝试将其增加到5000
ms。consumer.override.session.timeout.ms
定义代理在假定使用者处于无效状态并尝试重新平衡之前将等待多长时间。此设置通常应是信号间隔时间的 3 倍,因此,如果您将信号间隔时间配置为
5000
ms,请将此设置为15000
ms。consumer.override.max.poll.interval.ms
定义从底层 Kafka 调用
poll()
的最大时间间隔。轮询间隔的时间基本上映射到处理批量数据的连接器(包括上传到 Snowflake 并提交)。在有多个任务处理数据的场景中,底层 Snowflake Connection 可能会开始限制请求,从而导致处理时间更长。根据您的场景,可以将此值提高至长达 20 分钟 (1200000
ms) – 特别是当您通过用于引入的大量初始记录计数启动连接器时。consumer.override.rebalance.timeout.ms
当发生重新平衡时,如果每个任务包含大量通道,每个通道都有很多底层逻辑需要确定从哪里恢复处理。此代码是顺序执行的,因此每个任务的通道越多,初始设置所需的时间就越长。将此属性配置为足够大的值,以便每个通道都能完成其初始化。3 分钟(
180000
ms)是一个不错的起点。
还需要注意连接器的可用堆内存这在多个连接器同时运行或单个连接器从多个主题引入数据的场景中尤为重要。每个主题的分区映射到单个通道,因此需要内存。
请确保通过 Xmx 设置调整 Kafka Connect 进程内存设置。其中一种方法是定义 KAFKA_OPTS
环境变量并对其进行相应设置(即 KAFKA_OPTS=-Xmx4G
)。
文件清理器意外清除文件¶
如果将 Kafka Connector 与 SNOWPIPE 一起使用,在从多个主题将数据引入到单个表中时您可能会遇到问题。如果您的配置没有 snowflake.topic2table.map
条目,或者主题与表之间存在 1:1 的映射,则此问题不适用。
Kafka Connector 正在生成包含上载到暂存区的记录的文件。这些文件按照以下模式格式化:snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz
。问题位于 <partition-id>
中:如果多个主题将数据加载到单个表中,则 partition-id
值很可能出现重复。在正常连接器操作中,这不是问题。然而,如果连接器重新启动或重新平衡,清理进程可能会误地将已加载到暂存区(但尚未引入)的文件与错误的分区关联起来,并决定删除它们,这可能会导致数据丢失事件。
2.5.0 版本的连接器通过在 partition-id
中包含源主题的哈希码来解决此问题,确保文件名唯一且精确匹配单个主题的分区。此修复程序默认启用 – snowflake.snowpipe.stageFileNameExtensionEnabled
– 并且仅影响目标表在 snowflake.topic2table.map
中被多次列出的配置。
如果您的配置受到此功能的影响,您可能会将陈旧的文件上传到暂存区。当连接器启动时,将会检查暂存区是否包含此类文件。您需要查找以 NOTE: For table
开头的日志条目,然后是检测到的文件列表。
您也可以手动检查暂存区中是否有受影响的文件:
查找受影响的暂存区:
show stages like 'snowflake_kafka_connector%<your table name>';
列出暂存区文件:
list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
上面的命令列出了所有与表的暂存区匹配并且分区 IDs 处于 0-9999 范围内的文件。这些文件将不再被引入,因此您可以下载或将其删除。
报告问题¶
联系 Snowflake 支持部门 寻求帮助时,请提供以下文件:
您的 Kafka Connector 的配置文件。
重要
在向 Snowflake 提供文件之前,请删除私钥。
Kafka Connector 日志的副本。确保文件 不 含机密或敏感信息。
JDBC 日志文件。
要生成日志文件,请在运行 Kafka Connector 之前在 Kafka Connect 集群上设置
JDBC_TRACE = true
环境变量。有关 JDBC 日志文件的更多信息,请参阅 Snowflake 社区中的 这篇文章 (https://community.snowflake.com/s/article/How-to-generate-log-file-on-Snowflake-connectors)。
连接日志文件。
要生成日志文件,请编辑该
etc/kafka/connect-log4j.properties
文件。按如下方式设置该log4j.appender.stdout.layout.ConversionPattern
属性:log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
连接器上下文在 Kafka 2.3 版本及更高版本中可用。
有关更多信息,请参阅 Confluent 网站上的 日志记录改进 (https://www.confluent.io/blog/kafka-connect-improvements-in-apache-kafka-2-3/) 信息。