与 Snowpipe Streaming 结合使用的 Kafka Connector 的架构检测和演变¶
带有 Snowpipe Streaming 的 Kafka Connector 支持架构检测和演化。Snowflake 中的表结构可以自动定义和演化,以支持 Kafka Connector 加载的新 Snowpipe Streaming 数据的结构。
如果没有架构检测和演变,Kafka Connector 加载的 Snowflake 表仅包含两个 VARIANT 列, RECORD_CONTENT 和 RECORD_METADATA。启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL 约束。
备注
此功能仅适用于与 Snowpipe Streaming 结合使用的 Kafka Connector。它不支持与基于文件的 Snowpipe 结合使用的 Kafka Connector。
本主题内容:
先决条件¶
在启用此功能之前,请务必设置以下先决条件。
下载 Kafka Connector 版本 2.0.0 或更高版本。有关更多信息,请参阅 安装和配置 Kafka Connector。
使用 ALTER TABLE 命令将表上的
ENABLE_SCHEMA_EVOLUTION
参数设置为 TRUE。 您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化。
配置所需的 Kafka 属性¶
在 Kafka Connector 属性文件中配置以下所需的属性:
snowflake.ingestion.method
指定使用
SNOWPIPE_STREAMING
加载 Kafka 主题数据。请注意,此功能不支持SNOWPIPE
。snowflake.enable.schematization
指定为
TRUE
来对与 Snowpipe Streaming 结合使用的 Kafka Connector 启用架构检测和演变。默认值为FALSE
。当此属性设置为
TRUE
时,对于 Kafka Connector 创建的任何新表,表参数
ENABLE_SCHEMA_EVOLUTION
会自动设置为TRUE
。对于任何现有表,仍需手动将表参数
ENABLE_SCHEMA_EVOLUTION
设置为TRUE
。
schema.registry.url
指定到架构注册表服务的 URL。默认值为空。
根据文件格式的不同,
schema.registry.url
可能是必需的,也可能是可选的。以下任一场景均支持使用 Kafka Connector 进行架构检测:Avro 和 Protobuf 需要架构注册表。该列是使用提供的架构注册表中定义的数据类型来创建的。
架构注册表对 JSON 而言是可选的。如果没有架构注册表,则将根据提供的数据推断出数据类型。
照常在 Kafka Connector 属性文件中配置其他属性。有关更多信息,请参阅 配置 Kafka Connector。
转换器¶
支持结构化数据转换器,例如 Json、Avro 和 Protobuf。请注意,我们仅测试了以下结构化数据转换器:
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
任何非结构化数据转换器都不受自动格式演化支持。例如:
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.storage.StringConverter
Snowpipe Streaming 不支持 Snowflake 转换器。某些自定义数据转换器未经测试,可能也不受支持。
使用说明¶
无论是否提供架构注册表,都支持使用 Kafka Connector 进行架构检测。如果使用架构注册表(Avro 和 Protobuf),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。
通过 Kafka Connector 进行的架构演化支持以下表列修改:
添加新列
如果缺少源数据列,则删除 NOT NULL 约束。
如果 Kafka Connector 创建了目标表,在默认情况下启用架构演化。但是,如果对现有表禁用了架构演化,则 Kafka Connector 将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。
JSON ARRAY 不支持进一步自动格式演化。
示例¶
以下示例说明了为与 Snowpipe Streaming 结合使用的 Kafka Connector 启用架构检测和演变之前和之后所创建的表。
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|