Snowpipe Streaming REST API 入门:cURL 和 JWT 教程

本指南将向您介绍如何使用 Snowpipe Streaming REST API使用 SnowSQL 生成的 JSON Web 令牌 (JWT) 将数据流式传输到 Snowflake。

先决条件

在开始之前,请确保您准备好以下各项:

Snowflake 用户和对象:

已配置密钥对身份验证的 Snowflake 用户。使用以下 SQL 命令注册您的公钥:

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

Snowflake 数据库、架构和用于流式传输引入的 PIPE 对象。您可以使用以下 SQL 命令创建它们(用您所需的名称替换 MY_DATABASEMY_SCHEMAMY_PIPEMY_TABLE 等占位符):

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Copy

已安装的工具:

  • curl:用于提出 HTTP 请求。

  • jq:用于解析 JSON 响应。

  • SnowSQL:要运行命令,请使用 Snowflake 的命令行客户端。

生成的 JWT: 使用 SnowSQL 生成您的 JWT:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_LOCATOR> \
  -u MY_USER
Copy

小心

安全存储您的 JWT。避免将其暴露在日志或脚本中。

分步说明

完成以下步骤,将数据流式传输到 Snowflake。

第 1 步:设置环境变量

为您的 Snowflake 账户和流式传输操作设置必要的环境变量:

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.cn"
Copy

第 2 步:探索引入主机

引入主机是流式传输数据的端点。使用 JWT 发现引入主机:

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

获取作用域令牌以授权在引入主机上进行操作:

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

第 3 步:打开通道

打开流式传输通道开始引入数据:

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

第 4 步:追加一行数据

向打开的通道追加一行数据。

4.1 提取延续和偏移令牌

这些令牌对于维持流式传输会话的状态至关重要。

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 创建示例行

生成 NDJSON 格式的示例数据行:

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 追加行

将示例行发送到流式传输通道:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

重要

每次追加操作后,都必须为下一次追加调用更新 continuationToken。追加行调用的响应包含一个应用于进行更新的 next_continuation_token 字段。

4.4 验证数据引入

确认数据已引入您的 Snowflake 表中。

在 Snowflake 中运行以下 SQL 查询:

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

第 5 步:获取通道状态

检查流式传输通道的当前状态:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

(可选)第 6 步:清理

移除临时文件并取消设置环境变量:

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

故障排除

  • HTTP 401(未授权): 确认您的 JWT 令牌有效且未过期。如果需要,请重新生成令牌。

  • ** HTTP 404(未找到):** 仔细检查数据库、架构、管道和通道名称拼写是否正确,以及是否存在于您的 Snowflake 账户中。

  • 无引入主机: 确保您的控制平面主机 URL 正确且可访问。

语言: 中文