选项 1:使用 Snowpipe REST API 加载数据

本主题介绍如何调用公共 REST 端点来加载数据并检索加载历史记录报告。这些说明假定您已完成 使用 Snowpipe REST API 做好加载数据的准备 中的设置说明。

本主题内容:

加载数据

加载分两步进行:

第 1 步:

暂存数据文件:

  • 内部暂存区:使用 PUT 命令暂存文件。

  • 外部暂存区:使用云提供商提供的客户端工具将文件复制到暂存区位置(Amazon S3、Google Cloud Storage 或 Microsoft Azure)。

第 2 步:

insertFiles REST 端点提交请求以加载暂存数据文件。

为方便起见,本主题提供了说明如何提交 REST 端点的示例 Java 和 Python 程序。

Java SDK 的示例程序

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.bouncycastle.pkcs.PKCSException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  // Path to the private key file that you generated earlier.
  private static final String PRIVATE_KEY_FILE = "/<path>/rsa_key.p8";

  public static class PrivateKeyReader
  {
    // If you generated an encrypted private key, implement this method to return
    // the passphrase for decrypting your private key.
    private static String getPrivateKeyPassphrase() {
      return "<private_key_passphrase>";
    }

    public static PrivateKey get(String filename)
            throws Exception
    {
      PrivateKeyInfo privateKeyInfo = null;
      Security.addProvider(new BouncyCastleProvider());
      // Read an object from the private key file.
      PEMParser pemParser = new PEMParser(new FileReader(Paths.get(filename).toFile()));
      Object pemObject = pemParser.readObject();
      if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
        // Handle the case where the private key is encrypted.
        PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemObject;
        String passphrase = getPrivateKeyPassphrase();
        InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
        privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
      } else if (pemObject instanceof PrivateKeyInfo) {
        // Handle the case where the private key is unencrypted.
        privateKeyInfo = (PrivateKeyInfo) pemObject;
      }
      pemParser.close();
      JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
      return converter.getPrivateKey(privateKeyInfo);
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String host = "<account_identifier>.snowflakecomputing.cn";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(host.split("\.")[0], user, pipe, privateKey, "https", host, 443);
      List<StagedFileWrapper> files = new ArrayList<>();
      // Add the paths and sizes the files that you want to load.
      // Use paths that are relative to the stage where the files are located
      // (the stage that is specified in the pipe definition)..
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      ...
      manager.ingestFiles(files, null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}
Copy

此示例使用 Bouncy Castle Crypto APIs (https://www.bouncycastle.org/java.html)。为了编译和运行此示例,必须在类路径中包含以下 JAR 文件:

  • 提供商 JAR 文件 (bcprov-jdkversions.jar)

  • PKIX / CMS / EAC / PKCS / OCSP / TSP / OPENSSL JAR 文件 (bcpkix-jdkversions.jar)

其中, versions 指定 JAR 文件支持的 JDK 版本。

在编译示例代码之前,请替换以下占位符值:

PRIVATE_KEY_FILE = "/<path>/rsa_key.p8"

指定您在 使用密钥对身份验证和密钥轮换 中创建的私钥文件的本地路径(在 使用 Snowpipe REST API 做好加载数据的准备 中)。

return "<private_key_passphrase>" in getPrivateKeyPassphrase()

如果生成了加密密钥,请实现 getPrivateKeyPassphrase() 方法以返回用于解密该密钥的密码。

host = "<account_identifier>.snowflakecomputing.cn"

以 URL 的形式指定主机信息。

账户标识符的首选格式如下:

organization_name-account_name

Snowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称

如果需要,还可以指定 账户定位器,以及托管该账户的 区域云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器

user = "<user_login_name>"

指定 Snowflake 登录名。

pipe = "<db_name>.<schema_name>.<pipe_name>"

指定用于加载数据的管道的完全限定名称。

files.add("<path>/<filename>", <file_size_in_bytes>)

在文件对象列表中指定要加载的文件的路径。

(可选)指定每个文件的大小(以字节为单位),以避免在 Snowpipe 计算加载数据所需的操作时出现延迟。

指定的路径必须 相对于 文件所在的暂存区。包括每个文件的完整名称,包括文件扩展名。例如,gzip 压缩的 CSV 文件的扩展名可能是 .csv.gz

Python SDK 的示例程序

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
  return '<private_key_passphrase>'

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  get_private_key_passphrase().encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

ingest_manager = SimpleIngestManager(account='<account_identifier>',
                                     host='<account_identifier>.snowflakecomputing.cn',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = [
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  ...
  ]

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)
Copy

在执行示例代码之前,请替换以下占位符值:

<private_key_path>

指定您在 使用密钥对身份验证和密钥轮换 中创建的私钥文件的本地路径(在 使用 Snowpipe REST API 做好加载数据的准备 中)。

return "<private_key_passphrase>" in get_private_key_passphrase()

如果生成了加密密钥,请实现 get_private_key_passphrase() 函数以返回用于解密该密钥的密码。

account='<account_identifier>'

指定账户的唯一标识符(由 Snowflake 提供)。请参阅 host 描述。

host='<account_identifier>.snowflakecomputing.cn'

为 Snowflake 账户指定唯一的主机名。

账户标识符的首选格式如下:

organization_name-account_name

Snowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称

如果需要,还可以指定 账户定位器,以及托管该账户的 区域云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器

user='<user_login_name>'

指定 Snowflake 登录名。

pipe='<db_name>.<schema_name>.<pipe_name>'

指定用于加载数据的管道的完全限定名称。

file_list=['<path>/<filename>', '<path>/<filename>'] | staged_file_list=[StagedFile('<path>/<filename>', <file_size_in_bytes>), StagedFile('<path>/<filename>', <file_size_in_bytes>)]

在文件对象列表中指定要加载的文件的路径。

指定的路径必须 相对于 文件所在的暂存区。包括每个文件的完整名称,包括文件扩展名。例如,gzip 压缩的 CSV 文件的扩展名可能是 .csv.gz

(可选)指定每个文件的大小(以字节为单位),以避免在 Snowpipe 计算加载数据所需的操作时出现延迟。

查看加载历史记录

Snowflake 提供 REST 端点Snowflake Information Schema 表函数,用于查看加载历史记录:

请注意,与调用 REST 端点不同,查询 Information Schema 表函数或账户使用视图需要运行仓库。

删除暂存文件

在成功加载数据并且不再需要这些文件后删除暂存文件。有关信息,请参阅 在 Snowpipe 加载数据后删除暂存文件

语言: 中文