与 Snowpipe Streaming 结合使用的 Kafka Connector 的架构检测和演变

带有 Snowpipe Streaming 的 Kafka Connector 支持架构检测和演化。Snowflake 中的表结构可以自动定义和演化,以支持 Kafka Connector 加载的新 Snowpipe Streaming 数据的结构。

如果没有架构检测和演变,Kafka Connector 加载的 Snowflake 表仅包含两个 VARIANT 列, RECORD_CONTENT 和 RECORD_METADATA。启用架构检测和演变后,Snowflake 可以检测流式数据的架构,并将数据加载到自动匹配任何用户定义架构的表中。Snowflake 还允许添加新列或从新数据文件所缺失的列中删除 NOT NULL 约束。

备注

此功能仅适用于与 Snowpipe Streaming 结合使用的 Kafka Connector。它不支持与基于文件的 Snowpipe 结合使用的 Kafka Connector。

本主题内容:

先决条件

在启用此功能之前,请务必设置以下先决条件。

  • 下载 Kafka Connector 版本 2.0.0 或更高版本。有关更多信息,请参阅 安装和配置 Kafka Connector

  • 使用 ALTER TABLE 命令将表上的 ENABLE_SCHEMA_EVOLUTION 参数设置为 TRUE。 您还必须使用对该表具有 OWNERSHIP 权限的角色。有关更多信息,请参阅 表模式演化

配置所需的 Kafka 属性

在 Kafka Connector 属性文件中配置以下所需的属性:

snowflake.ingestion.method

指定使用 SNOWPIPE_STREAMING 加载 Kafka 主题数据。请注意,此功能不支持 SNOWPIPE

snowflake.enable.schematization

指定为 TRUE 来对与 Snowpipe Streaming 结合使用的 Kafka Connector 启用架构检测和演变。默认值为 FALSE

当此属性设置为 TRUE 时,

  • 对于 Kafka Connector 创建的任何新表,表参数 ENABLE_SCHEMA_EVOLUTION 会自动设置为 TRUE

  • 对于任何现有表,仍需手动将表参数 ENABLE_SCHEMA_EVOLUTION 设置为 TRUE

schema.registry.url

指定到架构注册表服务的 URL。默认值为空。

根据文件格式的不同,schema.registry.url 可能是必需的,也可能是可选的。以下任一场景均支持使用 Kafka Connector 进行架构检测:

  • Avro 和 Protobuf 需要架构注册表。该列是使用提供的架构注册表中定义的数据类型来创建的。

  • 架构注册表对 JSON 而言是可选的。如果没有架构注册表,则将根据提供的数据推断出数据类型。

照常在 Kafka Connector 属性文件中配置其他属性。有关更多信息,请参阅 配置 Kafka Connector

转换器

支持结构化数据转换器,例如 Json、Avro 和 Protobuf。请注意,我们仅测试了以下结构化数据转换器:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

任何非结构化数据转换器都不受自动格式演化支持。例如:

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

Snowpipe Streaming 不支持 Snowflake 转换器。某些自定义数据转换器未经测试,可能也不受支持。

使用说明

  • 无论是否提供架构注册表,都支持使用 Kafka Connector 进行架构检测。如果使用架构注册表(Avro 和 Protobuf),则将使用提供的架构注册表中定义的数据类型来创建该列。如果没有架构注册表 (JSON),则将根据提供的数据推断出数据类型。

  • 通过 Kafka Connector 进行的架构演化支持以下表列修改:

    • 添加新列

    • 如果缺少源数据列,则删除 NOT NULL 约束。

  • 如果 Kafka Connector 创建了目标表,在默认情况下启用架构演化。但是,如果对现有表禁用了架构演化,则 Kafka Connector 将尝试将架构不匹配的行发送到配置的死信队列 (DLQ)。

  • JSON ARRAY 不支持进一步自动格式演化。

示例

以下示例说明了为与 Snowpipe Streaming 结合使用的 Kafka Connector 启用架构检测和演变之前和之后所创建的表。

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy
语言: 中文