带有 DLQ 和元数据的 Apache Kafka

备注

This connector is subject to the Snowflake Connector Terms.

本主题介绍了带有 DLQ 和元数据连接器的 Apache Kafka。这是一款功能齐全的连接器,其功能与传统的 Snowflake Connector for Kafka 相同,还包括适用于生产用例的高级功能。

主要功能

带有 DLQ 和元数据连接器的 Apache Kafka 提供了全面的功能:

  • **死信队列 (DLQ) ** 支持失败的消息处理

  • 包含 Kafka 消息元数据的 RECORD_METADATA

  • 可配置架构化 – 启用或禁用架构检测

  • 带有架构演化的 Iceberg 表支持

  • 多种消息格式 – JSON 和 AVRO 支持

  • AVRO 消息的 架构注册表集成

  • 采用高级模式的 主题到表的映射

  • SASL 身份验证 支持

特定参数

除了 设置 Openflow Connector for Kafka 中描述的常用参数外,此连接器还包括高级功能的其他参数上下文。

消息格式和架构参数

参数

描述

必填

消息格式

Kafka 中消息的格式。其中之一:JSON/AVRO。默认值:JSON

AVRO 架构

如果在采用 AVRO 消息格式的 AVRO Schema Access Strategy 中使用了 schema-text-property,则使用 Avro 架构。注意:仅当在配置的 Kafka 主题中使用的所有消息都共享相同的架构时,才应使用此设置。

AVRO 架构访问策略

访问消息的 AVRO 架构的方法。AVRO 的必填项。其中之一:embedded-avro-schema/schema-reference-reader/schema-text-property。默认值:embedded-avro-schema

架构注册表参数

参数

描述

必填

架构注册表身份验证类型

对架构注册表进行身份验证的方法(如果使用)。否则,请使用 NONE。其中之一:NONE/BASIC。默认值:NONE

架构注册表 URL

架构注册表的 URL*AVRO* 消息格式的必填项。

架构注册表用户名

架构注册表的用户名。AVRO 消息格式的必填项。

架构注册表密码

架构注册表的密码。AVRO 消息格式的必填项。

DLQ 和高级功能参数

参数

描述

必填

Kafka DLQ 主题

将存在解析错误的消息发送到的 DLQ 主题

已启用架构化

确定数据是插入到单个列还是单个 RECORD_CONTENT 字段中。其中之一:true/false。默认值:true

已启用 Iceberg

指定处理器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则处理器将失败。默认值:false

架构化行为

连接器的行为根据 Schematization Enabled 参数而变化:

已启用架构化

启用架构化后,连接器将执行以下操作:

  • 为消息中的每个字段创建单独的列

  • 添加包含 Kafka 元数据的 RECORD_METADATA

  • 检测到新字段时自动演变表架构

  • 将嵌套 JSON/AVRO 结构展平为单独的列

示例表结构:

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{"timestamp":1669074170090, "headers": {"current.iter...

ABC123

ZTEST

BUY

3572

2

{"timestamp":1669074170400, "headers": {"current.iter...

XYZ789

ZABX

SELL

3024

已禁用架构化

禁用架构化后,连接器将执行以下操作:

  • 仅创建两列:RECORD_CONTENTRECORD_METADATA

  • 将整个消息内容作为 OBJECT 存储在 RECORD_CONTENT

  • 不执行自动架构演化

  • 为下游处理提供最大灵活性

示例表结构:

RECORD_METADATA

RECORD_CONTENT

1

{"timestamp":1669074170090, "headers": {"current.iter...

{"account": "ABC123", "symbol": "ZTEST", "side":...

2

{"timestamp":1669074170400, "headers": {"current.iter...

{"account": "XYZ789", "symbol": "ZABX", "side":...

使用连接器配置属性中的 Schematization Enabled 属性来启用或禁用架构检测。

架构检测和演化

连接器支持架构检测和演化。Snowflake 中表的结构可以自动定义和演化,以支持连接器加载的新数据的结构。

如果没有架构检测和演化,连接器加载的 Snowflake 表仅包含两个 OBJECT 列:RECORD_CONTENTRECORD_METADATA

启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL 约束。

无论是否提供架构注册表,都支持使用连接器进行架构检测。如果使用架构注册表 (Avro),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。

JSON ARRAY 不支持进一步自动格式演化。

启用架构演化

如果连接器创建了目标表,在默认情况下启用架构演化。

如果要在现有表上启用或禁用架构演化,请使用 ALTER TABLE 命令来设置 ENABLE_SCHEMA_EVOLUTION 参数。您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化

但是,如果对现有表禁用了架构演化,则连接器将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。

RECORD_METADATA 结构

RECORD_METADATA 列中包含重要的 Kafka 消息元数据:

字段

描述

偏移

Kafka 分区内的消息偏移量

主题

Kafka 主题名称

分区

Kafka 分区号

消息键(如果有)

timestamp

消息时间戳

SnowflakeConnectorPushTime

连接器从 Kafka 提取消息的时间戳

标头

消息标头的映射(如果有)

死信队列 (DLQ)

DLQ 功能处理的是无法成功处理的消息:

DLQ 行为

  • 解析失败 – 将采用无效 JSON/AVRO 格式的消息发送到 DLQ

  • 架构不匹配 – 禁用架构演化后,消息与预期架构不匹配

  • 处理错误 – 引入过程中出现其他处理故障

Iceberg 表支持

Iceberg Enabled 设置为 true 时,Openflow Connector for Kafka 可以将数据引入到 Snowflake 管理的 Apache Iceberg™ 表 中。

要求和限制

在为 Iceberg 表引入配置 Openflow Kafka 连接器之前,请注意以下要求和限制:

  • 在运行连接器之前,必须创建 Iceberg 表。

  • 确保用户有权将数据插入到创建的表中。

配置和设置

要为 Iceberg 表引入配置 Openflow Connector for Kafka,请按照 设置 Openflow Connector for Kafka 中的步骤进行操作,但请注意以下各节中存在一些差异。

启用向 Iceberg 表中引入数据

要启用向 Iceberg 表中引入功能,必须将 Iceberg Enabled 参数设置为 true

创建用于引入的 Iceberg 表

在运行连接器之前,必须创建 Iceberg 表。初始表架构取决于连接器的 Schematization Enabled 属性设置。

如果启用架构化,则必须创建包含名为 record_metadata 的列的表:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

连接器自动为消息字段创建列并更改 record_metadata 列架构。

如果不启用架构化,则必须创建包含名为 record_content 的列(其类型与实际 Kafka 消息内容匹配)的表。连接器会自动创建 record_metadata 列。

创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。请改用 结构化的 OBJECT 或 MAP

例如,请考虑以下消息:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Iceberg 表创建示例

启用了架构化:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

禁用了架构化:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

备注

必须始终创建 RECORD_METADATA。嵌套结构(如 dogscats)内的字段名称区分大小写。

用例

该连接器非常适合:

  • 需要 DLQ 的 生产环境

  • 数据沿袭和审计,其中 Kafka 元数据很重要

  • 具有架构演化要求的 复杂消息处理

  • Iceberg 表集成

如果您需要在不使用元数据或 DLQ 功能的情况下进行更简单的引入,请改用 适用于 JSON/AVRO 数据格式的 Apache Kafka 连接器。

语言: 中文