带有 DLQ 和元数据的 Apache Kafka¶
备注
This connector is subject to the Snowflake Connector Terms.
本主题介绍了带有 DLQ 和元数据连接器的 Apache Kafka。这是一款功能齐全的连接器,其功能与传统的 Snowflake Connector for Kafka 相同,还包括适用于生产用例的高级功能。
主要功能¶
带有 DLQ 和元数据连接器的 Apache Kafka 提供了全面的功能:
**死信队列 (DLQ) ** 支持失败的消息处理
包含 Kafka 消息元数据的 RECORD_METADATA 列
可配置架构化 – 启用或禁用架构检测
带有架构演化的 Iceberg 表支持
多种消息格式 – JSON 和 AVRO 支持
AVRO 消息的 架构注册表集成
采用高级模式的 主题到表的映射
SASL 身份验证 支持
特定参数¶
除了 设置 Openflow Connector for Kafka 中描述的常用参数外,此连接器还包括高级功能的其他参数上下文。
消息格式和架构参数¶
参数 |
描述 |
必填 |
|---|---|---|
消息格式 |
Kafka 中消息的格式。其中之一:JSON/AVRO。默认值:JSON |
是 |
AVRO 架构 |
如果在采用 AVRO 消息格式的 AVRO Schema Access Strategy 中使用了 schema-text-property,则使用 Avro 架构。注意:仅当在配置的 Kafka 主题中使用的所有消息都共享相同的架构时,才应使用此设置。 |
否 |
AVRO 架构访问策略 |
访问消息的 AVRO 架构的方法。AVRO 的必填项。其中之一:embedded-avro-schema/schema-reference-reader/schema-text-property。默认值:embedded-avro-schema |
否 |
架构注册表参数¶
参数 |
描述 |
必填 |
|---|---|---|
架构注册表身份验证类型 |
对架构注册表进行身份验证的方法(如果使用)。否则,请使用 NONE。其中之一:NONE/BASIC。默认值:NONE |
是 |
架构注册表 URL |
架构注册表的 URL*AVRO* 消息格式的必填项。 |
否 |
架构注册表用户名 |
架构注册表的用户名。AVRO 消息格式的必填项。 |
否 |
架构注册表密码 |
架构注册表的密码。AVRO 消息格式的必填项。 |
否 |
DLQ 和高级功能参数¶
参数 |
描述 |
必填 |
|---|---|---|
Kafka DLQ 主题 |
将存在解析错误的消息发送到的 DLQ 主题 |
是 |
已启用架构化 |
确定数据是插入到单个列还是单个 RECORD_CONTENT 字段中。其中之一:true/false。默认值:true |
是 |
已启用 Iceberg |
指定处理器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则处理器将失败。默认值:false |
是 |
架构化行为¶
连接器的行为根据 Schematization Enabled 参数而变化:
已启用架构化¶
启用架构化后,连接器将执行以下操作:
为消息中的每个字段创建单独的列
添加包含 Kafka 元数据的 RECORD_METADATA 列
检测到新字段时自动演变表架构
将嵌套 JSON/AVRO 结构展平为单独的列
示例表结构:
行 |
RECORD_METADATA |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
|---|---|---|---|---|---|
1 |
{"timestamp":1669074170090, "headers": {"current.iter... |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
{"timestamp":1669074170400, "headers": {"current.iter... |
XYZ789 |
ZABX |
SELL |
3024 |
已禁用架构化¶
禁用架构化后,连接器将执行以下操作:
仅创建两列:RECORD_CONTENT 和 RECORD_METADATA
将整个消息内容作为 OBJECT 存储在 RECORD_CONTENT 中
不执行自动架构演化
为下游处理提供最大灵活性
示例表结构:
行 |
RECORD_METADATA |
RECORD_CONTENT |
|---|---|---|
1 |
{"timestamp":1669074170090, "headers": {"current.iter... |
{"account": "ABC123", "symbol": "ZTEST", "side":... |
2 |
{"timestamp":1669074170400, "headers": {"current.iter... |
{"account": "XYZ789", "symbol": "ZABX", "side":... |
使用连接器配置属性中的 Schematization Enabled 属性来启用或禁用架构检测。
架构检测和演化¶
连接器支持架构检测和演化。Snowflake 中表的结构可以自动定义和演化,以支持连接器加载的新数据的结构。
如果没有架构检测和演化,连接器加载的 Snowflake 表仅包含两个 OBJECT 列:RECORD_CONTENT 和 RECORD_METADATA。
启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL 约束。
无论是否提供架构注册表,都支持使用连接器进行架构检测。如果使用架构注册表 (Avro),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。
JSON ARRAY 不支持进一步自动格式演化。
启用架构演化¶
如果连接器创建了目标表,在默认情况下启用架构演化。
如果要在现有表上启用或禁用架构演化,请使用 ALTER TABLE 命令来设置 ENABLE_SCHEMA_EVOLUTION 参数。您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化。
但是,如果对现有表禁用了架构演化,则连接器将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。
RECORD_METADATA 结构¶
RECORD_METADATA 列中包含重要的 Kafka 消息元数据:
字段 |
描述 |
|---|---|
偏移 |
Kafka 分区内的消息偏移量 |
主题 |
Kafka 主题名称 |
分区 |
Kafka 分区号 |
键 |
消息键(如果有) |
timestamp |
消息时间戳 |
SnowflakeConnectorPushTime |
连接器从 Kafka 提取消息的时间戳 |
标头 |
消息标头的映射(如果有) |
死信队列 (DLQ)¶
DLQ 功能处理的是无法成功处理的消息:
DLQ 行为¶
解析失败 – 将采用无效 JSON/AVRO 格式的消息发送到 DLQ
架构不匹配 – 禁用架构演化后,消息与预期架构不匹配
处理错误 – 引入过程中出现其他处理故障
Iceberg 表支持¶
当 Iceberg Enabled 设置为 true 时,Openflow Connector for Kafka 可以将数据引入到 Snowflake 管理的 Apache Iceberg™ 表 中。
要求和限制¶
在为 Iceberg 表引入配置 Openflow Kafka 连接器之前,请注意以下要求和限制:
在运行连接器之前,必须创建 Iceberg 表。
确保用户有权将数据插入到创建的表中。
配置和设置¶
要为 Iceberg 表引入配置 Openflow Connector for Kafka,请按照 设置 Openflow Connector for Kafka 中的步骤进行操作,但请注意以下各节中存在一些差异。
启用向 Iceberg 表中引入数据¶
要启用向 Iceberg 表中引入功能,必须将 Iceberg Enabled 参数设置为 true。
创建用于引入的 Iceberg 表¶
在运行连接器之前,必须创建 Iceberg 表。初始表架构取决于连接器的 Schematization Enabled 属性设置。
如果启用架构化,则必须创建包含名为 record_metadata 的列的表:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
连接器自动为消息字段创建列并更改 record_metadata 列架构。
如果不启用架构化,则必须创建包含名为 record_content 的列(其类型与实际 Kafka 消息内容匹配)的表。连接器会自动创建 record_metadata 列。
创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。请改用 结构化的 OBJECT 或 MAP。
例如,请考虑以下消息:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Iceberg 表创建示例¶
启用了架构化:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
禁用了架构化:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
RECORD_CONTENT OBJECT(
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
备注
必须始终创建 RECORD_METADATA。嵌套结构(如 dogs 或 cats)内的字段名称区分大小写。
用例¶
该连接器非常适合:
需要 DLQ 的 生产环境
数据沿袭和审计,其中 Kafka 元数据很重要
具有架构演化要求的 复杂消息处理
Iceberg 表集成
如果您需要在不使用元数据或 DLQ 功能的情况下进行更简单的引入,请改用 适用于 JSON/AVRO 数据格式的 Apache Kafka 连接器。