Best practices for Snowpipe Streaming with high-performance architecture

This guide outlines key best practices for designing and implementing robust data ingestion pipelines with Snowpipe Streaming's high-performance architecture. Following these best practices helps ensure that your pipelines are durable, reliable, and able to handle errors efficiently.

Manage channels strategically

To improve performance and long-term stability, apply the following channel-management strategies:

  • Use long-lived channels: To reduce overhead, open a channel once and keep it active for the duration of the ingestion task. Avoid repeatedly opening and closing channels.

  • Use deterministic channel names: Adopt a consistent, predictable naming convention (for example, source-env-region-client-id) to simplify troubleshooting and facilitate automated recovery.

  • Scale out with multiple channels: To increase throughput, open multiple channels. These channels can point to a single target pipe or to multiple pipes, depending on service limits and throughput requirements.

  • Monitor channel status: Regularly use the getChannelStatus method to monitor the health of your ingestion channels.

    • Track last_committed_offset_token to confirm that data is being ingested successfully and that the pipeline is making progress.

    • Monitor row_error_count to detect bad records or other ingestion problems early.
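The deterministic naming convention above can be sketched as a small helper. This is a minimal illustration: the function name and the naming pieces (source, env, region, client ID) are assumptions, not part of the SDK.

```python
def channel_name(source: str, env: str, region: str, client_id: int) -> str:
    """Build a predictable channel name such as 'kafka-prod-us-west-2-7'.

    Because the same inputs always produce the same name, a restarted
    client can reopen exactly the channel it used before and resume from
    the last committed offset token.
    """
    return f"{source}-{env}-{region}-{client_id}"

name = channel_name("kafka", "prod", "us-west-2", 7)
print(name)
```

Scaling out then becomes a matter of varying client_id (or another component) to derive as many distinct, reproducible channel names as your throughput requires.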

Validate schemas continuously

Ensure that incoming data conforms to the expected table schema to prevent ingestion failures and maintain data integrity:

  • Client-side validation: Implement schema validation on the client side to provide immediate feedback and reduce server-side errors. Although full row-by-row validation offers maximum safety, a method that performs better might involve selective validation; for example, at batch boundaries or by sampling rows.

  • Server-side validation: The high-performance architecture can offload schema validation to the server. Errors and their counts are reported through getChannelStatus if schema mismatches occur during ingestion into the target pipe and table.
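The selective client-side validation described above might look like the following sketch. The expected schema, function names, and the sampling strategy (always check batch boundaries, randomly sample the rest) are illustrative assumptions.

```python
import random

# Illustrative expected schema: column name -> expected Python type
EXPECTED_COLUMNS = {"event_id": int, "status": str}

def validate_row(row: dict) -> list:
    """Return a list of schema problems for one row (empty list = valid)."""
    problems = []
    for col, expected_type in EXPECTED_COLUMNS.items():
        if col not in row:
            problems.append("missing column: " + col)
        elif not isinstance(row[col], expected_type):
            problems.append("wrong type for " + col)
    return problems

def validate_batch(rows: list, sample_rate: float = 0.1) -> list:
    """Validate batch boundaries (first and last rows) plus a random sample."""
    problems = []
    for i, row in enumerate(rows):
        at_boundary = i == 0 or i == len(rows) - 1
        if at_boundary or random.random() < sample_rate:
            problems.extend(validate_row(row))
    return problems
```

Full row-by-row validation is the safe default; switch to sampling only after measuring that validation is a real bottleneck, and rely on server-side validation to catch what sampling misses.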

Add client-side metadata columns

For robust error detection and recovery, you must include ingestion metadata as part of the row payload. This requires planning your data structure and PIPE definition in advance.

Before ingestion, add the following columns to the row payload:

  • CHANNEL_ID: for example, a compact INTEGER

  • STREAM_OFFSET: a BIGINT that is monotonically increasing per channel, such as a Kafka partition offset

Together, these columns uniquely identify each channel's records and let you track where your data originated.

Optionally, add a PIPE_ID column if multiple pipes ingest into the same target table. With this column, you can trace rows back to their ingestion pipeline. You can store descriptive pipe names in a separate lookup table, mapping them to compact integers to reduce storage costs.
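Enriching each row with these metadata columns can be sketched as follows. The helper name is an assumption for illustration; only the column names (CHANNEL_ID, STREAM_OFFSET, PIPE_ID) come from the text above.

```python
from typing import Optional

def with_metadata(row: dict, channel_id: int, stream_offset: int,
                  pipe_id: Optional[int] = None) -> dict:
    """Return a copy of the row with the ingestion-metadata columns added."""
    enriched = dict(row)
    enriched["CHANNEL_ID"] = channel_id        # compact INTEGER per channel
    enriched["STREAM_OFFSET"] = stream_offset  # monotonically increasing per channel
    if pipe_id is not None:
        enriched["PIPE_ID"] = pipe_id          # optional: several pipes -> one table
    return enriched

row = with_metadata({"event_id": 101}, channel_id=7, stream_offset=42, pipe_id=1)
```

Assigning stream_offset from the source system (for example, the Kafka partition offset) rather than a client-side counter means the offset survives client restarts.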

Detect and recover from errors with metadata offsets

Combine channel monitoring with the metadata columns to detect and fix problems promptly:

  • Monitor status: Regularly check getChannelStatus. An increasing row_error_count is a strong indicator of a potential problem.

  • Detect missing records: If errors are detected, use a SQL query to identify missing or out-of-order records by checking for gaps in your STREAM_OFFSET sequence.

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;

Use compression for REST API requests

When you use the Snowpipe Streaming REST API, use compression to send more data per request and reduce network overhead.

Although the REST API has a physical limit of 4 MB per request, this limit applies to the observed transfer size. By using compression, you can fit a larger uncompressed data volume into each request, enabling higher throughput and reducing the number of API calls required.

Snowflake recommends using ZSTD as the high-performance compression algorithm, although Gzip is also supported.
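As a minimal illustration of the size savings, the sketch below compresses a JSON payload with gzip from the Python standard library (ZSTD typically requires a third-party package such as zstandard, so gzip is used here for a self-contained example). The row shape is invented for illustration.

```python
import gzip
import json

# Illustrative batch of repetitive JSON rows
rows = [{"event_id": i, "status": "active", "payload": "x" * 200}
        for i in range(1000)]
body = json.dumps(rows).encode("utf-8")

compressed = gzip.compress(body)

# The 4 MB per-request limit applies to the observed (compressed) transfer
# size, so the uncompressed rows can represent a much larger data volume.
print("uncompressed:", len(body), "bytes; compressed:", len(compressed), "bytes")
```

Highly repetitive streaming payloads, such as JSON rows that share the same keys, tend to compress very well, which is what makes compression effective for raising per-request throughput.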

Optimize ingestion performance and reduce cost with MATCH_BY_COLUMN_NAME

Configure your pipe to map the required columns from the source data instead of ingesting everything into a single VARIANT column. To do this, use MATCH_BY_COLUMN_NAME = CASE_SENSITIVE or apply a transformation in the pipe definition. This best practice not only reduces ingestion cost but also improves the overall performance of your streaming data pipeline.

This best practice offers the following key advantages:

  • With MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, you pay only for the data values ingested into the target table. In contrast, ingesting data into a single VARIANT column is billed on all JSON bytes, including both keys and values. For data with verbose or numerous JSON keys, this can significantly and unnecessarily increase ingestion cost.

  • Snowflake's processing engine is more computationally efficient. Instead of first parsing the entire JSON object into a VARIANT and then extracting the required columns, it extracts the necessary values directly.

Use native data types for semi-structured data

For optimal performance and data integrity, provide semi-structured data by using native language objects rather than serialized strings.

  • Performance: With native objects, the SDK can handle data more efficiently without requiring additional parsing steps on the Snowflake server.

  • Type Safety: The high-performance architecture treats string literals as literal text. By using native objects, you ensure that your data is stored as structured JSON rather than escaped string values.

Java example:

// Preferred: SDK converts the List to a structured ARRAY
row.put("tags", Arrays.asList("electronics", "sale"));

Python example:

# Preferred: SDK converts the dict to a structured VARIANT
row["payload"] = {"event_id": 101, "status": "active"}

Get Prometheus metrics

To get performance metrics from the Snowpipe Streaming high-performance client, you must enable the built-in Prometheus metrics server and configure your Prometheus service to scrape the endpoint.

Enable the metrics server by setting the environment variable SS_ENABLE_METRICS to true before running your application.

Scrape the metrics endpoint on the host that is running your Snowpipe Streaming ingest process. The default path is /metrics on the host and port defined by SS_METRICS_IP and SS_METRICS_PORT.

Example: Verifying the metrics endpoint (local process/dev box)

# Enable Prometheus metrics
export SS_ENABLE_METRICS=true
# Run your application (the server starts on 127.0.0.1:50000 by default)

# Curl the endpoint to verify the metrics are exposed
curl http://127.0.0.1:50000/metrics

Example: Prometheus scrape configuration

Point your Prometheus service at the host running the Snowpipe Streaming client.

scrape_configs:
  - job_name: snowpipe_streaming_hp
    metrics_path: /metrics   # default is /metrics
    static_configs:
      - targets: ['127.0.0.1:50000']

Designing for resiliency

Wrap ingestion in try-catch blocks

Don't assume that insertRows always succeeds. Ensure that your ingestion loop can catch SFException and interpret the HTTP status codes, specifically 409 for invalidations and 429 for throttling.
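The error-handling decision described above can be separated into a small, testable classifier. This is a sketch: the function and handler names are assumptions, and only the status codes (409, 429, and the retryable server errors) come from the surrounding text.

```python
# Status-code classification for ingestion errors
RETRYABLE = {429, 500, 503}   # throttling and transient server errors
INVALIDATED = {409}           # channel invalidated: reopen and replay

def classify_error(status_code: int) -> str:
    """Map an HTTP status code from an ingestion failure to a recovery action."""
    if status_code in INVALIDATED:
        return "reopen_and_replay"
    if status_code in RETRYABLE:
        return "retry_with_backoff"
    return "fail_fast"
```

Keeping this mapping in one place means the try-catch block around your ingestion calls stays small: catch the exception, classify the status code, and dispatch to the matching recovery path.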

Implement exponential back-off

For retryable errors (429, 500, 503), don't retry immediately. Use an exponential back-off strategy, increasing the wait time between each retry, to allow the system to recover.
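A common way to implement this is exponential back-off with jitter. The function below is a sketch; the base delay, cap, and the full-jitter variant are illustrative choices, not prescribed values.

```python
import random

def backoff_seconds(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Exponential back-off with full jitter.

    Returns a random delay in [0, min(cap, base * 2**attempt)], so waits
    grow with each attempt but stay bounded and de-synchronized across
    clients retrying at the same time.
    """
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))
```

Jitter matters when many channels fail at once (for example, during throttling): without it, every client retries on the same schedule and re-creates the load spike that caused the 429s.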

Verify progress with offset tokens

Periodically call getLatestCommittedOffsetToken to track which data was successfully persisted. If a 409 error occurs, use this token to identify the exact point from which to replay data after reopening the channel.
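Given a client-side buffer of recently sent rows that carry the STREAM_OFFSET metadata column, the replay selection can be sketched as follows. The function name and the buffer shape are assumptions for illustration.

```python
def rows_to_replay(buffered: list, last_committed_offset: int) -> list:
    """Return buffered rows not yet covered by the last committed offset token.

    Rows at or below the committed offset were durably persisted; everything
    after it must be re-sent once the channel is reopened.
    """
    return [row for row in buffered
            if row["STREAM_OFFSET"] > last_committed_offset]

buffered = [{"STREAM_OFFSET": i, "event_id": i} for i in range(5)]
pending = rows_to_replay(buffered, last_committed_offset=2)  # offsets 3 and 4
```

This works only if the client retains sent rows (or can re-read them from the source, such as a Kafka topic) until their offsets are confirmed as committed.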

Monitor channel status

Regularly check getChannelStatus(). If the status code is anything other than SUCCESS, trigger your error-handling logic to reset the channel or client connection.