Snowpipe Streaming Classic 最佳实践

成本优化

作为最佳实践,对于每秒写入较多数据的 Snowpipe Streaming 客户端,我们建议减少此类客户端的 API 调用。使用 Java 或 Scala 应用程序汇总来自多个来源(例如 IoT 设备或传感器)的数据,然后使用 Snowflake Ingest SDK 调用 API,从而以更高的流速率加载数据。API 可以高效地汇总账户中多个目标表的数据。

一个 Snowpipe Streaming 客户端可以打开多个通道来发送数据,但客户端成本仅按每个活动客户端收取。通道数量不影响客户端成本。因此,我们建议每个客户端使用多个通道来优化性能和成本。

由于预占文件迁移操作的缘故,使用相同的表进行批处理和流式数据引入也可以降低 Snowpipe Streaming 计算成本。如果在 Snowpipe Streaming 正插入到的同一个表上也启用了 自动群集,则文件迁移的计算成本可能会降低。群集操作将在同一事务中优化和迁移数据。

性能建议

为了在高吞吐量部署中实现最佳性能,我们建议采取以下措施:

  • 如果您要加载多行,则使用 insertRows 比多次调用 insertRow 更具效率和成本效益,因为用在锁上的时间更少。

    • 将传给 insertRows 的每行批次的大小保持在 16 MB 以下(压缩后)。

    • 行批次的最佳大小介于 10-16 MB 之间。

  • java.time 包中,将 TIME、DATE 和所有 TIMESTAMP 列的值作为 支持的类型 之一传递。

  • 使用 OpenChannelRequest.builder 创建通道时,将 OnErrorOption 设置为 OnErrorOption.CONTINUE,然后手动检查 insertRows 的返回值,以了解是否可能存在引入错误。与依赖使用 OnErrorOption.ABORT 时引发的异常相比,这种方法目前可以带来更好的性能。

  • 将默认日志级别设置为 DEBUG 时,请确保以下日志记录器继续在 INFO 级别记录日志:它们的 DEBUG 输出非常冗长,这可能导致性能显著下降。

    • net.snowflake.ingest.internal.apache.parquet

    • org.apache.parquet

  • 在客户端主动插入数据时,通道应长期存在,并且由于保留了偏移令牌信息,应该加以重复使用。插入数据后不要关闭通道,因为通道内的数据会根据 MAX_CLIENT_LAG 中配置的时间自动刷新。

延迟建议

当您使用 Snowpipe Streaming 时,延迟是指写入通道的数据在 Snowflake 中可供查询的速度。Snowpipe Streaming 每隔一秒自动刷新通道内的数据,这意味着您无需明确关闭通道即可刷新数据。

使用 MAX_CLIENT_LAG 配置延迟 在 Snowflake Ingest SDK 版本 2.0.4 及更高版本中,您可以使用 MAX_CLIENT_LAG 选项微调数据刷新延迟:

  • 标准 Snowflake 表(非 Iceberg):默认 MAX_CLIENT_LAG 为 1 秒。您可以替换此设置,将所需的刷新延迟设置为 1 秒到最多 10 分钟。

  • Snowflake 管理的 Iceberg 表:由 Snowflake Ingest SDK 版本 3.0.0 及更高版本支持,默认 MAX_CLIENT_LAG 为 30 秒。此默认设置有助于确保生成优化的 Parquet 文件,从而提升查询性能。虽然您可以设置更低值,但除非吞吐量极高,否则通常不建议调整。

助力获得出色性能的延迟建议 有效设置 MAX_CLIENT_LAG 可显著提升查询性能,并优化内部迁移过程(即 Snowflake 压缩小分区的过程)。

对于低吞吐量场景,您可能每秒只发送少量数据(例如 1 行或 1 KB),频繁刷新可能会导致大量小分区。这会增加查询编译时间,因为 Snowflake 必须解析许多小分区,特别是在迁移过程可以压缩查询之前运行查询。

因此,您应将 MAX_CLIENT_LAG 尽可能设置为目标延迟要求允许的最高值。延长插入数据的缓冲时长可使 Snowpipe Streaming 生成大小更合理的分区,从而提升查询性能并减少迁移开销。例如,如果您有任务每分钟执行一次流数据合并/转换,则理想 MAX_CLIENT_LAG 可能为 50 秒至 55 秒。这样可以确保在下游进程运行之前以更大的区块刷新数据。

Kafka connector for Snowpipe Streaming 请务必注意,Kafka connector for Snowpipe Streaming 有自己的内部缓冲区。当达到 Kafka 缓冲区刷新时间时,数据将通过 Snowpipe Streaming 以标准的一秒延迟发送到 Snowflake。有关更多信息,请参阅 buffer.flush.time 设置

语言: 中文