Snowflake High Performance connector for Kafka 的工作原理¶
This topic describes various aspects of the connector, how it works with tables and pipes, and how to configure the connector.
连接器如何与表和管道配合使用¶
The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like :
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
默认情况下,引入开始前无需创建表或管道。连接器会创建列与 JSON 键匹配的表,并依赖名为 {tableName}-STREAMING 的默认管道,该管道会自动将记录内容中的一级键按名称(不区分大小写)映射到表列。您也可以创建自己的表,其列与 JSON 键匹配。连接器尝试将记录内容中的一级键按名称与表列匹配。如果 JSON 中的键与表列不匹配,连接器会忽略这些键。
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
如果您选择创建自己的管道,可以在管道的 COPY INTO 语句中定义数据转换逻辑。您可以根据需要重命名列,并根据需要转换数据类型。例如:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
或
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
当您定义自己的管道时,目标表列不需要与 JSON 键相匹配。您可以将列重命名为所需名称,并根据需要转换数据类型。
主题名称、表名称和管道名称¶
根据配置设置,连接器将为目标表使用不同的名称。目标表名称始终派生自主题名称。
连接器如何将主题名称映射到目标表¶
Kafka Connector 提供两种将 Kafka 主题名称映射到 Snowflake 表名称的模式:
静态映射:连接器仅使用 Kafka 主题名称派生目标表名称。
显式主题到表映射模式:您可以通过
snowflake.topic2table.map配置参数指定主题与表之间的自定义映射关系
静态映射¶
如果未配置 snowflake.topic2table.map 参数,连接器始终从主题名称派生表名称。
表名称生成:
连接器使用以下规则从主题名称派生目标表名称:
If the topic name is a valid Snowflake identifier the connector uses the topic name as the destination table name, converted to uppercase).
如果主题名称包含无效字符,连接器将:
用下划线替换无效字符
追加下划线,后跟哈希码以确保唯一性
例如,主题
my-topic.data变为MY_TOPIC_DATA_<哈希值>
管道名称确定:
连接器根据以下逻辑确定要使用的管道:
连接器会检查是否存在与目标表名称同名的管道。
如果存在用户创建的、具有该名称的管道,连接器将使用该管道(用户自定义管道模式)。
如果没有,连接器将使用名为
{tableName}-STREAMING的默认管道
备注
Snowflake 建议选择符合 Snowflake 标识符命名规则的主题名称,以确保生成可预测的表名。
了解 RECORD_METADATA¶
连接器会用关于 Kafka 记录的元数据填充 RECORD_METADATA 结构。此元数据通过 Snowpipe Streaming 数据源发送至 Snowflake,在管道转换中可通过 $1:RECORD_METADATA 访问器使用。RECORD_METADATA 结构在用户自定义管道和默认管道模式下均可使用。其内容可保存至 VARIANT 类型的列,也可提取其中独立字段并保存到单独的列中。
具有转换和元数据的管道示例:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
在此示例中:
管道从 Kafka 消息中提取特定字段(order_id、customer_name、order_total)
它还捕获元数据字段(主题、偏移和引入时间戳)
可以根据需要对值进行类型转换和/或转换
元数据字段的填充方式¶
连接器会根据 Kafka 记录属性和连接器配置自动填充元数据字段。您可以使用以下配置参数控制包含哪些元数据字段:
``snowflake.metadata.topic``(默认值:true)– 包括主题名称
``snowflake.metadata.offset.and.partition``(默认值:true)– 包括偏移和分区
``snowflake.metadata.createtime``(默认值:true)– 包括 Kafka 记录时间戳
``snowflake.metadata.all``(默认值:true)– 包括所有可用元数据
当为 snowflake.metadata.all=true``(默认)时,则填充所有元数据字段。将单个元数据字段设置为 ``false 可将其从 RECORD_METADATA 结构中排除。
备注
SnowflakeConnectorPushTime 字段始终可用,它表示连接器将记录推送到引入缓冲区的时间。这对于计算端到端引入延迟非常有用。
RECORD_METADATA 结构默认包含以下信息:
字段 |
数据类型 |
描述 |
|---|---|---|
主题 |
字符串 |
Kafka 主题的名称(记录来源于此主题)。 |
分区 |
字符串 |
主题内的分区编号。(请注意,这是 Kafka 分区,不是 Snowflake 微分区。) |
偏移 |
数字 |
该分区中的偏移。 |
CreateTime / . LogAppendTime |
数字 |
This is the timestamp associated with the message in the Kafka topic. The value is milliseconds since midnight January 1, 1970, UTC. For more information, see: Kafka ProducerRecord documentation (https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). |
SnowflakeConnectorPushTime |
数字 |
一个记录被推送到 Ingest SDK 缓冲区的时间戳。该值为自 1970 年 1 月 1 日午夜 (UTC) 以来的毫秒数。有关更多信息,请参见 估算引入延迟。 |
键 |
字符串 |
If the message is a Kafka KeyedMessage, this is the key for that message.
In order for the connector to store the key in the RECORD_METADATA, the |
标头 |
对象 |
A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
RECORD_METADATA 列中记录的元数据量可使用可选的 Kafka 配置属性进行配置。
字段名称和值区分大小写。
Kafka 记录在引入前如何转换¶
在将每行数据传递给 Snowpipe Streaming 之前,连接器会将 Kafka Connect 记录值转换为一个 Map<String, Object> 类型的对象,其键必须与目标列名匹配(也可在用户自定义管道中进行转换)。基本类型的字符串、字节数组或数字必须进行封装(例如通过 HoistField SMT 方法),以确保连接器接收到结构化对象。转换器应用以下规则:
Null 值被视为逻辑删除。当
behavior.on.null.values=IGNORE时它们将被跳过,否则会作为空 JSON 对象被引入。数字和布尔字段按原样传递。精度超过 38 的小数值会被序列化为字符串,以符合 Snowflake 的
NUMBER限制。byte[]和ByteBuffer有效负载是 Base64 编码的字符串,因此将它们存储在VARIANT或VARCHAR列。数组仍然是数组,嵌套对象仍然是嵌套映射。当您依赖默认管道以原样存储嵌套数据时,需声明
VARIANT类型的列。由于 Snowflake 列名必须为文本类型,非字符串键的映射将以
[key, value]键值对数组的形式输出。当相关元数据标志启用时,记录头和键会被复制到
RECORD_METADATA中。
如果您需要将完整消息正文保留为单列,请使用 SMTs 将其封装到新的顶层字段中。 请参阅转换模式的 旧版 RECORD_CONTENT 列。
用户定义的管道模式与默认管道模式¶
连接器支持两种管理数据引入的模式:
用户定义的管道模式¶
在此模式下,您可以完全控制数据转换和列映射。
何时使用此模式:
您需要使用与 JSON 字段名不同的自定义列名
您需要应用数据转换(类型转换、掩码、筛选)
您希望完全控制数据映射到列的方式
默认管道模式¶
在此模式下,连接器使用名为 {tableName}-STREAMING 的默认管道,并将 Kafka 记录字段按名称(不区分大小写)映射到对应的表列。
何时使用此模式:
您的 Kafka 记录键名称与所需的列名称相匹配
您不需要自定义数据转换
You want a simple configuration
使用默认管道模式将 Kafka 记录键映射到表列
使用默认管道模式时,连接器会使用名为 {tableName}-STREAMING 的默认管道,并将内容中的一级键通过不区分大小写的匹配方式直接映射到表列。
使用默认管道模式 – 示例¶
示例 1:¶
请考虑以下 Kafka 记录内容有效负载:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
您需要创建一个表,其列与 JSON 键相匹配(不区分大小写,包含特殊字符):
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
匹配行为:
使用用户定义的管道模式 – 示例¶
本示例展示如何配置并使用具有自定义数据转换功能的用户自定义管道。
示例 1:¶
使用所需架构创建表:
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
创建一个管道,用于转换输入的 Kafka 记录以匹配您的表架构:
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
请注意,管道名称 (ORDERS) 与表名 (ORDERS) 相匹配。管道定义通过 $1:field_name 语法从 JSON 负载中提取字段,并将其映射到表列。
备注
您可以使用方括号表示法访问嵌套的 JSON 字段及包含特殊字符的字段,例如 $1['field name'] 或 $1['has cat']。
配置主题到表的映射:
snowflake.topic2table.map=kafka-orders-topic:ORDERS
此配置将 Kafka 主题 kafka-orders-topic 映射到已存在的、名为 ORDERS 的表和管道。
示例 2:¶
当需要访问内容中非传统命名的键时,请使用以下语法:
简单字段:
$1:field_name包含空格或特殊字符的字段:
$1['field name']或$1['has cat']具有 unicode 字符的字段:
$1[' @&$#* has Łułósżź']嵌套字段:
$1:parent.child或$1:parent['child field']
请参考以下来自 Kafka 的 JSON 负载:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
使用所选列名称创建目标表:
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
随后创建一个同名的管道来定义映射关系:
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
要点:
您可以控制列名称(例如,将
"has cat"重命名为has_cat)您可以根据需要转换数据类型(例如,
$1:age::NUMBER)您可以根据需要包含或排除字段
您可以添加元数据字段(例如,
$1:RECORD_METADATA.topic)VARIANT 列自动处理嵌套 JSON 结构
示例 3:使用交互式表¶
交互式表是 Snowflake 中为低延迟、高并发查询而优化的一种特殊表类型。您可以在 交互式表文档 中找到有关交互式表的更多信息。
创建交互式表:
CREATE INTERACTIVE TABLE REALTIME_METRICS ( metric_name VARCHAR, metric_value NUMBER, source_topic VARCHAR, timestamp TIMESTAMP_NTZ ) AS (SELECT $1:M_NAME::VARCHAR, $1:M_VALUE::NUMBER, $1:RECORD_METADATA.topic::VARCHAR, $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
配置主题到表的映射:
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
重要注意事项:
交互式表具有特定的限制和查询限制。请先查看 交互式表文档,然后再将它们与连接器一起使用。
对于交互式表,任何所需的转换都必须在表定义中处理。
需要交互式仓库才能有效地查询交互式表。
显式主题到表映射模式¶
当您配置 snowflake.topic2table.map 参数时,连接器在显式映射模式下运行。此模式允许您执行以下操作:
将多个 Kafka 主题映射到单个 Snowflake 表
使用与主题名称不同的自定义表名称
应用正则表达式模式来匹配多个主题
配置格式:
snowflake.topic2table.map 参数接受以逗号分隔的主题到表的映射列表,格式如下:
topic1:table1,topic2:table2,topic3:table3
配置示例:
直接主题映射
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
正则表达式模式匹配
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
此配置将所有以 _cat 结尾的主题(例如 orange_cat、calico_cat)映射到 CAT_TABLE 表,并将所有以 _dog 结尾的主题映射到 DOG_TABLE 表。
一个表中的多个主题
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
此配置将 topic1 和 topic2 映射到 shared_table,同时将 topic3 映射到 other_table。
重要
映射中的正则表达式模式不能重叠。每个主题最多只能匹配一个模式。
映射中的表名称必须是有效的 Snowflake 标识符,至少包含 2 个字符,以字母或下划线开头。
您可以将多个主题映射到单个表。
旧版 RECORD_CONTENT 列¶
In prior versions of the connector (3.x and earlier), when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column contained the entire Kafka message content in a column of type VARIANT. The RECORD_METADATA column continues to be supported but the RECORD_CONTENT column is no longer created by the connector. The same functionality can be achieved using SMT transformations (see examples later in this section). The RECORD_CONTENT key is also no longer available in PIPE transformations. For example, this PIPE definition will not work by default:
备注
若没有额外的 SMT 转换,此管道定义将无法正常工作。
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
如果您需要将完整的 Kafka 消息内容保存到单个列中,或者需要在 PIPE 转换中获取整个 Kafka 消息内容,可以使用以下 SMT 转换,将整个 Kafka 消息内容封装到您所需的自定义字段:
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
此转换会将整个 Kafka 消息内容封装到名为 your_top_level_field_name 的自定义字段中。随后,您可以在 PIPE 转换中使用 $1:your_top_level_field_name 访问器来获取整个 Kafka 消息内容。
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
或者,如果您想通过默认管道将完整的元数据和内容保存到单张表中,则无需创建自定义管道;只需创建一个包含两列的表:RECORD_CONTENT 和 your_top_level_field_name。
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
要阅读有关 HoistField$Value 转换的更多信息,请参阅 Kafka 文档 (https://kafka.apache.org/39/documentation.html#connect_transforms)。
警告
将完整的 Kafka 消息内容及元数据保存到表中可能会对数据引入成本、管道速度和延迟产生负面影响。若需要获得最佳性能,请考虑仅保存所需数据(前提是这些数据可从 Kafka 记录内容的顶层直接获取),或通过 SMT 转换将深层次嵌套字段中的数据提取至顶层字段。
处理流通道错误和死信队列¶
The connector inspects the Snowpipe Streaming channel status before committing offsets in Kafka. If the connector detects that the rowsErrorCount property on channel has increased since the connector was started, it raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues don't go unnoticed. To allow ingestion to continue while triaging bad rows, set errors.tolerance=all
errors.tolerance=all
架构演化¶
对于具有 ENABLE_SCHEMA_EVOLUTION=TRUE 的表,连接器会根据传入 Kafka 记录自动演化其架构。 所有连接器创建的表默认使用 ENABLE_SCHEMA_EVOLUTION=TRUE。
架构演化仅限于以下操作:
添加新列如果传入的 Kafka 记录包含表中不存在的新字段,连接器会向表中添加新列。
删除插入记录中缺少数据的列的 NOT NULL 约束
连接器的容错限制¶
可以为 Kafka 主题配置存储空间限制或保留时间限制。
如果系统离线时间超过保留时间,则不会加载过期的记录。同样,如果超过 Kafka 的存储空间限制,某些消息将无法传递。
如果已删除 Kafka 主题中的消息,这些更改可能不会反映在 Snowflake 表中。
后续步骤¶
设置任务。