教程:开始使用 Snowpipe Streaming 高性能架构 SDK

本教程提供了有关设置和运行演示应用程序的分步说明,该演示应用程序利用了基于 snowpipe-streaming SDK 的新型高性能架构。

先决条件

在运行演示之前,请确保满足以下先决条件:

  • Snowflake account: Verify that you have access to a Snowflake account. You will need a user with sufficient privileges (e.g., ACCOUNTADMIN or USERADMIN for the initial setup) to create the dedicated user and custom role detailed in 第 1 步:配置 Snowflake 对象.

  • 网络访问:确保您的网络允许出站连接到 Snowflake 和 AWS S3。必要时调整防火墙规则,因为 SDK 会向 Snowflake 和 AWS S3 发起 REST API 调用。

  • Java 开发环境:安装 Java 11 或更高版本,以及用于管理依赖关系的 Maven。

  • Python:安装 Python 版本 3.9 或更高版本。

  • Snowpipe Streaming SDKs and the sample code: Obtain the Java SDK (https://central.sonatype.com/artifact/com.snowflake/snowpipe-streaming) or Python SDK (https://pypi.org/project/snowpipe-streaming/) and download the sample code for your preferred language from the Snowpipe Streaming SDK examples in the GitHub repository (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples).

开始使用

本节概述了设置和运行演示应用程序所需的步骤。

第 1 步:配置 Snowflake 对象

在使用 :code:`snowpipe-streaming`SDK 之前,必须在 Snowflake 环境中创建目标表和专用 PIPE 对象。与经典架构不同,高性能架构需要一个 PIPE 对象来引入数据。

生成用于身份验证的密钥对

使用 OpenSSL 生成用于身份验证的私有公钥对。有关更多信息,请参阅 密钥对身份验证和密钥对轮换

在终端中运行以下命令以生成密钥:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Copy
PUBK=$(cat ./rsa_key.pub | grep -v KEY- | tr -d '\012')
echo "ALTER USER MY_USER SET RSA_PUBLIC_KEY='$PUBK';"
Copy

重要

安全地保存生成的 :code:`rsa_key.p8`(私钥)和 :code:`rsa_key.pub`(公钥)文件。它们将在后续的身份验证步骤中使用。

创建数据库、架构、表、管道并配置用户身份验证

Run the following SQL commands in your Snowflake account (for example, using Snowsight or Snowflake CLI). Note that you will need a role with permissions to create users, roles, and databases (such as ACCOUNTADMIN or USERADMIN for the first few lines, and then switching to the new role). Replace placeholders like MY_USER, MY_ROLE, MY_DATABASE, and so on, with your desired names.

-- 1. Create a dedicated role and user (Run with a highly-privileged role)
CREATE OR REPLACE USER MY_USER;
CREATE ROLE IF NOT EXISTS MY_ROLE;
GRANT ROLE MY_ROLE TO USER MY_USER;

-- 2. Set the public key for key-pair authentication
-- NOTE: Replace 'YOUR_FORMATTED_PUBLIC_KEY' with the output of the PUBK variable from the key generation step.
ALTER USER MY_USER SET RSA_PUBLIC_KEY='YOUR_FORMATTED_PUBLIC_KEY';

-- 3. Set the default role (Recommended)
ALTER USER MY_USER SET DEFAULT_ROLE=MY_ROLE;

-- 4. Switch to the new role and create objects
USE ROLE MY_ROLE;
-- NOTE: You may also need to run USE WAREHOUSE YOUR_WH; here if a default warehouse isn't set.

-- Create database and schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create a target table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE object for streaming ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));


-- 5. Configure authentication policy (Optional, but recommended for explicit control)
CREATE OR REPLACE AUTHENTICATION POLICY testing_auth_policy
  AUTHENTICATION_METHODS = ('KEYPAIR')
  CLIENT_TYPES = ('DRIVERS');

-- Apply authentication policy (if created)
ALTER USER MY_USER SET AUTHENTICATION POLICY testing_auth_policy;
Copy

第 2 步:配置身份验证配置文件

演示应用程序需要一个 profile.json 文件来存储连接设置,包括身份验证的详细信息。SDK 使用密钥对身份验证进行安全连接。

创建配置文件配置文件

创建或更新位于演示项目的根目录中的 profile.json 文件。

profile.json 模板

{
    "user": "MY_USER",
    "account": "your_account_identifier",
    "url": "https://your_account_identifier.snowflakecomputing.cn:443",
    "private_key_file": "rsa_key.p8",
    "role": "MY_ROLE"
}
Copy

替换占位符:

第 3 步:设置演示项目

Download: Sample Java code (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples/tree/main/java-example)

添加 JAR 依赖项

要包含 Snowpipe Streaming SDK,请将以下依赖项添加到您的 Maven pom.xml 中。Maven 会自动从公共存储库中下载 JAR。

<dependency>
    <groupId>com.snowflake</groupId>
    <artifactId>snowpipe-streaming</artifactId>
    <version>YOUR_SDK_VERSION</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.18.1</version>
</dependency>
Copy

重要

YOUR_SDK_VERSION 替换为 Maven Central (https://central.sonatype.com/artifact/com.snowflake/snowpipe-streaming) 上可用的特定版本。

放置配置文件

确保 profile.json 文件(在 第 2 步:配置身份验证配置文件 中配置)位于项目的根目录中。

第 4 步:使用提供的代码示例并运行演示应用程序

在终端中导航到项目的根目录。

构建并执行

  • 构建项目:

    mvn clean install
    
    Copy
  • 运行主类:

    mvn exec:java -Dexec.mainClass="com.snowflake.snowpipestreaming.demo.Main"
    
    Copy

第 5 步:验证数据

运行演示后,在 Snowflake 中验证引入的数据:

SELECT COUNT(*) FROM MY_DATABASE.MY_SCHEMA.MY_TABLE;
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE LIMIT 10;
Copy
语言: 中文