将 Snowflake Connector for Kafka 与 Apache Iceberg™ 表结合使用¶
从版本 3.0.0 开始,Snowflake Connector for Kafka 可以将数据引入 Snowflake 管理的 Apache Iceberg™ 表 中。
要求和限制¶
在为 Iceberg 表引入配置 Kafka Connector 之前,请注意以下要求和限制:
Iceberg 表引入需要版本 3.0.0 或更高版本的 Kafka Connector。
Iceberg 表引入受与 Snowpipe Streaming 结合使用的 Kafka Connector 支持。不受与 Snowpipe 结合使用的 Kafka Connector 支持。
当
snowflake.streaming.enable.single.buffer
设置为false
时,Iceberg 表引入不受支持。在运行连接器之前,必须创建 Iceberg 表。有关更多信息,请参阅本主题内容中的 配置和设置。
架构演化限制¶
AVRO 或 Protobuf 等架构化数据格式完全支持 Iceberg 的架构演化。
对于没有架构的普通 JSON,连接器认为以下消息类型无效,并将其发送到死信队列 (DLQ):
包含新列的消息(如果对应值为
null
或[]
)结构化对象中具有新字段的消息(如果对应值为
null
或[]
)
要手动更改表架构,以便连接器能够引入这些消息类型,请使用 ALTER TABLE 语句。
配置和设置¶
要为 Iceberg 表引入配置 Kafka Connector,请按照常规的 基于 Snowpipe Streaming 的连接器的设置步骤 进行操作,但以下各节中列出了一些不同之处。
授予对外部卷的使用权限¶
您必须将与 Iceberg 表关联的外部卷的 USAGE 权限授予 Kafka Connector 的角色。
例如,如果 Iceberg 表使用 kafka_external_volume
外部卷,而连接器使用角色 kafka_connector_role
,请运行以下语句:
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
创建用于引入的 Iceberg 表¶
在运行连接器之前,必须创建 Iceberg 表。初始表架构取决于连接器的 snowflake.enable.schematization
设置。
如果您启用架构化,则可以创建包含名为 record_metadata
的列的表:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
连接器会自动创建 record_content
列并更改 record_metadata
列架构。
如果您不启用架构化,则可以创建包含名为 record_content
的列(其类型与实际 Kafka 消息内容匹配)的表。连接器会自动创建 record_metadata
列。
创建 Iceberg 表时,您可以使用 Iceberg 数据类型或 兼容的 Snowflake 类型。半结构化 VARIANT 类型不受支持。相反,使用 结构化的 OBJECT 或 MAP。
例如,请考虑以下消息:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
要为示例消息创建 Iceberg 表,请使用以下语句:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( record_content OBJECT( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
备注
嵌套结构(如 dogs
或 cats
)内的字段名称区分大小写。
配置属性¶
snowflake.streaming.iceberg.enabled
指定连接器是否将数据引入 Iceberg 表。如果此属性与实际表类型不匹配,则连接器将失败。
- 值:
true
false
- 默认值:
false