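Tutorial: Get started with the Snowpipe Streaming high-performance architecture SDK

This tutorial provides step-by-step instructions for setting up and running a demo application that uses the new high-performance architecture based on the snowpipe-streaming SDK.

Prerequisites

Before you run the demo, make sure that you meet the following prerequisites: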

  • Snowflake account: Verify that you have access to a Snowflake account. You need a user with sufficient privileges (for example, ACCOUNTADMIN or USERADMIN for the initial setup) to create the dedicated user and custom role described in Step 1: Configure Snowflake objects.

  • Network access: Make sure that your network allows outbound connectivity to Snowflake and to Amazon S3, Google Cloud Storage (GCS), or Azure Blob Storage. Adjust firewall rules if necessary, because the SDK makes REST API calls to Snowflake and to your cloud storage provider.

    • To verify network connectivity, use the following commands:

    # Test connectivity to Snowflake; replace with your account URL
    curl -I https://<your_account_identifier>.snowflakecomputing.cn

    # Test connectivity to AWS S3
    curl -I https://s3.amazonaws.com

    # Test connectivity to GCS
    curl -I https://storage.googleapis.com

    # Test connectivity to Azure Blob Storage
    curl -I https://azure.blob.core.windows.net
    # Or test a specific storage account
    curl -I https://<your_account_name>.blob.core.windows.net

  • Java development environment: Install Java 11 or later, and Maven for managing dependencies.

  • Python: Install Python 3.9 or later.

  • System requirements: The SDK requires glibc version 2.26 or later. You can check your current glibc version with:

    ldd --version
    
  • Snowpipe Streaming SDKs and the sample code:

    • For AWS: Obtain the Java SDK (https://central.sonatype.com/artifact/com.snowflake/snowpipe-streaming) or Python SDK (https://pypi.org/project/snowpipe-streaming/) (any version).

    • For Azure: Requires SDK version 1.1.0 or later.

    • For GCP: Requires SDK version 1.1.0 or later.

    Download the sample code for your preferred language from the Snowpipe Streaming SDK examples in the GitHub repository (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples).
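
The version requirements in this list can be checked in a script before you run the demo. The following Python sketch compares dotted version strings against the minimums stated above (glibc 2.26; SDK 1.1.0 for Azure and GCP; any SDK version for AWS). The function names and the cloud labels aws, azure, and gcp are hypothetical, chosen for illustration. The sketch assumes purely numeric versions such as 2.31 or 1.1.0, and on a glibc-based Linux system it reads the installed version with platform.libc_ver() from the Python standard library.

```python
import platform

# Minimums stated in the prerequisites. The cloud labels are hypothetical keys.
MIN_GLIBC = (2, 26, 0)
MIN_SDK_BY_CLOUD = {
    "aws": (0, 0, 0),    # AWS: any SDK version
    "azure": (1, 1, 0),  # Azure: 1.1.0 or later
    "gcp": (1, 1, 0),    # GCP: 1.1.0 or later
}


def parse_version(text: str) -> tuple:
    """Turn a numeric dotted version such as '2.31' into a tuple padded to 3 parts."""
    parts = [int(p) for p in text.strip().split(".")]
    parts += [0] * (3 - len(parts))  # '1.1' becomes (1, 1, 0)
    return tuple(parts)


def glibc_ok(version: str) -> bool:
    """Return True if the given glibc version is 2.26 or later."""
    return parse_version(version) >= MIN_GLIBC


def sdk_ok(cloud: str, sdk_version: str) -> bool:
    """Return True if sdk_version meets the minimum for the given cloud label."""
    return parse_version(sdk_version) >= MIN_SDK_BY_CLOUD[cloud.lower()]


if __name__ == "__main__":
    lib, version = platform.libc_ver()  # ('glibc', '2.35') on glibc systems
    if lib == "glibc" and version:
        status = "OK" if glibc_ok(version) else "too old (2.26 or later required)"
        print(f"glibc {version}: {status}")
    else:
        print("Could not detect glibc; use the ldd --version check instead.")
```

Missing version parts are padded with zeros, so 1.1 compares equal to 1.1.0. If glibc can't be detected from Python, fall back to the ldd --version check above.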

Get started

This section describes the steps that are required to set up and run the demo application.

Step 1: Configure Snowflake objects

Before you can use the snowpipe-streaming SDK, you must create a target table within your Snowflake environment. Unlike the classic architecture, the high-performance architecture requires a PIPE object for data ingestion. This tutorial uses the default pipe that is automatically created at ingest time for your target table. If you require additional features, such as in-flight transformations or clustering at ingest time, see CREATE PIPE.

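Generate a key pair for authentication

Use OpenSSL to generate a private/public key pair for authentication. For more information, see Key-pair authentication and key-pair rotation.

Run the following commands in a terminal to generate the keys: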

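# Generate an unencrypted 2048-bit RSA private key in PKCS#8 format, then derive its public key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Strip the PEM header/footer lines and the newlines, then print the ALTER USER statement
PUBK=$(cat ./rsa_key.pub | grep -v KEY- | tr -d '\012')
echo "ALTER USER MY_USER SET RSA_PUBLIC_KEY='$PUBK';"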

Important

Save the generated rsa_key.p8 (private key) and rsa_key.pub (public key) files securely. You will use these keys in subsequent authentication steps.
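
If the shell pipeline isn't available on your machine, the same transformation can be done in Python. The following sketch mirrors the grep and tr pipeline above: it drops the lines that contain KEY- (the PEM header and footer) and joins the Base64 body into one line, then builds the ALTER USER statement. The helper names are hypothetical, and the sketch assumes that rsa_key.pub is in the current directory.

```python
from pathlib import Path


def public_key_body(pem_text: str) -> str:
    """Drop the BEGIN/END PUBLIC KEY lines and join the Base64 body into one line."""
    return "".join(
        line.strip() for line in pem_text.splitlines() if "KEY-" not in line
    )


def alter_user_sql(user: str, pem_text: str) -> str:
    """Build the ALTER USER statement that registers the public key for the user."""
    return f"ALTER USER {user} SET RSA_PUBLIC_KEY='{public_key_body(pem_text)}';"


if __name__ == "__main__":
    key_path = Path("rsa_key.pub")
    if key_path.exists():
        print(alter_user_sql("MY_USER", key_path.read_text()))
    else:
        print("rsa_key.pub not found; run the openssl commands first.")
```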

Create database, schema, table, and configure user authentication

Run the following SQL commands in your Snowflake account (for example, by using Snowsight or Snowflake CLI). The first statements must run with a highly privileged role, such as ACCOUNTADMIN; the script then switches to the new role to create the database objects. Replace placeholders such as MY_USER, MY_ROLE, and MY_DATABASE with the names that you want.

-- 1. Create a dedicated user and role (run with a highly privileged role, such as ACCOUNTADMIN)
CREATE OR REPLACE USER MY_USER;
CREATE ROLE IF NOT EXISTS MY_ROLE;
GRANT ROLE MY_ROLE TO USER MY_USER;
-- Place MY_ROLE in the role hierarchy so that the administrator can switch to it with USE ROLE
GRANT ROLE MY_ROLE TO ROLE SYSADMIN;
-- Allow MY_ROLE to create the database in step 4
GRANT CREATE DATABASE ON ACCOUNT TO ROLE MY_ROLE;

-- 2. Set the public key for key-pair authentication
-- NOTE: Replace 'YOUR_FORMATTED_PUBLIC_KEY' with the value of the PUBK variable from the key generation step.
ALTER USER MY_USER SET RSA_PUBLIC_KEY='YOUR_FORMATTED_PUBLIC_KEY';

-- 3. Set the default role (recommended)
ALTER USER MY_USER SET DEFAULT_ROLE=MY_ROLE;

-- 4. Switch to the new role and create objects
USE ROLE MY_ROLE;
-- NOTE: You may also need to run USE WAREHOUSE YOUR_WH; here if a default warehouse isn't set.

-- Create database and schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create a target table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- 5. Configure an authentication policy (optional, but recommended for explicit control)
-- Switch back to the administrative role, which can create the policy in MY_SCHEMA
-- (through the SYSADMIN hierarchy) and apply it to MY_USER.
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE AUTHENTICATION POLICY testing_auth_policy
  AUTHENTICATION_METHODS = ('KEYPAIR')
  CLIENT_TYPES = ('DRIVERS');

-- Apply the authentication policy (if created)
ALTER USER MY_USER SET AUTHENTICATION POLICY testing_auth_policy;
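
Because the SQL above uses the placeholder names as unquoted identifiers, the names you choose must follow Snowflake's rules for unquoted identifiers: start with a letter or underscore, contain only letters, digits, underscores, and dollar signs, and be at most 255 characters long. The following Python sketch checks the chosen names and substitutes them into a SQL snippet. The helper names and the example replacement names are hypothetical, and the check does not reject reserved keywords.

```python
import re

# Unquoted Snowflake identifiers: a letter or underscore, followed by letters,
# digits, underscores, or dollar signs; at most 255 characters in total.
# Reserved keywords are not checked by this sketch.
UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]{0,254}")


def is_valid_unquoted_identifier(name: str) -> bool:
    """Return True if name can be used without double quotes."""
    return UNQUOTED_IDENTIFIER.fullmatch(name) is not None


def substitute_placeholders(sql: str, names: dict) -> str:
    """Replace whole-word placeholders (for example, MY_ROLE) with the chosen names.

    Raises ValueError if a chosen name is not a valid unquoted identifier.
    """
    for placeholder, value in names.items():
        if not is_valid_unquoted_identifier(value):
            raise ValueError(f"{placeholder} -> {value!r} is not a valid unquoted identifier")
        # \b keeps MY_USER from being replaced inside a longer word such as MY_USER_2
        sql = re.sub(rf"\b{re.escape(placeholder)}\b", value, sql)
    return sql


if __name__ == "__main__":
    # Hypothetical replacement names, for illustration only
    chosen = {"MY_USER": "INGEST_USER", "MY_ROLE": "INGEST_ROLE"}
    print(substitute_placeholders("GRANT ROLE MY_ROLE TO USER MY_USER;", chosen))
```

Matching on whole words is a deliberate choice: a plain string replace would also rewrite any longer name that happens to contain a placeholder.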

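Step 2: Configure the authentication profile

The demo application requires a profile.json file that stores the connection settings, including the authentication details. The SDK uses key-pair authentication for secure connections.

Create the profile configuration file

Create or update the profile.json file in the root directory of the demo project.

profile.json template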

{
    "user": "MY_USER",
    "account": "your_account_identifier",
    "url": "https://your_account_identifier.snowflakecomputing.cn:443",
    "private_key_file": "rsa_key.p8",
    "role": "MY_ROLE"
}

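Replace the placeholders:

  • user: the user that you created in Step 1 (MY_USER).

  • account: your Snowflake account identifier.

  • url: your Snowflake account URL, which contains the same account identifier.

  • private_key_file: the path to the rsa_key.p8 private key file that you generated earlier.

  • role: the role that you created in Step 1 (MY_ROLE).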
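
Before you run the demo, you can check that profile.json is complete. The following Python sketch loads the file with the standard json module and reports missing keys, values that still contain the your_account_identifier placeholder from the template, a url that doesn't use HTTPS, and a private_key_file path that doesn't exist. The function name and the messages are hypothetical; the sketch checks only the five keys shown in the template and doesn't test the connection itself.

```python
import json
from pathlib import Path
from urllib.parse import urlparse

# Keys from the profile.json template and the placeholder text it ships with
REQUIRED_KEYS = ("user", "account", "url", "private_key_file", "role")
PLACEHOLDER = "your_account_identifier"


def validate_profile(profile: dict, check_key_file: bool = True) -> list:
    """Return a list of problems found in a parsed profile.json (empty if none)."""
    problems = []
    for key in REQUIRED_KEYS:
        value = str(profile.get(key, "")).strip()
        if not value:
            problems.append(f"missing or empty: {key}")
        elif PLACEHOLDER in value:
            problems.append(f"placeholder not replaced: {key}")
    url = str(profile.get("url", ""))
    if url and urlparse(url).scheme != "https":
        problems.append("url should use https")
    key_file = str(profile.get("private_key_file", ""))
    if check_key_file and key_file and not Path(key_file).is_file():
        problems.append(f"private key file not found: {key_file}")
    return problems


if __name__ == "__main__":
    path = Path("profile.json")
    if path.is_file():
        issues = validate_profile(json.loads(path.read_text()))
        print("\n".join(issues) or "profile.json looks complete.")
    else:
        print("profile.json not found in the current directory.")
```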

Step 3: Set up the demo project

Download: Sample Java code (https://github.com/snowflakedb/snowpipe-streaming-sdk-examples/tree/main/java-example)

Add the JAR dependencies

To include the Snowpipe Streaming SDK, add the following dependencies to your Maven pom.xml. Maven automatically downloads the JARs from the public repository.

<dependency>
    <groupId>com.snowflake</groupId>
    <artifactId>snowpipe-streaming</artifactId>
    <version>YOUR_SDK_VERSION</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.18.1</version>
</dependency>

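Important

Replace YOUR_SDK_VERSION with a specific version that is available on Maven Central (https://central.sonatype.com/artifact/com.snowflake/snowpipe-streaming). For Azure and GCP, the version must be 1.1.0 or later.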

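Place the profile file

Make sure that the profile.json file (configured in Step 2: Configure the authentication profile) is in the root directory of the project.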

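Step 4: Use the provided code sample and run the demo application

In a terminal, navigate to the root directory of the project.

Build and run

  • Build the project: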

    mvn clean install
    
  • Run the main class:

    mvn exec:java -Dexec.mainClass="com.snowflake.snowpipestreaming.demo.Main"
    

Step 5: Verify the data

After you run the demo, verify the ingested data in Snowflake:

SELECT COUNT(*) FROM MY_DATABASE.MY_SCHEMA.MY_TABLE;
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE LIMIT 10;