带有 DLQ 和元数据的 Apache Kafka¶
备注
使用该连接器需遵守 连接器条款。
本主题介绍了带有 DLQ 和元数据连接器的 Apache Kafka。这是一款功能齐全的连接器,其功能与传统的 Snowflake Connector for Kafka 相同,还包括适用于生产用例的高级功能。
主要功能¶
带有 DLQ 和元数据连接器的 Apache Kafka 提供了全面的功能:
**死信队列 (DLQ) ** 支持失败的消息处理
包含 Kafka 消息元数据的 RECORD_METADATA 列
可配置架构化 – 启用或禁用架构检测
带有架构演化的 Iceberg 表支持
多种消息格式 – JSON 和 AVRO 支持
AVRO 消息的 架构注册表集成
采用高级模式的 主题到表的映射
SASL 身份验证 支持
特定参数¶
除了 设置 Openflow Connector for Kafka 中描述的常用参数外,此连接器还包括高级功能的其他参数上下文。
消息格式和架构参数¶
参数 |
描述 |
必填 |
---|---|---|
消息格式 |
Kafka 中消息的格式。其中之一:JSON/AVRO。默认值:JSON |
是 |
AVRO 架构 |
如果在采用 AVRO 消息格式的 AVRO Schema Access Strategy 中使用了 schema-text-property,则使用 Avro 架构。注意:仅当在配置的 Kafka 主题中使用的所有消息都共享相同的架构时,才应使用此设置。 |
否 |
AVRO 架构访问策略 |
访问消息的 AVRO 架构的方法。AVRO 的必填项。其中之一:embedded-avro-schema/schema-reference-reader/schema-text-property。默认值:embedded-avro-schema |
否 |
架构注册表参数¶
参数 |
描述 |
必填 |
---|---|---|
架构注册表身份验证类型 |
对架构注册表进行身份验证的方法(如果使用)。否则,请使用 NONE。其中之一:NONE/BASIC。默认值:NONE |
是 |
架构注册表 URL |
架构注册表的 URL*AVRO* 消息格式的必填项。 |
否 |
架构注册表用户名 |
架构注册表的用户名。AVRO 消息格式的必填项。 |
否 |
架构注册表密码 |
架构注册表的密码。AVRO 消息格式的必填项。 |
否 |
DLQ 和高级功能参数¶
参数 |
描述 |
必填 |
---|---|---|
Kafka DLQ 主题 |
将存在解析错误的消息发送到的 DLQ 主题 |
是 |
已启用架构化 |
确定数据是插入到单个列还是单个 RECORD_CONTENT 字段中。其中之一:true/false。默认值:true |
是 |
已启用 Iceberg |
指定处理器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则处理器将失败。默认值:false |
是 |
架构化行为¶
连接器的行为根据 Schematization Enabled 参数而变化:
已启用架构化¶
启用架构化后,连接器将执行以下操作:
为消息中的每个字段创建单独的列
添加包含 Kafka 元数据的 RECORD_METADATA 列
检测到新字段时自动演变表架构
将嵌套 JSON/AVRO 结构展平为单独的列
示例表结构:
行 |
RECORD_METADATA |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|---|
1 |
{"timestamp":1669074170090, "headers": {"current.iter... |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
{"timestamp":1669074170400, "headers": {"current.iter... |
XYZ789 |
ZABX |
SELL |
3024 |
已禁用架构化¶
禁用架构化后,连接器将执行以下操作:
仅创建两列:RECORD_CONTENT 和 RECORD_METADATA
将整个消息内容作为 OBJECT 存储在 RECORD_CONTENT 中
不执行自动架构演化
为下游处理提供最大灵活性
示例表结构:
行 |
RECORD_METADATA |
RECORD_CONTENT |
---|---|---|
1 |
{"timestamp":1669074170090, "headers": {"current.iter... |
{"account": "ABC123", "symbol": "ZTEST", "side":... |
2 |
{"timestamp":1669074170400, "headers": {"current.iter... |
{"account": "XYZ789", "symbol": "ZABX", "side":... |
使用连接器配置属性中的 Schematization Enabled
属性来启用或禁用架构检测。
架构检测和演化¶
连接器支持架构检测和演化。Snowflake 中表的结构可以自动定义和演化,以支持连接器加载的新数据的结构。
如果没有架构检测和演化,连接器加载的 Snowflake 表仅包含两个 OBJECT
列:RECORD_CONTENT
和 RECORD_METADATA
。
启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL
约束。
无论是否提供架构注册表,都支持使用连接器进行架构检测。如果使用架构注册表 (Avro),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。
JSON ARRAY 不支持进一步自动格式演化。
启用架构演化¶
如果连接器创建了目标表,在默认情况下启用架构演化。
如果要在现有表上启用或禁用架构演化,请使用 ALTER TABLE 命令来设置 ENABLE_SCHEMA_EVOLUTION
参数。您还必须使用对该表具有 OWNERSHIP
权限的角色。有关更多信息,请参阅 表模式演化。
但是,如果对现有表禁用了架构演化,则连接器将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。
RECORD_METADATA 结构¶
RECORD_METADATA 列中包含重要的 Kafka 消息元数据:
字段 |
描述 |
---|---|
偏移 |
Kafka 分区内的消息偏移量 |
主题 |
Kafka 主题名称 |
分区 |
Kafka 分区号 |
键 |
消息键(如果有) |
timestamp |
消息时间戳 |
SnowflakeConnectorPushTime |
连接器从 Kafka 提取消息的时间戳 |
标头 |
消息标头的映射(如果有) |
死信队列 (DLQ)¶
DLQ 功能处理的是无法成功处理的消息:
DLQ 行为¶
解析失败 – 将采用无效 JSON/AVRO 格式的消息发送到 DLQ
架构不匹配 – 禁用架构演化后,消息与预期架构不匹配
处理错误 – 引入过程中出现其他处理故障
Iceberg 表支持¶
当 Iceberg Enabled 设置为 true 时,Openflow Connector for Kafka 可以将数据引入到 Snowflake 管理的 Apache Iceberg™ 表 中。
要求和限制¶
在为 Iceberg 表引入配置 Openflow Kafka 连接器之前,请注意以下要求和限制:
在运行连接器之前,必须创建 Iceberg 表。
确保用户有权将数据插入到创建的表中。
配置和设置¶
要为 Iceberg 表引入配置 Openflow Connector for Kafka,请按照 设置 Openflow Connector for Kafka 中的步骤进行操作,但请注意以下各节中存在一些差异。
启用向 Iceberg 表中引入数据¶
要启用向 Iceberg 表中引入功能,必须将 Iceberg Enabled
参数设置为 true
。
创建用于引入的 Iceberg 表¶
在运行连接器之前,必须创建 Iceberg 表。初始表架构取决于连接器的 Schematization Enabled
属性设置。
如果启用架构化,则必须创建包含名为 record_metadata
的列的表:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
连接器自动为消息字段创建列并更改 record_metadata
列架构。
如果不启用架构化,则必须创建包含名为 record_content
的列(其类型与实际 Kafka 消息内容匹配)的表。连接器会自动创建 record_metadata
列。
创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。请改用 结构化的 OBJECT 或 MAP。
例如,请考虑以下消息:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Iceberg 表创建示例¶
启用了架构化:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
禁用了架构化:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
RECORD_CONTENT OBJECT(
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
备注
必须始终创建 RECORD_METADATA。嵌套结构(如 dogs
或 cats
)内的字段名称区分大小写。
用例¶
该连接器非常适合:
需要 DLQ 的 生产环境
数据沿袭和审计,其中 Kafka 元数据很重要
具有架构演化要求的 复杂消息处理
Iceberg 表集成
如果您需要在不使用元数据或 DLQ 功能的情况下进行更简单的引入,请改用 适用于 JSON/AVRO 数据格式的 Apache Kafka 连接器。