将 Snowflake Connector for Kafka 与 Apache Iceberg™ 表结合使用

从版本 3.0.0 开始,Snowflake Connector for Kafka 可以将数据引入 Snowflake 管理的 Apache Iceberg™ 表 中。

要求和限制

在为 Iceberg 表引入配置 Kafka Connector 之前,请注意以下要求和限制:

  • Iceberg 表引入需要版本 3.0.0 或更高版本的 Kafka Connector。

  • Iceberg 表引入受与 Snowpipe Streaming 结合使用的 Kafka Connector 支持。不受与 Snowpipe 结合使用的 Kafka Connector 支持。

  • snowflake.streaming.enable.single.buffer 设置为 false 时,Iceberg 表引入不受支持。

  • 在运行连接器之前,必须创建 Iceberg 表。有关更多信息,请参阅本主题内容中的 配置和设置

架构演化限制

AVRO 或 Protobuf 等架构化数据格式完全支持 Iceberg 的架构演化。

对于没有架构的普通 JSON,连接器认为以下消息类型无效,并将其发送到死信队列 (DLQ):

  • 包含新列的消息(如果对应值为 null[]

  • 结构化对象中具有新字段的消息(如果对应值为 null[]

要手动更改表架构,以便连接器能够引入这些消息类型,请使用 ALTER TABLE 语句。

配置和设置

要为 Iceberg 表引入配置 Kafka Connector,请按照常规的 基于 Snowpipe Streaming 的连接器的设置步骤 进行操作,但以下各节中列出了一些不同之处。

授予对外部卷的使用权限

您必须将与 Iceberg 表关联的外部卷的 USAGE 权限授予 Kafka Connector 的角色。

例如,如果 Iceberg 表使用 kafka_external_volume 外部卷,而连接器使用角色 kafka_connector_role,请运行以下语句:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

创建用于引入的 Iceberg 表

在运行连接器之前,必须创建 Iceberg 表。初始表架构取决于连接器的 snowflake.enable.schematization 设置。

如果您启用架构化,则可以创建包含名为 record_metadata 的列的表:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

连接器会自动创建 record_content 列并更改 record_metadata 列架构。

如果您不启用架构化,则可以创建包含名为 record_content 的列(其类型与实际 Kafka 消息内容匹配)的表。连接器会自动创建 record_metadata 列。

创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。相反,使用 结构化的 OBJECT 或 MAP

例如,请考虑以下消息:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

要为示例消息创建 Iceberg 表,请使用以下语句:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_content OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

备注

嵌套结构(如 dogscats)内的字段名称区分大小写。

配置属性

snowflake.streaming.iceberg.enabled

指定连接器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则连接器将失败。

:

  • true

  • false

默认值:

false

语言: 中文