带有 DLQ 和元数据的 Apache Kafka

备注

使用该连接器需遵守 连接器条款

本主题介绍了带有 DLQ 和元数据连接器的 Apache Kafka。这是一款功能齐全的连接器,其功能与传统的 Snowflake Connector for Kafka 相同,还包括适用于生产用例的高级功能。

主要功能

带有 DLQ 和元数据连接器的 Apache Kafka 提供了全面的功能:

  • **死信队列 (DLQ) ** 支持失败的消息处理

  • 包含 Kafka 消息元数据的 RECORD_METADATA

  • 可配置架构化 – 启用或禁用架构检测

  • 带有架构演化的 Iceberg 表支持

  • 多种消息格式 – JSON 和 AVRO 支持

  • AVRO 消息的 架构注册表集成

  • 采用高级模式的 主题到表的映射

  • SASL 身份验证 支持

特定参数

除了 设置 Openflow Connector for Kafka 中描述的常用参数外,此连接器还包括高级功能的其他参数上下文。

消息格式和架构参数

参数

描述

必填

消息格式

Kafka 中消息的格式。其中之一:JSON/AVRO。默认值:JSON

AVRO 架构

如果在采用 AVRO 消息格式的 AVRO Schema Access Strategy 中使用了 schema-text-property,则使用 Avro 架构。注意:仅当在配置的 Kafka 主题中使用的所有消息都共享相同的架构时,才应使用此设置。

AVRO 架构访问策略

访问消息的 AVRO 架构的方法。AVRO 的必填项。其中之一:embedded-avro-schema/schema-reference-reader/schema-text-property。默认值:embedded-avro-schema

架构注册表参数

参数

描述

必填

架构注册表身份验证类型

对架构注册表进行身份验证的方法(如果使用)。否则,请使用 NONE。其中之一:NONE/BASIC。默认值:NONE

架构注册表 URL

架构注册表的 URL*AVRO* 消息格式的必填项。

架构注册表用户名

架构注册表的用户名。AVRO 消息格式的必填项。

架构注册表密码

架构注册表的密码。AVRO 消息格式的必填项。

DLQ 和高级功能参数

参数

描述

必填

Kafka DLQ 主题

将存在解析错误的消息发送到的 DLQ 主题

已启用架构化

确定数据是插入到单个列还是单个 RECORD_CONTENT 字段中。其中之一:true/false。默认值:true

已启用 Iceberg

指定处理器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则处理器将失败。默认值:false

架构化行为

连接器的行为根据 Schematization Enabled 参数而变化:

已启用架构化

启用架构化后,连接器将执行以下操作:

  • 为消息中的每个字段创建单独的列

  • 添加包含 Kafka 元数据的 RECORD_METADATA

  • 检测到新字段时自动演变表架构

  • 将嵌套 JSON/AVRO 结构展平为单独的列

示例表结构:

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{"timestamp":1669074170090, "headers": {"current.iter...

ABC123

ZTEST

BUY

3572

2

{"timestamp":1669074170400, "headers": {"current.iter...

XYZ789

ZABX

SELL

3024

已禁用架构化

禁用架构化后,连接器将执行以下操作:

  • 仅创建两列:RECORD_CONTENTRECORD_METADATA

  • 将整个消息内容作为 OBJECT 存储在 RECORD_CONTENT

  • 不执行自动架构演化

  • 为下游处理提供最大灵活性

示例表结构:

RECORD_METADATA

RECORD_CONTENT

1

{"timestamp":1669074170090, "headers": {"current.iter...

{"account": "ABC123", "symbol": "ZTEST", "side":...

2

{"timestamp":1669074170400, "headers": {"current.iter...

{"account": "XYZ789", "symbol": "ZABX", "side":...

使用连接器配置属性中的 Schematization Enabled 属性来启用或禁用架构检测。

架构检测和演化

连接器支持架构检测和演化。Snowflake 中表的结构可以自动定义和演化,以支持连接器加载的新数据的结构。

如果没有架构检测和演化,连接器加载的 Snowflake 表仅包含两个 OBJECT 列:RECORD_CONTENTRECORD_METADATA

启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL 约束。

无论是否提供架构注册表,都支持使用连接器进行架构检测。如果使用架构注册表 (Avro),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。

JSON ARRAY 不支持进一步自动格式演化。

启用架构演化

如果连接器创建了目标表,在默认情况下启用架构演化。

如果要在现有表上启用或禁用架构演化,请使用 ALTER TABLE 命令来设置 ENABLE_SCHEMA_EVOLUTION 参数。您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化

但是,如果对现有表禁用了架构演化,则连接器将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。

RECORD_METADATA 结构

RECORD_METADATA 列中包含重要的 Kafka 消息元数据:

字段

描述

偏移

Kafka 分区内的消息偏移量

主题

Kafka 主题名称

分区

Kafka 分区号

消息键(如果有)

timestamp

消息时间戳

SnowflakeConnectorPushTime

连接器从 Kafka 提取消息的时间戳

标头

消息标头的映射(如果有)

死信队列 (DLQ)

DLQ 功能处理的是无法成功处理的消息:

DLQ 行为

  • 解析失败 – 将采用无效 JSON/AVRO 格式的消息发送到 DLQ

  • 架构不匹配 – 禁用架构演化后,消息与预期架构不匹配

  • 处理错误 – 引入过程中出现其他处理故障

Iceberg 表支持

Iceberg Enabled 设置为 true 时,Openflow Connector for Kafka 可以将数据引入到 Snowflake 管理的 Apache Iceberg™ 表 中。

要求和限制

在为 Iceberg 表引入配置 Openflow Kafka 连接器之前,请注意以下要求和限制:

  • 在运行连接器之前,必须创建 Iceberg 表。

  • 确保用户有权将数据插入到创建的表中。

配置和设置

要为 Iceberg 表引入配置 Openflow Connector for Kafka,请按照 设置 Openflow Connector for Kafka 中的步骤进行操作,但请注意以下各节中存在一些差异。

启用向 Iceberg 表中引入数据

要启用向 Iceberg 表中引入功能,必须将 Iceberg Enabled 参数设置为 true

创建用于引入的 Iceberg 表

在运行连接器之前,必须创建 Iceberg 表。初始表架构取决于连接器的 Schematization Enabled 属性设置。

如果启用架构化,则必须创建包含名为 record_metadata 的列的表:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

连接器自动为消息字段创建列并更改 record_metadata 列架构。

如果不启用架构化,则必须创建包含名为 record_content 的列(其类型与实际 Kafka 消息内容匹配)的表。连接器会自动创建 record_metadata 列。

创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。请改用 结构化的 OBJECT 或 MAP

例如,请考虑以下消息:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Iceberg 表创建示例

启用了架构化:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

禁用了架构化:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

备注

必须始终创建 RECORD_METADATA。嵌套结构(如 dogscats)内的字段名称区分大小写。

用例

该连接器非常适合:

  • 需要 DLQ 的 生产环境

  • 数据沿袭和审计,其中 Kafka 元数据很重要

  • 具有架构演化要求的 复杂消息处理

  • Iceberg 表集成

如果您需要在不使用元数据或 DLQ 功能的情况下进行更简单的引入,请改用 适用于 JSON/AVRO 数据格式的 Apache Kafka 连接器。

语言: 中文