使用 Snowflake High Performance connector for Kafka

本主题介绍连接器如何与表格和管道配合使用,以及如何使用这些元素配置连接器。

连接器如何与表和管道配合使用

连接器将每条 Kafka 记录视为要插入到 Snowflake 表中的一行。例如,如果您有一个 Kafka 主题,其消息内容的结构如以下 JSON 所示:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}
Copy

默认情况下,引入开始前无需创建表或管道。连接器会创建列与 JSON 键匹配的表,并依赖名为 {tableName}-STREAMING 的默认管道,该管道会自动将记录内容中的一级键按名称(不区分大小写)映射到表列。您也可以创建自己的表,其列与 JSON 键匹配。连接器尝试将记录内容中的一级键按名称与表列匹配。如果 JSON 中的键与表列不匹配,连接器会忽略这些键。

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);
Copy

如果您选择创建自己的管道,可以在管道的 COPY INTO 语句中定义数据转换逻辑。您可以根据需要重命名列,并根据需要转换数据类型。例如:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

CREATE TABLE ORDERS (
 topic VARCHAR,
 partition VARCHAR,
 order_id VARCHAR,
 customer_name VARCHAR,
 order_total VARCHAR,
 ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:RECORD_METADATA.topic::STRING AS topic,
  $1:RECORD_METADATA.partition::STRING AS partition,
  $1['order_id']::STRING AS order_id,
  $1['customer_name']::STRING as customer_name,
  CONCAT($1['order_total']::STRING, ' USD') AS order_total,
  $1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

当您定义自己的管道时,目标表列不必与 JSON 键相匹配。您可以将列重命名为所需名称,并根据需要转换数据类型。

主题名称、表名称和管道名称

根据配置设置,连接器将为目标表使用不同的名称。目标表名称始终派生自主题名称。

连接器如何将主题名称映射到目标表

Kafka Connector 提供两种将 Kafka 主题名称映射到 Snowflake 表名称的模式:

  • 静态映射:连接器仅使用 Kafka 主题名称派生目标表名称。

  • 显式主题到表映射模式:您可以通过 snowflake.topic2table.map 配置参数指定主题与表之间的自定义映射关系

静态映射

如果未配置 snowflake.topic2table.map 参数,连接器始终从主题名称派生表名称。

表名称生成:

连接器使用以下规则从主题名称派生目标表名称:

  1. 如果主题名称是有效的 Snowflake 标识符,连接器会将主题名称作为目标表名称(转换为大写)。

  2. 如果主题名称包含无效字符,连接器将:

    • 用下划线替换无效字符

    • 追加下划线,后跟哈希码以确保唯一性

    • 例如,主题 my-topic.data 变为 MY_TOPIC_DATA_<哈希值>

管道名称确定:

连接器根据以下逻辑确定要使用的管道:

  1. 连接器会检查是否存在与目标表名称同名的管道。

  2. 如果存在用户创建的、具有该名称的管道,连接器将使用该管道(用户自定义管道模式)。

  3. 如果没有,连接器将使用名为 {tableName}-STREAMING 的默认管道

备注

Snowflake 建议选择符合 Snowflake 标识符命名规则的主题名称,以确保生成可预测的表名。

了解 RECORD_METADATA

连接器会用关于 Kafka 记录的元数据填充 RECORD_METADATA 结构。此元数据通过 Snowpipe Streaming 数据源发送至 Snowflake,在管道转换中可通过 $1:RECORD_METADATA 访问器使用。RECORD_METADATA 结构在用户自定义管道和默认管道模式下均可使用。其内容可保存至 VARIANT 类型的列,也可提取其中独立字段并保存到单独的列中。

具有转换和元数据的管道示例:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

在此示例中:

  • 管道从 Kafka 消息中提取特定字段(order_id、customer_name、order_total)

  • 它还捕获元数据字段(主题、偏移和引入时间戳)

  • 可以根据需要对值进行类型转换和/或转换

元数据字段的填充方式

连接器会根据 Kafka 记录属性和连接器配置自动填充元数据字段。您可以使用以下配置参数控制包含哪些元数据字段:

  • ``snowflake.metadata.topic``(默认值:true)– 包括主题名称

  • ``snowflake.metadata.offset.and.partition``(默认值:true)– 包括偏移和分区

  • ``snowflake.metadata.createtime``(默认值:true)– 包括 Kafka 记录时间戳

  • ``snowflake.metadata.all``(默认值:true)– 包括所有可用元数据

当为 snowflake.metadata.all=true``(默认)时,则填充所有元数据字段。将单个元数据字段设置为 ``false 可将其从 RECORD_METADATA 结构中排除。

备注

SnowflakeConnectorPushTime 字段始终可用,它表示连接器将记录推送到引入缓冲区的时间。这对于计算端到端引入延迟非常有用。

RECORD_METADATA 结构默认包含以下信息:

字段

数据类型

描述

主题

字符串

Kafka 主题的名称(记录来源于此主题)。

分区

字符串

主题内的分区编号。(请注意,这是 Kafka 分区,不是 Snowflake 微分区。)

偏移

数字

该分区中的偏移。

CreateTime / . LogAppendTime

数字

这是与 Kafka 主题中的消息关联的时间戳。该值为自 1970 年 1 月 1 日 (UTC) 以来的毫秒数。有关更多信息,请参阅 Kafka ProducerRecord 文档 (https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html)。

SnowflakeConnectorPushTime

数字

一个记录被推送到 Ingest SDK 缓冲区的时间戳。该值为自 1970 年 1 月 1 日午夜 (UTC) 以来的毫秒数。有关更多信息,请参见 估算引入延迟

字符串

如果消息是 Kafka KeyedMessage,这是该消息的键。为了使连接器将密钥存储在 RECORD_METADATA 中,必须将 Kafka 配置属性 中的 key.converter 参数设置为 org.apache.kafka.connect.storage.StringConverter;否则,连接器会忽略密钥。

标头

对象

标头是与记录关联的用户定义的键值对。每条记录可以有 0 个、1 个或多个标头。

RECORD_METADATA 列中记录的元数据量可使用可选的 Kafka 配置属性进行配置。

字段名称和值区分大小写。

Kafka 记录在引入前如何转换

在将每行数据传递给 Snowpipe Streaming 之前,连接器会将 Kafka Connect 记录值转换为一个 Map<String, Object> 类型的对象,其键必须与目标列名匹配(也可在用户自定义管道中进行转换)。基本类型的字符串、字节数组或数字必须进行封装(例如通过 HoistField SMT 方法),以确保连接器接收到结构化对象。转换器应用以下规则:

  • Null 值被视为逻辑删除。当 behavior.on.null.values=IGNORE 时它们将被跳过,否则会作为空 JSON 对象被引入。

  • 数字和布尔字段按原样传递。精度超过 38 的小数值会被序列化为字符串,以符合 Snowflake 的 NUMBER 限制。

  • byte[]ByteBuffer 有效负载是 Base64 编码的字符串,因此将它们存储在 VARIANTVARCHAR 列。

  • 数组仍然是数组,嵌套对象仍然是嵌套映射。当您依赖默认管道以原样存储嵌套数据时,需声明 VARIANT 类型的列。

  • 由于 Snowflake 列名必须为文本类型,非字符串键的映射将以 [key, value] 键值对数组的形式输出。

  • 当相关元数据标志启用时,记录头和键会被复制到 RECORD_METADATA 中。

如果您需要将完整消息正文保留为单列,请使用 SMTs 将其封装到新的顶层字段中。 请参阅转换模式的 旧版 RECORD_CONTENT 列

用户定义的管道模式与默认管道模式

连接器支持两种管理数据引入的模式:

用户定义的管道模式

在此模式下,您可以完全控制数据转换和列映射。

何时使用此模式:

  • 您需要使用与 JSON 字段名不同的自定义列名

  • 您需要应用数据转换(类型转换、掩码、筛选)

  • 您希望完全控制数据映射到列的方式

默认管道模式

在此模式下,连接器使用名为 {tableName}-STREAMING 的默认管道,并将 Kafka 记录字段按名称(不区分大小写)映射到对应的表列。

何时使用此模式:

  • 您的 Kafka 记录键名称与所需的列名称相匹配

  • 您不需要自定义数据转换

  • 您需要简单配置

使用默认管道模式将 Kafka 记录键映射到表列

使用默认管道模式时,连接器会使用名为 {tableName}-STREAMING 的默认管道,并将内容中的一级键通过不区分大小写的匹配方式直接映射到表列。

使用默认管道模式 – 示例

示例 1:

请考虑以下 Kafka 记录内容有效负载:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

您需要创建一个表,其列与 JSON 键相匹配(不区分大小写,包含特殊字符):

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

匹配行为:

  • "city" (Kafka) → cityCITY``City``(列)– 不区分大小写

  • "has cat" (Kafka) → ``"has cat"``(列)– 由于空格,必须加引号

  • "!@&$#* includes special characters" (Kafka) → ``"!@&$#* includes special characters"``(列)– 保留的特殊字符

  • skillsfamily 这类嵌套对象会自动映射到 VARIANT 类型的列

使用用户定义的管道模式 – 示例

本示例展示如何配置并使用具有自定义数据转换功能的用户自定义管道。

示例 1:

使用所需架构创建表:

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);
Copy

创建一个管道,用于转换输入的 Kafka 记录以匹配您的表架构:

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

请注意,管道名称 (ORDERS) 与表名 (ORDERS) 相匹配。管道定义通过 $1:field_name 语法从 JSON 负载中提取字段,并将其映射到表列。

备注

您可以使用方括号表示法访问嵌套的 JSON 字段及包含特殊字符的字段,例如 $1['field name']$1['has cat']

配置主题到表的映射:

snowflake.topic2table.map=kafka-orders-topic:ORDERS
Copy

此配置将 Kafka 主题 kafka-orders-topic 映射到已存在的、名为 ORDERS 的表和管道。

示例 2:

当需要访问内容中非传统命名的键时,请使用以下语法:

  • 简单字段:$1:field_name

  • 包含空格或特殊字符的字段:$1['field name']$1['has cat']

  • 具有 unicode 字符的字段:$1[' @&$#* has Łułósżź']

  • 嵌套字段:$1:parent.child$1:parent['child field']

请参考以下来自 Kafka 的 JSON 负载:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

使用所选列名称创建目标表:

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

随后创建一个同名的管道来定义映射关系:

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

要点:

  • 您可以控制列名称(例如,将 "has cat" 重命名为 has_cat

  • 您可以根据需要转换数据类型(例如,$1:age::NUMBER

  • 您可以根据需要包含或排除字段

  • 您可以添加元数据字段(例如,$1:RECORD_METADATA.topic

  • VARIANT 列自动处理嵌套 JSON 结构

示例 3:使用交互式表

交互式表是 Snowflake 中为低延迟、高并发查询而优化的一种特殊表类型。您可以在 交互式表文档 中找到有关交互式表的更多信息。

  1. 创建交互式表:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    
    Copy
  2. 配置主题到表的映射:

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    
    Copy

重要注意事项:

  • 交互式表具有特定的限制和查询限制。请先查看 交互式表文档,然后再将它们与连接器一起使用。

  • 对于交互式表,任何所需的转换都必须在表定义中处理。

  • 需要交互式仓库才能有效地查询交互式表。

显式主题到表映射模式

当您配置 snowflake.topic2table.map 参数时,连接器在显式映射模式下运行。此模式允许您执行以下操作:

  • 将多个 Kafka 主题映射到单个 Snowflake 表

  • 使用与主题名称不同的自定义表名称

  • 应用正则表达式模式来匹配多个主题

配置格式:

snowflake.topic2table.map 参数接受以逗号分隔的主题到表的映射列表,格式如下:

topic1:table1,topic2:table2,topic3:table3
Copy

配置示例:

直接主题映射

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Copy

正则表达式模式匹配

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Copy

此配置将所有以 _cat 结尾的主题(例如 orange_catcalico_cat)映射到 CAT_TABLE 表,并将所有以 _dog 结尾的主题映射到 DOG_TABLE 表。

一个表中的多个主题

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Copy

此配置将 topic1topic2 映射到 shared_table,同时将 topic3 映射到 other_table

重要

  • 映射中的正则表达式模式不能重叠。每个主题最多只能匹配一个模式。

  • 映射中的表名称必须是有效的 Snowflake 标识符,至少包含 2 个字符,以字母或下划线开头。

  • 您可以将多个主题映射到单个表。

旧版 RECORD_CONTENT 列

在连接器旧版本(3.x 及更早版本)中,当模式化功能被禁用时,连接器会创建包含两列的目标表:RECORD_CONTENT 和 RECORD_METADATA。RECORD_CONTENT 列以 VARIANT 类型列的形式完整存储 Kafka 消息内容。RECORD_METADATA 列仍受支持,但连接器不再创建 RECORD_CONTENT 列。同样的功能可以通过 SMT 转换来实现(参见本节后续示例)。RECORD_CONTENT 键在 PIPE 转换中也不再可用。例如,默认情况下,此 PIPE 定义不起作用:

备注

若没有额外的 SMT 转换,此管道定义将无法正常工作。

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

如果您需要将完整的 Kafka 消息内容保存到单个列中,或者需要在 PIPE 转换中获取整个 Kafka 消息内容,可以使用以下 SMT 转换,将整个 Kafka 消息内容封装到您所需的自定义字段:

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Copy

此转换会将整个 Kafka 消息内容封装到名为 your_top_level_field_name 的自定义字段中。随后,您可以在 PIPE 转换中使用 $1:your_top_level_field_name 访问器来获取整个 Kafka 消息内容。

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

或者,如果您想通过默认管道将完整的元数据和内容保存到单张表中,则无需创建自定义管道;只需创建一个包含两列的表:RECORD_CONTENTyour_top_level_field_name

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);
Copy

要阅读有关 HoistField$Value 转换的更多信息,请参阅 Kafka 文档 (https://kafka.apache.org/39/documentation.html#connect_transforms)。

警告

将完整的 Kafka 消息内容及元数据保存到表中可能会对数据引入成本、管道速度和延迟产生负面影响。若需要获得最佳性能,请考虑仅保存所需数据(前提是这些数据可从 Kafka 记录内容的顶层直接获取),或通过 SMT 转换将深层次嵌套字段中的数据提取至顶层字段。

处理流通道错误和死信队列

连接器在提交 Kafka 偏移量之前,会检查 Snowpipe Streaming 通道状态。如果连接器检测到自连接器启动以来通道上的 rowsErrorCount 属性已增加,它会在 errors.tolerance=none 时引发致命错误 (ERROR_5030),以防忽视数据问题。要允许在对异常行进行分类时继续进行引入,可设置 errors.tolerance=all

errors.tolerance=all
Copy

架构演化

对于具有 ENABLE_SCHEMA_EVOLUTION=TRUE 的表,连接器会根据传入 Kafka 记录自动演化其架构。 所有连接器创建的表默认使用 ENABLE_SCHEMA_EVOLUTION=TRUE

架构演化仅限于以下操作:

  • 添加新列如果传入的 Kafka 记录包含表中不存在的新字段,连接器会向表中添加新列。

  • 删除插入记录中缺少数据的列的 NOT NULL 约束

将连接器与 Apache Iceberg™ 表结合使用

连接器可以将数据引入 Snowflake 管理的 Apache Iceberg™ 表,但必须满足以下要求:

  • 对于与您的 Apache Iceberg™ 表关联的外部卷,您必须已获授 USAGE 权限。

  • 在运行连接器之前,必须创建 Apache Iceberg™ 表。

授予对外部卷的使用权限

要将与 Apache Iceberg™ 表关联的外部卷的 USAGE 权限授予 Kafka 连接器所用的角色,请运行以下语句:

例如,如果 Iceberg 表使用 kafka_external_volume 外部卷,而连接器使用角色 kafka_connector_role,请运行以下语句:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

创建用于引入的 Apache Iceberg™ 表

连接器不会自动创建 Iceberg 表,也不支持架构演化。在运行连接器之前,您必须手动创建 Iceberg 表。

创建 Iceberg 表时,您可以使用 Iceberg 数据类型(包括 VARIANT)或 兼容的 Snowflake 类型

例如,请考虑以下消息:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "options":
    {
        "can_walk": true,
        "can_talk": false
    },
    "date_added": "2024-10-15"
}
Copy

要为示例消息创建 Iceberg 表,请使用以下任一语句:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id number(38,0),
    name varchar,
    body_temperature number(4,2),
    approved_coffee_types array(varchar),
    animals_possessed variant,
    options object(can_walk boolean, can_talk boolean),
    date_added date
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ICEBERG_VERSION = 3;
Copy
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    name string,
    body_temperature float,
    approved_coffee_types array(string),
    animals_possessed variant,
    date_added date,
    options object(can_walk boolean, can_talk boolean),
    )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ICEBERG_VERSION = 3;
Copy

备注

嵌套结构(如 dogscats)内的字段名称区分大小写。

后续步骤

设置任务