教程:开始使用 Snowpipe Streaming 高性能架构 SDK¶
本教程提供了有关设置和运行演示应用程序的分步说明,该演示应用程序利用了基于 snowpipe-streaming SDK 的新型高性能架构。
先决条件¶
在运行演示之前,请确保满足以下先决条件:
Snowflake account: Verify that you have access to a Snowflake account. You will need a user with sufficient privileges (e.g., ACCOUNTADMIN or USERADMIN for the initial setup) to create the dedicated user and custom role detailed in 第 1 步:配置 Snowflake 对象.
网络访问:确保您的网络允许出站连接到 Snowflake 和 AWS S3。必要时调整防火墙规则,因为 SDK 会向 Snowflake 和 AWS S3 发起 REST API 调用。
Java 开发环境:安装 Java 11 或更高版本,以及用于管理依赖关系的 Maven。
Python:安装 Python 版本 3.9 或更高版本。
Snowpipe Streaming SDKs and the sample code: Obtain the Java SDK (https://central.sonatype.com/artifact/com.snowflake/snowpipe-streaming) or Python SDK (https://pypi.org/project/snowpipe-streaming/) and download the sample code for your preferred language from the Snowpipe Streaming SDK examples in the GitHub repository (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples).
开始使用¶
本节概述了设置和运行演示应用程序所需的步骤。
第 1 步:配置 Snowflake 对象¶
在使用 :code:`snowpipe-streaming`SDK 之前,必须在 Snowflake 环境中创建目标表和专用 PIPE 对象。与经典架构不同,高性能架构需要一个 PIPE 对象来引入数据。
生成用于身份验证的密钥对¶
使用 OpenSSL 生成用于身份验证的私有公钥对。有关更多信息,请参阅 密钥对身份验证和密钥对轮换。
在终端中运行以下命令以生成密钥:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
PUBK=$(cat ./rsa_key.pub | grep -v KEY- | tr -d '\012')
echo "ALTER USER MY_USER SET RSA_PUBLIC_KEY='$PUBK';"
创建数据库、架构、表、管道并配置用户身份验证¶
Run the following SQL commands in your Snowflake account (for example, using Snowsight or Snowflake CLI). Note that you will need a role with permissions to create users, roles, and databases (such as ACCOUNTADMIN or USERADMIN for the first few lines, and then switching to the new role). Replace placeholders like MY_USER, MY_ROLE, MY_DATABASE, and so on, with your desired names.
-- 1. Create a dedicated role and user (Run with a highly-privileged role)
CREATE OR REPLACE USER MY_USER;
CREATE ROLE IF NOT EXISTS MY_ROLE;
GRANT ROLE MY_ROLE TO USER MY_USER;
-- 2. Set the public key for key-pair authentication
-- NOTE: Replace 'YOUR_FORMATTED_PUBLIC_KEY' with the output of the PUBK variable from the key generation step.
ALTER USER MY_USER SET RSA_PUBLIC_KEY='YOUR_FORMATTED_PUBLIC_KEY';
-- 3. Set the default role (Recommended)
ALTER USER MY_USER SET DEFAULT_ROLE=MY_ROLE;
-- 4. Switch to the new role and create objects
USE ROLE MY_ROLE;
-- NOTE: You may also need to run USE WAREHOUSE YOUR_WH; here if a default warehouse isn't set.
-- Create database and schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create a target table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE object for streaming ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
-- 5. Configure authentication policy (Optional, but recommended for explicit control)
CREATE OR REPLACE AUTHENTICATION POLICY testing_auth_policy
AUTHENTICATION_METHODS = ('KEYPAIR')
CLIENT_TYPES = ('DRIVERS');
-- Apply authentication policy (if created)
ALTER USER MY_USER SET AUTHENTICATION POLICY testing_auth_policy;
第 2 步:配置身份验证配置文件¶
演示应用程序需要一个 profile.json 文件来存储连接设置,包括身份验证的详细信息。SDK 使用密钥对身份验证进行安全连接。
创建配置文件配置文件¶
创建或更新位于演示项目的根目录中的 profile.json 文件。
profile.json 模板¶
{
"user": "MY_USER",
"account": "your_account_identifier",
"url": "https://your_account_identifier.snowflakecomputing.cn:443",
"private_key_file": "rsa_key.p8",
"role": "MY_ROLE"
}
替换占位符:
MY_USER:选择使用 时默认使用的角色和仓库。您在 第 1 步:配置 Snowflake 对象 中配置的 Snowflake 用户名。your_account_identifier:Snowflake 账户的标识符(例如xy12345)。rsa_key.p8:选择使用 时默认使用的角色和仓库。您在 第 1 步:配置 Snowflake 对象 中生成的私钥文件。MY_ROLE: The dedicated role (MY_ROLE) you created and granted to the user in 第 1 步:配置 Snowflake 对象.
第 3 步:设置演示项目¶
Download: Sample Java code (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples/tree/main/java-example)
添加 JAR 依赖项
要包含 Snowpipe Streaming SDK,请将以下依赖项添加到您的 Maven pom.xml 中。Maven 会自动从公共存储库中下载 JAR。
<dependency>
<groupId>com.snowflake</groupId>
<artifactId>snowpipe-streaming</artifactId>
<version>YOUR_SDK_VERSION</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.1</version>
</dependency>
重要
将 YOUR_SDK_VERSION 替换为 Maven Central (https://central.sonatype.com/artifact/com.snowflake/snowpipe-streaming) 上可用的特定版本。
Download: Sample Python code (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples/tree/main/python-example)
添加 Python 依赖项
SDK 需要 Python 版本 3.9 或更高版本。
要安装 Snowpipe Streaming SDK for Python,请运行以下命令:
pip install snowpipe-streaming
有关该包的更多信息,请参阅 PyPI (https://pypi.org/project/snowpipe-streaming/)。
放置配置文件¶
确保 profile.json 文件(在 第 2 步:配置身份验证配置文件 中配置)位于项目的根目录中。
第 4 步:使用提供的代码示例并运行演示应用程序¶
在终端中导航到项目的根目录。
构建并执行
构建项目:
mvn clean install
运行主类:
mvn exec:java -Dexec.mainClass="com.snowflake.snowpipestreaming.demo.Main"
运行演示应用程序
运行 Python 演示:
python example.py
第 5 步:验证数据¶
运行演示后,在 Snowflake 中验证引入的数据:
SELECT COUNT(*) FROM MY_DATABASE.MY_SCHEMA.MY_TABLE;
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE LIMIT 10;