Kafka Connector with Snowpipe Streaming Classic 的架构检测和演化¶
带有 Snowpipe Streaming 的 Kafka Connector 支持架构检测和演化。Snowflake 中表的结构可以自动定义和演化,以支持 Kafka Connector 加载的新 Snowpipe Streaming 数据的结构。
如果没有架构检测和演变,Kafka Connector 加载的 Snowflake 表仅包含两个 VARIANT 列,RECORD_CONTENT 和 RECORD_METADATA。启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL 约束。
备注
此功能仅适用于与 Snowpipe Streaming 结合使用的 Kafka Connector。它不支持与基于文件的 Snowpipe 结合使用的 Kafka Connector。
先决条件¶
在启用此功能之前,请务必设置以下先决条件。
下载 Kafka Connector 版本 2.0.0 或更高版本。有关更多信息,请参阅 安装和配置 Kafka Connector。
使用 ALTER TABLE 命令将表上的
ENABLE_SCHEMA_EVOLUTION参数设置为 TRUE。 您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化。
配置所需的 Kafka 属性¶
在 Kafka Connector 属性文件中配置以下所需的属性:
snowflake.ingestion.method指定使用
SNOWPIPE_STREAMING加载 Kafka 主题数据。请注意,此功能不支持SNOWPIPE。snowflake.enable.schematization指定为
TRUE来对与 Snowpipe Streaming 结合使用的 Kafka Connector 启用架构检测和演变。默认值为FALSE。当此属性设置为
TRUE时,对于 Kafka Connector 创建的任何新表,表参数
ENABLE_SCHEMA_EVOLUTION会自动设置为TRUE。对于任何现有表,仍需手动将表参数
ENABLE_SCHEMA_EVOLUTION设置为TRUE。
schema.registry.url指定到架构注册表服务的 URL。默认值为空。
根据文件格式的不同,
schema.registry.url可能是必需的,也可能是可选的。以下任一场景均支持使用 Kafka Connector 进行架构检测:Avro 和 Protobuf 需要架构注册表。该列是使用提供的架构注册表中定义的数据类型来创建的。
架构注册表对 JSON 而言是可选的。如果没有架构注册表,则将根据提供的数据推断出数据类型。
照常在 Kafka Connector 属性文件中配置其他属性。有关更多信息,请参阅 配置 Kafka Connector。
转换器¶
支持结构化数据转换器,例如 Json、Avro 和 Protobuf。请注意,我们仅测试了以下结构化数据转换器:
io.confluent.connect.avro.AvroConverterio.confluent.connect.protobuf.ProtobufConverterorg.apache.kafka.connect.json.JsonConverterio.confluent.connect.json.JsonSchemaConverter
任何非结构化数据转换器都不受自动格式演化支持。例如:
org.apache.kafka.connect.converters.ByteArrayConverterorg.apache.kafka.connect.storage.StringConverter
Snowpipe Streaming 不支持 Snowflake 转换器。某些自定义数据转换器未经测试,可能也不受支持。
使用说明¶
无论是否提供架构注册表,都支持使用 Kafka Connector 进行架构检测。如果使用架构注册表(Avro 和 Protobuf),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。
通过 Kafka Connector 进行的架构演化支持以下表列修改:
添加新列
如果缺少源数据列,则删除 NOT NULL 约束。
如果 Kafka Connector 创建了目标表,在默认情况下启用架构演化。但是,如果对现有表禁用了架构演化,则 Kafka Connector 将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。
JSON ARRAY 不支持进一步自动格式演化。
对于与 Snowpipe Streaming 结合使用的 Kafka 连接器,在以下视图和命令中,架构演变不受
SchemaEvolutionRecord输出的跟踪:INFORMATION_SCHEMA COLUMNS 视图、ACCOUNT_USAGE COLUMNS 视图、DESCRIBE TABLE 命令 和 SHOW COLUMNS 命令。SchemaEvolutionRecord输出始终显示 NULL。
示例¶
以下示例说明了为与 Snowpipe Streaming 结合使用的 Kafka Connector 启用架构检测和演变之前和之后所创建的表。
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|