Getting started with the Snowpipe Streaming REST API: a cURL and JWT tutorial

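Note

We recommend the Snowpipe Streaming SDK over the REST API for better performance and a smoother getting-started experience.

This guide shows you how to stream data into Snowflake with the Snowpipe Streaming REST API, authenticating with a JSON Web Token (JWT) generated by SnowSQL.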

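Prerequisites

Before you begin, make sure that you have the following:

Snowflake user and objects:

A Snowflake user configured for key-pair authentication. Register your public key with the following SQL command: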

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
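The RSA_PUBLIC_KEY value is the base64 body of the public key, without the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiter lines. The following is a minimal Bash sketch, assuming the public key is stored in a PEM file such as rsa_key.pub (a hypothetical filename chosen to match the rsa_key.p8 private key used later). The helper names pem_body and print_alter_user are illustrative, and the commented openssl commands are one common way to create the key pair.

```shell
# One common way to create the key pair (unencrypted private key; protect rsa_key.p8):
#   openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
#   openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# pem_body FILE: print the base64 body of a PEM file on a single line,
# dropping the -----BEGIN ...----- and -----END ...----- delimiter lines.
pem_body() {
  grep -v -- '-----' "$1" | tr -d '\r\n'
}

# print_alter_user FILE: print the ALTER USER statement with the key filled in.
# MY_USER is the same placeholder user name used throughout this guide.
print_alter_user() {
  printf "ALTER USER MY_USER SET RSA_PUBLIC_KEY='%s';\n" "$(pem_body "$1")"
}
```

Running print_alter_user rsa_key.pub prints a statement that you can paste into Snowflake.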

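A Snowflake database, schema, and PIPE object for streaming ingestion. You can create them with the following SQL commands (replace placeholders such as MY_DATABASE, MY_SCHEMA, MY_PIPE, and MY_TABLE with the names you want):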

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
-- (maps each row to data, its c1 field to c1, and its ts field to c2)
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));

ACCOUNT_IDENTIFIER:

We suggest using Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information about the format, see Account identifiers.

Installed tools:

  • curl: for making HTTP requests.

  • jq: for parsing JSON responses.

  • SnowSQL: Snowflake's command-line client, used here to run commands such as generating the JWT.

Generated JWT:

Generate your JWT by using SnowSQL:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_IDENTIFIER> \
  -u MY_USER

Caution

Store your JWT securely. Avoid exposing it in logs or scripts.

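Step-by-step instructions

Complete the following steps to stream data into Snowflake.

Step 1: Set environment variables

Set the environment variables required for your Snowflake account and streaming operations: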

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, myorg-account123
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Build the control plane host from ACCOUNT; it should match the host in your account URL:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.cn"
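Before moving on, it can help to confirm that no variable is empty or still holds a placeholder. The following is a minimal Bash sketch; check_env is a hypothetical helper, and treating values that contain < or start with PASTE_ as placeholders is an assumption based on the placeholders used above.

```shell
# check_env NAME...: report each named variable that is empty or still looks
# like a placeholder (contains "<" or starts with "PASTE_").
# Returns 0 if every variable looks configured, 1 otherwise.
check_env() {
  local name value status=0
  for name in "$@"; do
    value=${!name-}   # indirect expansion: the value of the variable named by $name
    if [[ -z "$value" || "$value" == *"<"* || "$value" == PASTE_* ]]; then
      echo "Not configured: $name" >&2
      status=1
    fi
  done
  return "$status"
}
```

For example, check_env JWT_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST lists every variable that still needs a real value; CONTROL_HOST is flagged too while ACCOUNT is still a placeholder, because it is built from ACCOUNT.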

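Step 2: Discover the ingest host

Important

If your Snowflake account name contains underscores (for example, MY_ACCOUNT), a known issue can cause internal errors when you call the ingest service.

Before you generate the scoped token, you must replace every underscore in INGEST_HOST with a hyphen. Use this converted (hyphenated) form for all subsequent REST API calls, including generating the scoped token itself.

For example, if the returned hostname is my_account.region.ingest.snowflakecomputing.cn, change it to my-account.region.ingest.snowflakecomputing.cn for all subsequent REST API calls.

The ingest host is the endpoint to which you stream data. Use your JWT to discover the ingest host: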

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
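The underscore-to-hyphen conversion described in the Important note for this step can be done with Bash pattern substitution before you request the scoped token. A minimal sketch; normalize_ingest_host is a hypothetical helper name.

```shell
# normalize_ingest_host HOST: replace every underscore in HOST with a hyphen,
# working around the known issue with account names that contain underscores.
normalize_ingest_host() {
  printf '%s\n' "${1//_/-}"
}

# Apply it to the value discovered above; a host without underscores is unchanged.
INGEST_HOST=$(normalize_ingest_host "${INGEST_HOST:-}")
export INGEST_HOST
echo "Ingest Host (normalized): $INGEST_HOST"
```

Because the substitution is a no-op for hosts without underscores, running it unconditionally is safe.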

Obtain a scoped token that authorizes operations on the ingest host:

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"

Step 3: Open a channel

Open a streaming channel to begin ingesting data:

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .

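Step 4: Append a row of data

Append a row of data to the open channel.

4.1 Extract the continuation and offset tokens

These tokens are essential for maintaining the state of your streaming session.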

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
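For a channel that has not committed any data yet, last_committed_offset_token may be null, in which case jq -r prints the literal string null. The arithmetic above then only works because Bash treats null as an unset variable (value 0), and it fails under set -u. The following sketch makes that case explicit; like the tutorial, it assumes integer offset tokens, and next_offset is a hypothetical helper.

```shell
# next_offset LAST: print the offset token for the next append.
# Numeric LAST -> LAST + 1; anything else (for example "null" for a new channel) -> 1.
next_offset() {
  local last=$1
  if [[ "$last" =~ ^[0-9]+$ ]]; then
    # 10# forces base 10 so a value such as "08" is not parsed as octal.
    printf '%s\n' "$((10#$last + 1))"
  else
    printf '%s\n' 1
  fi
}
```

With this helper, the last line of step 4.1 becomes export NEW_OFFSET=$(next_offset "$OFFSET_TOKEN").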

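4.2 Create a sample row

Generate a sample data row in NDJSON (newline-delimited JSON) format. Each row must be a complete JSON object on a single line: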

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

# NDJSON: one complete JSON object per line
cat <<EOF > rows.ndjson
{"id": 1, "c1": $RANDOM, "ts": "$NOW_TS"}
EOF
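To send more than one row per request, write one JSON object per line. The following is a minimal Bash sketch, assuming the same id, c1, and ts fields as the sample row; make_rows is a hypothetical helper.

```shell
# make_rows COUNT: print COUNT NDJSON rows (one JSON object per line) with
# sequential ids starting at 1, a random c1, and a shared UTC timestamp.
make_rows() {
  local count=$1 ts i
  ts=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
  for ((i = 1; i <= count; i++)); do
    printf '{"id": %d, "c1": %d, "ts": "%s"}\n' "$i" "$RANDOM" "$ts"
  done
}
```

Running make_rows 3 > rows.ndjson replaces the single sample row with three rows; the append request in step 4.3 sends the file unchanged.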

4.3 Append the row

Send the sample row to the streaming channel:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .

Important

  • After each append operation, you must update the continuationToken for the next append call. Use the next_continuation_token field from the append response as the new continuationToken.

  • The success of the append operation confirms only that the data was received by the service, not that it is persisted to the table. Take the next step to verify persistence before querying or moving to the next batch.
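When you send several batches, each append uses the continuation token returned by the previous one. The following Bash sketch keeps CONT_TOKEN and NEW_OFFSET up to date between calls. It assumes one offset token per batch, incremented by 1 as in this tutorial; append_url, append_file, and LAST_SENT_OFFSET are hypothetical names.

```shell
# append_url CONT_TOKEN OFFSET_TOKEN: print the append-rows URL used in step 4.3.
# Relies on INGEST_HOST, DB, SCHEMA, PIPE, and CHANNEL from steps 1 and 2.
append_url() {
  printf 'https://%s/v2/streaming/data/databases/%s/schemas/%s/pipes/%s/channels/%s/rows?continuationToken=%s&offsetToken=%s\n' \
    "$INGEST_HOST" "$DB" "$SCHEMA" "$PIPE" "$CHANNEL" "$1" "$2"
}

# append_file FILE: send one NDJSON file, then advance CONT_TOKEN and NEW_OFFSET.
# Requires curl, jq, SCOPED_TOKEN, CONT_TOKEN, and NEW_OFFSET (see step 4.1).
append_file() {
  local resp next
  # curl -sS exits non-zero only on transport errors; an HTTP error body
  # shows up below as a missing next_continuation_token.
  resp=$(curl -sS -X POST \
    -H "Authorization: Bearer $SCOPED_TOKEN" \
    -H "Content-Type: application/x-ndjson" \
    "$(append_url "$CONT_TOKEN" "$NEW_OFFSET")" \
    --data-binary @"$1") || return 1
  next=$(printf '%s' "$resp" | jq -r '.next_continuation_token // empty' 2>/dev/null) || next=""
  if [[ -z "$next" ]]; then
    echo "No next_continuation_token in response: $resp" >&2
    return 1
  fi
  LAST_SENT_OFFSET=$NEW_OFFSET   # the offset to wait for in step 4.4
  CONT_TOKEN=$next
  NEW_OFFSET=$((NEW_OFFSET + 1))
}
```

For example, for f in batch1.ndjson batch2.ndjson; do append_file "$f" || break; done sends two hypothetical batch files in order and stops at the first failure.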

4.4 Verify data persistence and committed offset by using getChannelStatus

Complete this critical step to ensure application reliability. Data isn't guaranteed to be persistent until the committedOffset has advanced. To confirm that the rows that you just appended are successfully persisted, use getChannelStatus.

Check the current status of the streaming channel:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""

Verification check

You must ensure that the committedOffset returned in the response is greater than or equal to the offset of the rows you just appended. Only after the committedOffset advances can you be certain that the data is safely available in the table.
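Because the committed offset can lag behind the append, you can poll the channel status until it catches up. The following is a minimal Bash sketch, assuming integer offset tokens as in this tutorial. The helper names are hypothetical, and the field name last_committed_offset_token is an assumption borrowed from the open-channel response in step 4.1; confirm it against the status response you actually receive.

```shell
# Hypothetical helper: print the committed offset token for $CHANNEL.
# ASSUMPTION: the status object uses last_committed_offset_token, as in step 4.1.
get_committed_offset() {
  curl -sS -X POST \
    -H "Authorization: Bearer $SCOPED_TOKEN" \
    -H "Content-Type: application/json" \
    "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
    -d "{\"channel_names\": [\"$CHANNEL\"]}" |
    jq -r --arg ch "$CHANNEL" '.channel_statuses[$ch].last_committed_offset_token'
}

# wait_for_commit TARGET [MAX_TRIES] [DELAY_SECONDS]
# Poll until the committed offset is >= TARGET.
# Returns 0 once committed, 1 if MAX_TRIES polls pass without reaching TARGET.
wait_for_commit() {
  local target=$1 max_tries=${2:-30} delay=${3:-2} committed i
  for ((i = 1; i <= max_tries; i++)); do
    committed=$(get_committed_offset)
    # Non-numeric values (for example "null") mean nothing is committed yet.
    if [[ "$committed" =~ ^[0-9]+$ ]] && ((10#$committed >= 10#$target)); then
      return 0
    fi
    sleep "$delay"
  done
  return 1
}
```

For the single row in this guide, wait_for_commit "$NEW_OFFSET" && echo "Rows committed" waits up to about a minute with the default settings before giving up.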

4.5 Query the table for persisted data

After you confirm in the previous step (4.4) that the committedOffset has advanced, query the table to confirm that the data was ingested into your Snowflake table.

Run the following SQL query in Snowflake:

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE data:id::NUMBER = 1;

(Optional) Step 5: Clean up

Remove the temporary files and unset the environment variables:

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS

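Troubleshooting

  • HTTP 401 (Unauthorized): Confirm that your JWT is valid and has not expired. Regenerate it if necessary.

  • HTTP 404 (Not Found): Double-check that the database, schema, pipe, and channel names are spelled correctly and exist in your Snowflake account.

  • No ingest host: Make sure that your control plane host URL is correct and reachable.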
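For HTTP 401 errors, checking when the JWT expires can save a round trip. A JWT payload is base64url-encoded JSON, and Snowflake key-pair JWTs carry an exp claim in Unix seconds. The following is a minimal Bash sketch; jwt_exp is a hypothetical helper, and it assumes a base64 command that accepts --decode, such as the GNU coreutils one.

```shell
# jwt_exp TOKEN: print the exp claim (Unix seconds) from a JWT payload.
# Decodes only; it does not verify the signature.
jwt_exp() {
  local payload
  # The payload is the second dot-separated segment, base64url-encoded;
  # map the URL-safe alphabet back to standard base64.
  payload=$(printf '%s' "$1" | cut -d. -f2 | tr '_-' '/+')
  # base64url omits "=" padding; restore it so base64 can decode.
  while (( ${#payload} % 4 )); do
    payload+="="
  done
  printf '%s' "$payload" | base64 --decode | grep -o '"exp": *[0-9]*' | sed 's/.*: *//'
}
```

To check the token from step 1, compare the result with the current time: [ "$(jwt_exp "$JWT_TOKEN")" -gt "$(date +%s)" ] && echo "JWT still valid" || echo "JWT expired; regenerate it".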
