Snowpipe Streaming

调用 Snowpipe Streaming API(“API”)提示使用 Snowflake Ingest SDK 以及您自己的托管应用程序代码,实现流数据行的低延迟加载。流数据引入 API 将数据行写入 Snowflake 表,这与批量数据加载或 Snowpipe 不同,后者会写入来自暂存文件的数据。这种架构可降低加载延迟,并相应降低加载相似数据量的费用,因此是处理实时数据流的强大工具。

本主题介绍了与调用 API 的自定义客户端应用程序有关的概念。有关 Snowflake Connector for Kafka(“Kafka Connector”)的说明,请参阅 结合使用 Snowflake Connector for Kafka 和 Snowpipe Streaming

本主题内容:

Snowpipe Streaming API 与 Snowpipe 的对比

这种 API 旨在补充 Snowpipe,而非取而代之。在流处理场景中使用 Snowpipe Streaming API 时,数据会通过行(例如 Apache Kafka 主题)进行流传输,而非写入文件。这种 API 适合包含生成或接收记录的现有自定义 Java 应用程序的引入工作流程。此 API 不需要创建文件即可将数据加载到 Snowflake 表中,并且可以在数据可用时自动、连续地将数据流载入 Snowflake。

Snowpipe Streaming

下表描述了 Snowpipe Streaming 与 Snowpipe 之间的区别:

类别

Snowpipe Streaming

Snowpipe

待加载数据的形式

文件。如果您的现有数据管道在 Blob 存储中生成文件,则建议使用 Snowpipe,而非 API。

第三方软件要求

Snowflake Ingest SDK 的自定义 Java 应用程序代码包装程序

数据排序

各通道内的有序插入

不支持。在从文件加载数据时,Snowpipe 可按照与云存储中文件创建时间戳不同的顺序进行加载。

加载历史记录

加载历史记录会记录在 SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY 视图 (Account Usage) 之中

加载历史记录会记录在 LOAD_HISTORY 视图 (Account Usage) 和 COPY_HISTORY 函数 (Information Schema) 之中

管道对象

不需要管道对象:API 将记录直接写入目标表。

需要一个管道对象,用于将暂存文件数据排入队列,并加载到目标表中。

软件要求

Java SDK

Snowpipe Streaming 服务目前作为用于 Snowflake Ingest SDK 的 一组 APIs 实现。可从 Maven Central Repository (https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk) 下载该 SDK。Snowflake 建议使用 Snowflake Ingest SDK 2.0.2 或更新版本。

该 SDK 支持 Java 8 或更高版本,并且需要 Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files

重要

该 SDK 会对 Snowflake 进行 REST API 调用。您可能需要调整网络防火墙规则以允许连接。

自定义客户端应用程序

该 API 需要一个能提取数据行,并处理遇到的错误的自定义 Java 应用程序接口。您负责确保应用程序持续运行,并且可从故障中恢复。对于给定的一批行,API 支持与 ON_ERROR = CONTINUE | SKIP_BATCH | ABORT 同等的处理。

  • CONTINUE:继续加载可接受的数据行,并返回所有错误。

  • SKIP_BATCH:如果在整批行中遇到任何错误,则跳过加载并返回所有错误。

  • :code:`ABORT`(默认设置):中止整批行,并在遇到第一个错误时抛出异常。

应用程序应该使用来自 :code:`insertRow`(单行)或 :code:`insertRows`(行集)方法的响应获取错误。

通道

API 通过一个或多个 通道 引入行。通道表示与 Snowflake 的逻辑、指定流处理连接,用于将数据加载到表中。一个通道精确映射到 Snowflake 中的一个表;但多个通道可以指向同一个表。Client SDK 可以为多个表打开多个通道;但 SDK 无法跨账户打开通道。行的顺序及其相应的偏移令牌会在通道内保留,但不会跨指向同一个表的多个通道保留。

在客户端主动插入数据时,通道应长期存在,并且由于保留了偏移令牌,应该加以重复使用。默认情况下,通道内的数据每 1 秒自动刷新一次,不需要关闭。有关更多信息,请参阅 延迟

Snowpipe Streaming 客户端通道表映射

您可以运行 SHOW CHANNELS 命令,列出您有访问权限的通道。有关更多信息,请参阅 SHOW CHANNELS

备注

不活动的通道及其偏移令牌会在 30 天后自动删除。

偏移令牌

偏移令牌*是客户端可包含在 :code:`insertRow`(单行)或 :code:`insertRows`(行集)方法中的字符串,这些方法请求跟踪每个通道的引入进度。在通道创建时,此令牌将初始化为 NULL,并会在具有所提供的偏移令牌的行通过异步过程提交到 Snowflake 时更新。客户可以定期发出 :code:`getLatestCommittedOffsetToken` 方法请求,以获取特定通道的最新提交的偏移令牌,并使用它来推断引入进度。请注意,Snowflake *不 使用此令牌来执行重复数据删除;但客户端可以使用此令牌,通过自定义代码来执行重复数据删除。

在客户端重新打开一个通道时,将会返回最新的持久偏移令牌。客户端可使用令牌重置其在数据源中的位置,以避免发送两次相同的数据。请注意,在通道重新打开事件发生时,Snowflake 中缓存的任何数据都将被丢弃,以避免提交这些数据。

您可以使用最新提交的偏移令牌来执行以下常见用例:

  • 跟踪引入进度

  • 将特定偏移与最新提交的偏移令牌进行比较,确认特定偏移是否已提交

  • 推进源偏移,并清除已提交的数据

  • 启用重复数据删除并确保确切传送一次数据

例如,Kafka Connector 可从主题读取偏移令牌,例如 <partition>:<offset>,或者仅为 :code:`<offset>`(如果分区编码在通道名称中)。考虑以下情景:

  1. Kafka Connector 联机并在 Kafka 主题 T 中打开了对应于 Partition 1 的通道,通道名称为 T:P1

  2. 该连接器开始从 Kafka 分区读取记录。

  3. 该连接器调用 API,发出一条 insertRows 方法请求,使用与记录关联的偏移作为偏移令牌。

    例如,偏移令牌可以是 10,代表 Kafka 分区中的第十条记录。

  4. 连接器定期发出 getLatestCommittedOffsetToken 方法请求,确定引入进度。

如果 Kafka Connector 崩溃,可完成以下过程,以恢复从 Kafka 分区的正确偏移位置读取记录。

  1. Kafka Connector 重新联机,并使用与之前相同的名称重新打开通道。

  2. 连接器调用 API,发出一条 getLatestCommittedOffsetToken 方法请求,以获取分区最新提交的偏移。

    例如,假设最新的持久偏移令牌是 20

  3. 连接器使用 Kafka 读取 APIs,重置与偏移加 1(本例中为 21)相对应的游标。

  4. 连接器恢复读取记录。读取游标在重新定位成功后不会检索到重复数据。

在另一个示例中,应用程序从目录读取日志,并使用 Snowpipe Streaming Client SDK 将这些日志导出到 Snowflake。您可以构建一个日志导出应用程序,以执行以下操作:

  1. 列出日志目录中的文件。

    假设日志记录框架生成可按字典顺序排序的日志文件,并且在此排序中,新的日志文件位于末尾。

  2. 逐行读取日志文件并调用 API,发出 insertRows 方法请求,在请求中使用与日志文件名、行数或字节位置相对应的偏移令牌。

    例如,偏移令牌可能是 messages_1.log:20,其中的 messages_1.log 是日志文件的名称,20 是行号。

如果应用程序崩溃或需要重新启动,它将调用 API,发出 getLatestCommittedOffsetToken 方法请求,检索与上次导出的日志文件和行相对应的偏移令牌。根据之前的例子,这可能是 messages_1.log:20。随后应用程序将打开 messages_1.log 并寻找 21 行,以防止同一日志行被引入两次。

备注

偏移令牌信息可能会丢失。偏移令牌链接到一个通道对象,如果 30 天内未使用该通道执行新的引入,该通道将自动清除。为了防止偏移令牌丢失,请考虑维护单独的偏移,并按需重置通道的偏移令牌。

关于确切传送一次数据的最佳实践

实现确切传送一次数据可能并不容易,在自定义代码中遵守以下原则至关重要:

  • 为了确保适当地从异常、故障或崩溃中恢复,您必须始终重新打开通道,并使用最新提交的偏移令牌重新启动引入。

  • 应用程序可能会维护自己的偏移,但使用 Snowflake 提供的、最新提交的偏移令牌作为事实来源,并相应地重置自己的偏移,这一点至关重要。

  • 唯一应将自己的偏移视为事实来源的情况是,Snowflake 的偏移令牌设置为或重置为 NULL。NULL 偏移令牌通常意味着以下情况之一:

    • 这是一个新通道,因此尚未设置偏移令牌。

    • 目标表已丢弃并重新创建,因此该通道视为新通道。

    • 30 日内未通过通道执行任何引入活动,因此通道被自动清理,偏移令牌信息丢失。

  • 如有必要,可根据最新提交的偏移令牌定期清除已提交的源数据,并推进您自己的偏移。

如需进一步了解带有 Snowpipe Streaming 的 Kafka Connector 如何实现确切传送一次数据,请参阅 Exactly-once 语义

延迟

Snowpipe Streaming 每秒自动刷新一次通道内的数据。您无需关闭通道即可刷新数据。

在 Snowflake Ingest SDK 2.0.4 及更高版本中,您可以使用 max_client_lag 选项配置延迟。默认选项为 1 秒。最大延迟时间可以设置为 10 分钟。 更多信息,请参阅 MAX_CLIENT_LAG (https://javadoc.io/doc/net.snowflake/snowflake-ingest-sdk/latest/net/snowflake/ingest/utils/ParameterProvider.html#MAX_CLIENT_LAG) 和 推荐的 Snowpipe Streams 延迟配置

请注意,Snowpipe Streaming 的 Kafka Connector 有自己的缓冲区。达到 Kafka 缓冲区刷新时间后,数据将通过 Snowpipe Streaming 按一秒的延迟发送到 Snowflake。有关更多信息,请参阅 buffer.flush.time

迁移到经优化的文件

API 将通道中的行写入云存储中的 blob,然后提交到目标表。在初始情况下,写入目标表的流数据以临时中间文件格式存储。在此暂存区,该表被视为“混合表”,因为分区数据使用本机文件与中间文件的混合形式存储。一个自动后台进程根据需要,将数据从活动中间文件迁移到针对查询和 DML 操作而优化的本机文件。

复制

Snowpipe Streaming 支持 Snowpipe Streaming 填充的 Snowflake 表及其关联的通道偏移从不同 区域、跨 云平台 的源账户到目标账户的 复制和故障转移

有关更多信息,请参阅 复制和 Snowpipe 流

仅限插入操作

API 目前仅限于插入行。要修改、删除或合并数据,请将“原始”记录写入一个或多个临时表。若要合并、连接或转换数据,请使用 :doc:` 连续数据管道 </user-guide/data-pipelines-intro>` 将修改后的数据插入目标报告表。

类和接口

有关类和接口的文档,请参阅 Snowflake Ingest SDK API (https://javadoc.io/doc/net.snowflake/snowflake-ingest-sdk/latest/overview-summary.html)。

支持的 Java 数据类型

下表汇总了支持将哪些 Java 数据类型引入到 Snowflake 列:

Snowflake 列类型

允许的 Java 数据类型

  • CHAR

  • VARCHAR

  • 字符串

  • 基元数据类型(整数、布尔、字符等)

  • BigInteger、BigDecimal

  • BINARY

  • byte[]

  • 字符串(十六进制编码)

  • NUMBER

  • 数字类型(BigInteger、BigDecimal、字节、整数、双精度等)

  • 字符串

  • FLOAT

  • 数字类型(BigInteger、BigDecimal、字节、整数、双精度等)

  • 字符串

  • BOOLEAN

  • 布尔

  • 数字类型(BigInteger、BigDecimal、字节、整数、双精度等)

  • 字符串

请参阅 布尔转换详细信息

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • 字符串

    • 以整数形式存储的时间

    • HH24:MI:SS.FFTZH:TZM`(例如 :code:`20:57:01.123456789+07:00

    • HH24:MI:SS.FF`(例如 :code:`20:57:01.123456789

    • HH24:MI:SS`(例如 :code:`20:57:01

    • HH24:MI`(例如 :code:`20:57

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 字符串

    • 以整数形式存储的日期

    • YYYY-MM-DD`(例如 :code:`2013-04-28

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM`(例如 :code:`2013-04-28T20:57:01.123456789+07:00

    • YYYY-MM-DDTHH24:MI:SS.FF`(例如 :code:`2013-04-28T20:57:01.123456

    • YYYY-MM-DDTHH24:MI:SS`(例如 :code:`2013-04-28T20:57:01

    • YYYY-MM-DDTHH24:MI`(例如 :code:`2013-04-28T20:57

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM`(例如 :code:`2013-04-28T20:57:01-07:00

    • YYYY-MM-DDTHH24:MITZH:TZM`(例如 :code:`2013-04-28T20:57-07:00

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 字符串

    • 以整数形式存储的时间戳

    • YYYY-MM-DD`(例如 :code:`2013-04-28

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM`(例如 :code:`2013-04-28T20:57:01.123456789+07:00

    • YYYY-MM-DDTHH24:MI:SS.FF`(例如 :code:`2013-04-28T20:57:01.123456

    • YYYY-MM-DDTHH24:MI:SS`(例如 :code:`2013-04-28T20:57:01

    • YYYY-MM-DDTHH24:MI`(例如 :code:`2013-04-28T20:57

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM`(例如 :code:`2013-04-28T20:57:01-07:00

    • YYYY-MM-DDTHH24:MITZH:TZM`(例如 :code:`2013-04-28T20:57-07:00

  • VARIANT

  • ARRAY

  • 字符串(必须是有效的 JSON)

  • 基元数据类型及其数组

  • BigInteger、BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T>,其中 T 是有效的 VARIANT 类型

  • T[],其中 T 是有效的 VARIANT 类型

  • List<T>,其中 T 是有效的 VARIANT 类型

  • OBJECT

  • 字符串(必须是有效的 JSON 对象)

  • Map<String, T>,其中 T 是有效的变体类型

  • GEOGRAPHY

  • 不支持

  • GEOMETRY

  • 不支持

所需访问权限

调用 Snowpipe Streaming API 需要具有以下权限的角色:

对象

权限

OWNERSHIP 或至少 INSERT 和 EVOLVE SCHEMA(仅当将 Kafka Connector 的架构演化与 Snowpipe Streaming 结合使用时才需要)

数据库

USAGE

架构

USAGE

限制

Snowpipe Streaming 仅支持使用 256 位 AES 密钥进行数据加密。

*不*支持以下对象或类型:

  • GEOGRAPHY 和 GEOMETRY 数据类型

  • 带有排序规则的列

  • TRANSIENT 或 TEMPORARY 表

  • 具有以下任何列设置的表:

    • AUTOINCREMENTIDENTITY

    • 默认列值不是 NULL

语言: 中文