Kafka Connector 概述¶
本主题概述了 Apache Kafka 和 Snowflake Connector for Kafka。
备注
Kafka Connector 受 Connector 条款 的约束。
本主题内容:
Apache Kafka 简介¶
Apache Kafka 软件使用发布和订阅模型来写入和读取记录流,类似于消息队列或企业消息传递系统。Kafka 允许进程异步读取和写入消息。订阅者不需要直接连接到发布者;发布者可以在 Kafka 中对消息进行排队,以便订阅者稍后接收。
应用程序将消息发布到 主题,并且应用程序订阅主题以接收这些消息。Kafka 可以处理和传输消息;然而,这超出了本文档的范围。可将主题划分为 分区 以提高可扩展性。
Kafka Connect 是一个用于连接 Kafka 与外部系统(包括数据库)的框架。Kafka Connect 集群是独立于 Kafka 集群的集群。Kafka Connect 集群支持运行和扩展连接器(该组件支持在外部系统之间进行读取和/或写入)。
Kafka connector 的设计旨在 Kafka Connect 集群中运行,以便从 Kafka 主题读取数据并将数据写入 Snowflake 表。
Snowflake 提供两个版本的连接器:
一个版本为 Kafka 的 Confluent 包版本 (https://www.confluent.io/hub/snowflakeinc/snowflake-kafka-connector/)。
有关 Kafka Connect 的更多信息,请参阅 https://docs.confluent.io/current/connect/ (https://docs.confluent.io/current/connect/)。
备注
Confluent Cloud 可提供 Kafka connector 的托管版本。有关信息,请参阅 https://docs.confluent.io/current/cloud/connectors/cc-snowflake-sink.html (https://docs.confluent.io/current/cloud/connectors/cc-snowflake-sink.html)。
一个版本为 开源软件 (OSS) Apache Kafka 包 (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/)。
有关 Apache Kafka 的更多信息,请参阅 https://kafka.apache.org/ (https://kafka.apache.org/)。
从 Snowflake 的角度来看,Kafka 主题会生成可插入到 Snowflake 表中的行流。一般来说,每条 Kafka 消息都包含一行。
与许多消息发布/订阅平台一样,Kafka 允许发布者和订阅者之间存在多对多关系。单个应用程序可以发布到多个主题,单个应用程序可以订阅多个主题。借助 Snowflake,典型模式是一个主题为一张 Snowflake 表提供消息(行)。
当前版本的 Kafka connector 仅限于将数据加载到 Snowflake。Kafka connector 支持两种数据加载方式:
有关详细信息,请参阅 将数据加载到 Snowflake 以及 结合使用 Snowflake Connector for Kafka 和 Snowpipe Streaming。
Kafka 主题的目标表¶
Kafka 主题可以映射到 Kafka 配置中现有的 Snowflake 表。如果没有映射主题,那么 Kafka connector 会使用主题名为每个主题创建一个新表。
连接器使用以下规则将主题名转换为有效的 Snowflake 表名:
小写主题名将转换为大写表名。
如果主题名中的第一个字符不是字母(
a-z
或A-Z
)或下划线字符 (_
),那么连接器会在表名前加下划线。如果主题名中的字符不是 Snowflake 表名的合法字符,则该字符将替换为下划线字符。有关表名中哪些字符有效的更多信息,请参阅 标识符要求。
请注意,如果 Kafka connector 需要调整为 Kakfa 主题创建的表名,则同一架构中的两个表的名称可能相同。例如,如果从主题 numbers+x
和 numbers-x
读取数据,那么为这些主题创建的表都是 NUMBERS_X
。为了避免表名意外重复,连接器会在表名后追加后缀。后缀是下划线,后跟生成的哈希码。
小技巧
Snowflake 建议尽可能选择遵循 Snowflake 标识符名称规则的主题名称。
Kafka 主题的表架构¶
通过 Snowpipe Streaming,Kafka connector 可选择支持 架构检测和演化。
默认情况下,借助 Snowpipe 或 Snowpipe Streaming,由 Kafka Connector 加载的每个 Snowflake 表都有一个由两个 VARIANT 列组成的架构:
RECORD_CONTENT。其中包含 Kafka 消息。
RECORD_METADATA。其中包含有关消息的元数据,例如,从中读取消息的主题。
如果 Snowflake 创建表,则该表仅包含这两列。如果用户为 Kafka Connector 创建表以添加行,则该表可以包含多于这两列的列(任何附加列必须允许 NULL 值,因为来自连接器的数据不包含这些列的值)。
RECORD_CONTENT 列包含 Kafka 消息。
Kafka 消息的内部结构取决于所发送的信息。例如,来自 IoT(物联网)天气传感器的消息可能包括记录数据的时间戳、传感器的位置、温度、湿度等。来自库存系统的消息可能包含产品 ID 以及已售商品的数量,也许还有表明商品何时售出或发货的时间戳。
通常,特定主题中的每条消息都具有相同的基本结构。不同的主题通常使用不同的结构。
每条 Kafka 消息都会以 JSON 格式或 Avro 格式传递到 Snowflake。Kafka connector 将格式化的信息存储在 VARIANT 类型的单列中。既不解析数据,也不在 Snowflake 表中将数据拆分为多列。
RECORD_METADATA 列默认包含以下信息:
字段 |
Java . 数据类型 |
SQL . 数据类型 |
必填 |
描述 |
---|---|---|---|---|
主题 |
字符串 |
VARCHAR |
是 |
Kafka 主题的名称(记录来源于此主题)。 |
分区 |
字符串 |
VARCHAR |
是 |
主题内的分区编号。(请注意,这是 Kafka 分区,不是 Snowflake 微分区。) |
偏移 |
长整型 |
INTEGER |
是 |
该分区中的偏移。 |
CreateTime / . LogAppendTime |
长整型 |
BIGINT |
否 |
这是与 Kafka 主题中的消息关联的时间戳。该值为自 1970 年 1 月 1 日 (UTC) 以来的毫秒数。有关更多信息,请参阅:https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html (https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html) |
键 |
字符串 |
VARCHAR |
否 |
如果消息是 Kafka KeyedMessage,这是该消息的键。要让连接器将键存储在 RECORD_METADATA,则 Kafka 配置属性 中的 key.converter 参数必须设置为“org.apache.kafka.connect.storage.StringConverter”;否则,连接器会忽略键。 |
schema_id |
整数 |
INTEGER |
否 |
当使用 Avro 与架构注册表来指定架构时,这是该注册表中架构的 ID。 |
标头 |
对象 |
OBJECT |
否 |
标头是与记录关联的用户定义的键值对。 每条记录可以有 0 个、1 个或多个标头。 |
RECORD_METADATA 列中记录的元数据量可使用可选的 Kafka 配置属性进行配置。有关信息,请参阅 安装和配置 Kafka Connector。
字段名称和值区分大小写。
以 JSON 语法表示,示例消息可能类似于以下内容:
{
"meta":
{
"offset": 1,
"topic": "PressureOverloadWarning",
"partition": 12,
"key": "key name",
"schema_id": 123,
"CreateTime": 1234567890,
"headers":
{
"name1": "value1",
"name2": "value2"
}
},
"content":
{
"ID": 62,
"PSI": 451,
"etc": "..."
}
}
可以使用 查询 VARIANT 列的相应语法 直接查询 Snowflake 表。
下面是根据 RECORD_METADATA 中的主题提取数据的简单示例:
select
record_metadata:CreateTime,
record_content:ID
from table1
where record_metadata:topic = 'PressureOverloadWarning';
输出类似以下内容:
+------------+-----+
| CREATETIME | ID |
+------------+-----+
| 1234567890 | 62 |
+------------+-----+
可以从这些表中提取数据,将数据平铺到各个列中,将数据存储在其他表中,这些表通常更易于查询。
Kafka Connector 工作流程¶
Kafka connector 通过完成以下流程来订阅 Kafka 主题并创建 Snowflake 对象:
通过 Kafka 配置文件或命令行(或 Confluent Control Center;仅限 Confluent)提供的配置信息,在此基础上 Kafka connector 订阅一个或多个 Kafka 主题。
连接器为每个主题创建以下对象:
一个内部暂存区用于临时存储每个主题的数据文件。
一个管道用于引入每个主题分区的数据文件。
每个主题一张表。如果为每个主题指定的表不存在,则连接器将创建该表;否则,连接器将在现有表中创建 RECORD_CONTENT 和 RECORD_METADATA 列,并验证其他列是否可为空(如果不是,则会产生错误)。
下图显示了使用 Kafka connector 进行 Kafka 的引入流程:
一个或多个应用程序将 JSON 或 Avro 记录发布到 Kafka 集群。这些记录可分成一个或多个主题分区。
Kafka Connector 可缓冲来自 Kafka 主题的消息。当达到阈值(时间、内存或消息数量)时,连接器会将消息写入内部暂存区的临时文件中。连接器触发 Snowpipe 可引入临时文件。Snowpipe 将指向数据文件的指针复制到队列中。
通过为 Kafka 主题分区创建的管道,Snowflake 提供的虚拟仓库可将数据从暂存文件加载到目标表(即主题配置文件中指定的表)中。
(未显示)连接器监控 Snowpipe,并在确认文件数据已加载到表中后,删除内部暂存区中的每个文件。
如果出现故障导致数据无法加载,连接器会将文件移动到表暂存区,并生成错误消息。
连接器重复第 2-4 步。
注意
Snowflake 轮询 insertReport
API 一小时。如果所引入文件的状态在这一小时内未成功,则正在引入的文件将移至表暂存区。
可能需要至少一小时才能在表暂存区上提供这些文件。仅在前一小时内找不到文件的引入状态时,文件才会移至表暂存区。
容错能力¶
Kafka 和 Kafka connector 都是容错的。消息既不会重复,也不会默默丢弃。
在数据加载链中,Snowpipe 工作流程中的重复数据删除逻辑会消除重复数据的重复副本(极少数情况除外)。如果在 Snowpipe 加载记录时检测到错误(例如,该记录不是格式正确的 JSON 或 Avro),则不加载该记录,而是将该记录移到表暂存区。
Kafka Connector with Snowpipe Streaming 支持死信队列 (DLQ) 用于错误处理。有关详细信息,请参阅 Kafka Connector with Snowpipe Streaming 的错误处理以及 DLQ 属性。
连接器的容错限制¶
可以为 Kafka 主题配置存储空间限制或保留时间限制。
默认保留时间为 7 天。如果系统离线时间超过保留时间,则不会加载过期的记录。同样,如果超过 Kafka 的存储空间限制,某些消息将无法传递。
如果已删除或更新 Kafka 主题中的消息,这些更改可能不会反映在 Snowflake 表中。
注意
Kafka connector 的实例之间不进行通信。如果在相同主题或分区上启动连接器的多个实例,则同一行的多个副本可能会插入到表中。不建议这样做;每个主题只能由连接器的一个实例进行处理。
理论上,消息从 Kafka 流出的速度可能比 Snowflake 引入消息的速度更快。但实际上,这种情况不太可能发生。如果确实发生了,需要对 Kafka Connect 集群进行性能调整,才能解决问题。例如:
调整 Connect 集群中的节点数量。
调整分配给连接器的任务数量。
了解连接器和 Snowflake 部署之间网络带宽的影响。
重要
不保证按照最初发布的顺序插入行。
支持的平台¶
Kafka connector 可在 Kafka Connect 集群中运行,还可以将数据发送到受支持的 云平台 上的 Snowflake 账户。
Protobuf 数据支持¶
Kafka connector 1.5.0(或更高版本)通过 protobuf 转换器支持协议缓冲区 (protobuf)。有关详细信息,请参阅 使用 Snowflake Connector for Kafka 加载 Protobuf 数据。
账单信息¶
使用 Kafka connector 无需直接付费。然而,也存在间接成本:
Snowpipe 用于加载连接器从 Kafka 读取的数据,Snowpipe 处理时间费用由您的账户支付。
数据存储费用由您的账户支付。
Kafka Connector 限制¶
Single Message Transformations (SMTs) 适用于流经 Kafka Connect 的消息。配置 Kafka 配置属性 时,如果将 :emph:`` key.converter
或 value.converter
配置为以下值之一,则相应的键或值不支持 SMTs:
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
如果未设置 key.converter
或 value.converter
,则大多数 SMTs 受支持, regex.router
除外。
虽然 Snowflake 转换器不支持 SMTs,Kafka connector 版本 1.4.3(或更高版本)支持许多基于社区的转换器,如下所示:
io.confluent.connect.avro.AvroConverter
org.apache.kafka.connect.json.JsonConverter
有关 SMTs 的更多信息,请参阅 https://docs.confluent.io/current/connect/transforms/index.html (https://docs.confluent.io/current/connect/transforms/index.html)。