Snowpipe Streaming¶
Snowpipe Streaming 是 Snowflake 的服务,用于将流数据连续、低延迟地直接加载到 Snowflake 中。它支持近乎实时的数据引入和分析,这对于及时的洞察和即时的运营响应至关重要。来自不同流式数据源的大量数据可在几秒钟内进行查询和分析。
Snowpipe Streaming 的价值¶
实时数据可用性:与传统的批量加载方法不同,在数据到达时引入数据,支持实时仪表板、实时分析和欺诈检测等用例。
高效流式工作负载:利用 Snowflake Ingest 将行直接写入表中,绕过中间云存储。利用 Snowpipe Streaming SDKs 直接将行写入表中,无需在中间云存储中暂存数据。这种直接方法减少了延迟并简化了引入架构。
简化的数据管道:为来自应用程序事件、IoT 传感器、变更数据捕获 (CDC) 流和消息队列(例如 Apache Kafka)等数据源的持续数据管道提供了一种简化的方法。
无服务器且可扩展:作为无服务器产品,它会根据引入负载自动扩缩计算资源。
流处理的成本效益:对流式引入进行了计费优化,可能为高容量、低延迟数据流提供更具成本效益的解决方案。
借助 Snowpipe Streaming,可以在 Snowflake 数据云上构建实时数据应用程序,以便根据最新可用数据做出决策。
Snowpipe Streaming 实施方案¶
Snowpipe Streaming 提供两种不同的实现方式,以满足不同的数据引入需求和性能预期:采用高性能架构的 Snowpipe Streaming 和采用经典架构的 Snowpipe Streaming:
-
Snowflake 设计了这种下一代实现,旨在显著提高吞吐量、优化流处理性能并提供可预测的成本模型,为高级数据流处理功能奠定基础。
主要特征:
SDK:选择使用 时默认使用的角色和仓库。利用新的 snowpipe-streaming SDK (https://repo1.maven.org/maven2/com/snowflake/snowpipe-streaming/)。
定价:采用透明、基于吞吐量的定价(按未压缩数据量每 GB 计算 credit)。
数据流管理:利用 PIPE 对象来管理数据流并在引入时启用轻量级转换。针对此 PIPE 对象对通道进行开启。
引入:提供 REST API,以通过 PIPE 进行直接的、轻量级数据引入。
架构验证:根据 PIPE 中定义的架构在引入期间在服务器端执行。
性能:专为显著提高吞吐量和提高所引入数据的查询效率而设计。
我们鼓励您探索这种先进的架构,尤其适用于新的流处理项目。
-
这是最初的、正式发布的实现,为已建立的数据管道提供了可靠的解决方案。
主要特征:
SDK:选择使用 时默认使用的角色和仓库。利用 snowflake-ingest-sdk (https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk)。
数据流管理:利用 对象来管理数据流并在引入时启用轻量级转换。不要使用 PIPE 对象概念进行流式引入。直接针对目标表对通道进行配置和开启。
定价:基于用于引入的无服务器计算资源和活跃客户端连接数量的组合。
选择您的实施方案¶
在选择实施方案时,请考虑您的眼前需求和长期数据策略:
新的流处理项目:建议评估 Snowpipe Streaming 高性能架构,因为其前瞻性设计、更好的性能、可扩展性和成本可预测性。
性能要求:高性能架构旨在尽可能提高吞吐量和优化实时性能。
定价偏好:高性能架构提供了清晰的、基于吞吐量的定价,而经典架构则根据无服务器计算使用量和客户端连接进行计费。
现有设置:使用经典架构的现有应用程序可以继续运行。对于未来的扩展或重新设计,可以考虑迁移到高性能架构或整合高性能架构。
功能集和管理:高性能架构中的 PIPE 对象引入了经典架构所不具备的增强管理和转换功能。
Snowpipe Streaming 与 Snowpipe 的对比¶
Snowpipe Streaming 旨在补充 Snowpipe,而不是取而代之。在流处理场景中使用 Snowpipe Streaming API 时,数据会通过行(例如 Apache Kafka 主题)进行流传输,而非写入到文件。API 适用于引入工作流程,该工作流程包括生成或接收记录的现有自定义 Java 应用程序。WIth API,无需创建文件即可将数据加载到 Snowflake 表中,并且 API 可以在数据可用时自动连续地将数据流加载到 Snowflake 中。
下表描述了 Snowpipe Streaming 与 Snowpipe 之间的区别:
类别 |
Snowpipe Streaming |
Snowpipe |
---|---|---|
待加载数据的形式 |
行 |
文件。如果您的现有数据管道在 Blob 存储中生成文件,则建议使用 Snowpipe,而非 API。 |
第三方软件要求 |
Snowflake Ingest SDK 的自定义 Java 应用程序代码包装程序 |
无 |
数据排序 |
各通道内的有序插入 |
不支持。在从文件加载数据时,Snowpipe 可按照与云存储中文件创建时间戳不同的顺序进行加载。 |
加载历史记录 |
加载历史记录会记录在 SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY 视图 (Account Usage) 之中 |
加载 COPY_HISTORY (Account Usage) 和 COPY_HISTORY 函数 (Information Schema) 中记录的历史记录 |
管道对象 |
经典架构不需要管道对象:API 直接将记录写入到目标表中。高性能架构需要管道对象。 |
需要一个管道对象,用于将暂存文件数据排入队列,并加载到目标表中。 |
通道¶
API 通过一个或多个通道引入行。通道表示与 Snowflake 的逻辑、指定流处理连接,用于以有序方式将数据加载到表中。行的顺序及其相应的偏移令牌会在通道内保留,但不会跨指向同一个表的多个通道保留。
在经典架构中,单个通道恰好映射到 Snowflake 中的一个表;尽管多个通道可以指向同一个表。客户端 SDK 可以为多个表打开多个通道;但 SDK 无法跨账户打开通道。在客户端主动插入数据时,通道应长期存在,并且由于保留了偏移令牌信息,应该在客户端进程重新启动时重复使用。默认情况下,通道内的数据每 1 秒自动刷新一次,并且不需要关闭通道。有关更多信息,请参阅 延迟建议。
当不再需要通道和相关的偏移元数据时,您可以使用 DropChannelRequest
API 永久删除通道。有两种方法可以删除通道:
关闭时删除通道。在删除通道之前,通道内的数据会自动刷新。
盲目删除通道。不建议这样做,因为盲目删除通道会丢弃任何待处理的数据。
您可以运行 SHOW CHANNELS 命令,列出您有访问权限的通道。有关更多信息,请参阅 SHOW CHANNELS。
备注
不活动的通道及其偏移令牌会在闲置 30 天后自动删除。
偏移令牌¶
偏移令牌 是一个字符串,客户端可以在其行提交方法请求(例如,对于单行或多行)中包含该字符串,以跟踪每个通道的引入进度。使用的具体方法是适用于经典架构的 insertRow
或 insertRows
,以及适用于高性能架构的 appendRow
或 appendRows
。在通道创建时,此令牌将初始化为 NULL,并会在具有所提供的偏移令牌的行通过异步过程提交到 Snowflake 时更新。客户端可以定期提出 getLatestCommittedOffsetToken
方法请求,以获取特定通道的最新提交偏移令牌,并使用该令牌来推断引入进度。请注意,Snowflake 不 使用此令牌来执行重复数据删除;但客户端可以使用此令牌,通过自定义代码来执行重复数据删除。
在客户端重新打开一个通道时,将会返回最新的持久偏移令牌。客户端可使用令牌重置其在数据源中的位置,以避免发送两次相同的数据。请注意,在通道重新打开事件发生时,Snowflake 中缓存的任何未提交数据都将被丢弃,以避免提交这些数据。
您可以使用最新提交的偏移令牌来执行以下常见用例:
跟踪引入进度
将特定偏移与最新提交的偏移令牌进行比较,确认特定偏移是否已提交
推进源偏移,并清除已提交的数据
启用重复数据删除并确保确切传送一次数据
例如,Kafka Connector 可从主题读取偏移令牌,例如 <partition>:<offset>
,或者仅为 :code:`<offset>`(如果分区编码在通道名称中)。考虑以下情景:
Kafka Connector 联机并在 Kafka 主题
T
中打开了对应于Partition 1
的通道,通道名称为T:P1
。该连接器开始从 Kafka 分区读取记录。
该连接器调用 API,发出一条
insertRows
方法请求,使用与记录关联的偏移作为偏移令牌。例如,偏移令牌可以是
10
,代表 Kafka 分区中的第十条记录。连接器定期发出
getLatestCommittedOffsetToken
方法请求,确定引入进度。
如果 Kafka Connector 崩溃,可完成以下过程,以恢复从 Kafka 分区的正确偏移位置读取记录。
Kafka Connector 重新联机,并使用与之前相同的名称重新打开通道。
连接器调用 API,发出一条
getLatestCommittedOffsetToken
方法请求,以获取分区最新提交的偏移。例如,假设最新的持久偏移令牌是
20
。连接器使用 Kafka 读取 APIs,重置与偏移加 1(本例中为
21
)相对应的游标。连接器恢复读取记录。读取游标在重新定位成功后不会检索到重复数据。
在另一个示例中,应用程序从目录读取日志,并使用 Snowpipe Streaming Client SDK 将这些日志导出到 Snowflake。您可以构建一个日志导出应用程序,以执行以下操作:
列出日志目录中的文件。
假设日志记录框架生成可按字典顺序排序的日志文件,并且在此排序中,新的日志文件位于末尾。
逐行读取日志文件并调用 API,发出
insertRows
方法请求,在请求中使用与日志文件名、行数或字节位置相对应的偏移令牌。例如,偏移令牌可能是
messages_1.log:20
,其中的messages_1.log
是日志文件的名称,20
是行号。
如果应用程序崩溃或需要重新启动,它将调用 API,发出 getLatestCommittedOffsetToken
方法请求,检索与上次导出的日志文件和行相对应的偏移令牌。根据之前的例子,这可能是 messages_1.log:20
。随后应用程序将打开 messages_1.log
并寻找 21
行,以防止同一日志行被引入两次。
备注
偏移令牌信息可能会丢失。偏移令牌链接到一个通道对象,如果 30 天内未使用该通道执行新的引入,该通道将自动清除。为了防止偏移令牌丢失,请考虑维护单独的偏移,并按需重置通道的偏移令牌。
offsetToken
和 continuationToken
的角色¶
offsetToken
和 continuationToken
都用于确保数据精确传输一次,但它们的用途不同,由不同的子系统管理。主要区别在于谁控制令牌的价值及其使用范围。
:code:`continuationToken`(仅适用于高性能架构,并且仅由直接 RESTAPI 用户使用):
该令牌由 Snowflake 管理,对于维持单个连续流式传输会话的状态至关重要。当客户端使用
Append Rows
API 发送数据时,Snowflake 返回continuationToken
。客户端必须在下一个 AppendRows 请求中弹回此令牌,以确保 Snowflake 以正确的顺序连续接收数据。Snowflake 使用令牌来检测和防止 SDK 重试时出现重复数据或缺少数据。:code:`offsetToken`(适用于经典架构和高性能架构):
该令牌是用户定义的标识符,允许从外部源确切传送一次。Snowflake 不会将此令牌用于自己的内部操作或防止重新引入。相反,Snowflake 仅存储这个值。外部系统(如 Kafka 连接器)有责任从 Snowflake 读取 offsetToken 并使用它来跟踪自己的引入进度,避免在需要重放外部流时发送重复的数据。
仅限插入操作¶
API 目前仅限于插入行。要修改、删除或合并数据,请将“原始”记录写入一个或多个临时表。若要合并、连接或转换数据,请使用 :doc:` 连续数据管道 </user-guide/data-pipelines-intro>` 将修改后的数据插入目标报告表。
支持的 Java 数据类型¶
下表汇总了支持将哪些 Java 数据类型引入到 Snowflake 列:
Snowflake 列类型 |
允许的 Java 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
请参阅 布尔转换详细信息。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
所需访问权限¶
调用 Snowpipe Streaming API 需要具有以下权限的角色:
对象 |
权限 |
---|---|
表 |
OWNERSHIP 或至少 INSERT 和 EVOLVE SCHEMA(仅当将 Kafka Connector 的架构演化与 Snowpipe Streaming 结合使用时才需要) |
数据库 |
USAGE |
架构 |
USAGE |
管道 |
OPERATE(仅适用于高性能架构) |