Snowpipe Streaming REST API 入门:cURL 和 JWT 教程¶
本指南将向您介绍如何使用 Snowpipe Streaming REST API 和 使用 SnowSQL 生成的 JSON Web 令牌 (JWT) 将数据流式传输到 Snowflake。
先决条件¶
在开始之前,请确保您准备好以下各项:
Snowflake 用户和对象:
已配置密钥对身份验证的 Snowflake 用户。使用以下 SQL 命令注册您的公钥:
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Snowflake 数据库、架构和用于流式传输引入的 PIPE 对象。您可以使用以下 SQL 命令创建它们(用您所需的名称替换 MY_DATABASE
、MY_SCHEMA
、MY_PIPE
、MY_TABLE
等占位符):
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
已安装的工具:
curl
:用于提出 HTTP 请求。jq
:用于解析 JSON 响应。SnowSQL
:要运行命令,请使用 Snowflake 的命令行客户端。
生成的 JWT: 使用 SnowSQL 生成您的 JWT:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_LOCATOR> \
-u MY_USER
小心
安全存储您的 JWT。避免将其暴露在日志或脚本中。
分步说明¶
完成以下步骤,将数据流式传输到 Snowflake。
第 1 步:设置环境变量¶
为您的 Snowflake 账户和流式传输操作设置必要的环境变量:
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.cn"
第 2 步:探索引入主机¶
引入主机是流式传输数据的端点。使用 JWT 发现引入主机:
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
获取作用域令牌以授权在引入主机上进行操作:
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
第 3 步:打开通道¶
打开流式传输通道开始引入数据:
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
第 4 步:追加一行数据¶
向打开的通道追加一行数据。
4.1 提取延续和偏移令牌¶
这些令牌对于维持流式传输会话的状态至关重要。
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 创建示例行¶
生成 NDJSON 格式的示例数据行:
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 追加行¶
将示例行发送到流式传输通道:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
重要
每次追加操作后,都必须为下一次追加调用更新 continuationToken。追加行调用的响应包含一个应用于进行更新的 next_continuation_token
字段。
4.4 验证数据引入¶
确认数据已引入您的 Snowflake 表中。
在 Snowflake 中运行以下 SQL 查询:
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
第 5 步:获取通道状态¶
检查流式传输通道的当前状态:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
(可选)第 6 步:清理¶
移除临时文件并取消设置环境变量:
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
故障排除¶
HTTP 401(未授权): 确认您的 JWT 令牌有效且未过期。如果需要,请重新生成令牌。
** HTTP 404(未找到):** 仔细检查数据库、架构、管道和通道名称拼写是否正确,以及是否存在于您的 Snowflake 账户中。
无引入主机: 确保您的控制平面主机 URL 正确且可访问。