使用 JDBC 驱动程序

本主题提供有关如何使用 JDBC 驱动程序的信息。

本主题内容:

Snowflake JDBC API 扩展

Snowflake JDBC 驱动程序支持标准 JDBC 规范之外的其他方法。本节介绍如何使用展开来访问特定于 Snowflake 的方法,然后介绍可能需要展开的三种情况:

展开特定于 Snowflake 的类

Snowflake JDBC 驱动程序支持特定于 Snowflake 的方法。这些方法在特定于 Snowflake 的 Java 语言接口中定义,例如 SnowflakeConnection、SnowflakeStatement 和 SnowflakeResultSet。例如, SnowflakeStatement 接口包含不在 JDBC 语句接口中的 getQueryID() 方法。

当要求 Snowflake JDBC 驱动程序创建 JDBC 对象(例如,通过调用 Connection 对象的 createStatement() 方法创建 JDBC Statement 对象)时,Snowflake JDBC 驱动程序实际上会创建特定于 Snowflake 的对象,这些对象不仅实现了标准的 JDBC 方法,还实现了 Snowflake 接口中的其他方法。

要访问这些 Snowflake 方法,请“展开”对象(如 Statement 对象)以公开 Snowflake 对象及其方法。然后,您可以调用其他方法。

下面的代码演示如何展开 JDBC Statement 对象以公开接口的 SnowflakeStatement 方法,然后调用其中一个方法,在本例中,setParameter 如下所示:

Statement statement1;
...
// Unwrap the statement1 object to expose the SnowflakeStatement object, and call the
// SnowflakeStatement object's setParameter() method.
statement1.unwrap(SnowflakeStatement.class).setParameter(...);
Copy

执行异步查询

Snowflake JDBC 驱动程序支持异步查询(即在查询完成前将控制权交还给用户的查询)。用户可以启动查询,然后使用轮询来确定查询何时完成。查询完成后,用户可以读取结果集。

此功能允许客户端程序并行运行多个查询,而客户端程序本身无需使用多线程。

异步查询使用添加到 SnowflakeConnectionSnowflakeStatementSnowflakePreparedStatementSnowflakeResultSet 类中的方法。

备注

要执行异步查询,您必须确保 ABORT_DETACHED_QUERY 配置参数为 FALSE (默认值)。

Snowflake 在一段时间(默认:5 分钟)后会自动关闭连接,这会使所有活跃的查询处于孤立状态。如果值为 TRUE,Snowflake 将终止这些孤立查询,这可能会影响异步查询。

您可以在同一会话中混合运行同步查询和异步查询。

异步查询的最佳实践

  • 在并行运行任何查询之前,请确保知道哪些查询依赖于其他查询。有些查询相互依赖且对顺序敏感,因此不适合并行化。例如,很显然, INSERT 语句应该在相应的 CREATE TABLE 语句完成后开始。

  • 确保运行的查询数量不要超过可用内存承受能力。并行运行多个查询通常会消耗更多内存,尤其是在内存中同时存储多个 ResultSet 查询时。

  • 轮询时,处理查询不成功的罕见情况。例如,避免以下潜在的无限循环:

    QueryStatus queryStatus = QueryStatus.RUNNING;
    while (queryStatus != QueryStatus.SUCCESS)  {     //  NOT RECOMMENDED
        Thread.sleep(2000);   // 2000 milliseconds.
        queryStatus = resultSet.unwrap(SnowflakeResultSet.class).getStatus();
        }
    
    Copy

    请改用类似于以下内容的代码:

    // Assume that the query is not done yet.
    QueryStatus queryStatus = QueryStatus.RUNNING;
    while (queryStatus == QueryStatus.RUNNING || queryStatus == QueryStatus.RESUMING_WAREHOUSE)  {
        Thread.sleep(2000);   // 2000 milliseconds.
        queryStatus = resultSet.unwrap(SnowflakeResultSet.class).getStatus();
        }
    
    if (queryStatus == QueryStatus.SUCCESS) {
        ...
        }
    
    Copy
  • 确保事务控制语句(BEGIN、COMMIT 和 ROLLBACK)不与其他语句并行执行。

异步查询示例

这些示例中的大多数都要求程序导入类,如下所示:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import net.snowflake.client.core.QueryStatus;
import net.snowflake.client.jdbc.SnowflakeConnection;
import net.snowflake.client.jdbc.SnowflakeResultSet;
import net.snowflake.client.jdbc.SnowflakeStatement;
Copy

下面是一个非常简单的示例:

    String sql_command = "";
    ResultSet resultSet;

    System.out.println("Create JDBC statement.");
    Statement statement = connection.createStatement();
    sql_command = "SELECT PI()";
    System.out.println("Simple SELECT query: " + sql_command);
    resultSet = statement.unwrap(SnowflakeStatement.class).executeAsyncQuery(sql_command);

    // Assume that the query isn't done yet.
    QueryStatus queryStatus = QueryStatus.RUNNING;
    while (queryStatus == QueryStatus.RUNNING || queryStatus == QueryStatus.RESUMING_WAREHOUSE) {
      Thread.sleep(2000); // 2000 milliseconds.
      queryStatus = resultSet.unwrap(SnowflakeResultSet.class).getStatus();
    }

    if (queryStatus == QueryStatus.FAILED_WITH_ERROR) {
      // Print the error code to stdout
      System.out.format("Error code: %d%n", queryStatus.getErrorCode());
      System.out.format("Error message: %s%n", queryStatus.getErrorMessage());
    } else if (queryStatus != QueryStatus.SUCCESS) {
      System.out.println("ERROR: unexpected QueryStatus: " + queryStatus);
    } else {
      boolean result_exists = resultSet.next();
      if (!result_exists) {
        System.out.println("ERROR: No rows returned.");
      } else {
        float pi_result = resultSet.getFloat(1);
        System.out.println("pi = " + pi_result);
      }
    }
Copy

此示例存储查询 ID,关闭连接,重新打开连接,并使用查询 ID 检索数据:

    String sql_command = "";
    ResultSet resultSet;
    String queryID = "";

    System.out.println("Create JDBC statement.");
    Statement statement = connection.createStatement();
    sql_command = "SELECT PI() * 2";
    System.out.println("Simple SELECT query: " + sql_command);
    resultSet = statement.unwrap(SnowflakeStatement.class).executeAsyncQuery(sql_command);
    queryID = resultSet.unwrap(SnowflakeResultSet.class).getQueryID();
    System.out.println("INFO: Closing statement.");
    statement.close();
    System.out.println("INFO: Closing connection.");
    connection.close();

    System.out.println("INFO: Re-opening connection.");
    connection = create_connection(args);
    use_warehouse_db_and_schema(connection);
    resultSet = connection.unwrap(SnowflakeConnection.class).createResultSet(queryID);

    // Assume that the query isn't done yet.
    QueryStatus queryStatus = QueryStatus.RUNNING;
    while (queryStatus == QueryStatus.RUNNING) {
      Thread.sleep(2000); // 2000 milliseconds.
      queryStatus = resultSet.unwrap(SnowflakeResultSet.class).getStatus();
    }

    if (queryStatus == QueryStatus.FAILED_WITH_ERROR) {
      System.out.format(
          "ERROR %d: %s%n", queryStatus.getErrorMessage(), queryStatus.getErrorCode());
    } else if (queryStatus != QueryStatus.SUCCESS) {
      System.out.println("ERROR: unexpected QueryStatus: " + queryStatus);
    } else {
      boolean result_exists = resultSet.next();
      if (!result_exists) {
        System.out.println("ERROR: No rows returned.");
      } else {
        float pi_result = resultSet.getFloat(1);
        System.out.println("pi = " + pi_result);
      }
    }
Copy

将数据文件直接从流上传到内部暂存区

您可以使用 PUT 命令上传数据文件。但是,有时将数据作为文件直接从流传输到内部(即 Snowflake)暂存区是有意义的。(暂存区 可以是任何内部暂存区类型:表暂存区、用户暂存区或命名暂存区。JDBC 驱动程序不支持上传到外部暂存区。)下面是 SnowflakeConnection 类中公开的方法:

/**
 * Method to compress data from a stream and upload it at a stage location.
 * The data will be uploaded as one file. No splitting is done in this method.
 *
 * Caller is responsible for releasing the inputStream after the method is
 * called.
 *
 * @param stageName    stage name: e.g. ~ or table name or stage name
 * @param destPrefix   path / prefix under which the data should be uploaded on the stage
 * @param inputStream  input stream from which the data will be uploaded
 * @param destFileName destination file name to use
 * @param compressData compress data or not before uploading stream
 * @throws java.sql.SQLException failed to compress and put data from a stream at stage
 */
public void uploadStream(String stageName,
                         String destPrefix,
                         InputStream inputStream,
                         String destFileName,
                         boolean compressData)
    throws SQLException
Copy

示例用法:

Connection connection = DriverManager.getConnection(url, prop);
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// upload file stream to user stage
connection.unwrap(SnowflakeConnection.class).uploadStream("MYSTAGE", "testUploadStream",
   fileInputStream, "destFile.csv", true);
Copy

为 3.9.2 之前的 JDBC 驱动程序版本编写的代码可能会转换 SnowflakeConnectionV1 而不是展开 SnowflakeConnection.class。例如:

...

// For versions prior to 3.9.2:
// upload file stream to user stage
((SnowflakeConnectionV1) connection.uploadStream("MYSTAGE", "testUploadStream",
   fileInputStream, "destFile.csv", true));
Copy

备注

使用较新版本的驱动程序的客户应更新其代码以使用 unwrap

将数据文件直接从内部暂存区下载到流

您可以使用 GET 命令下载数据文件。但是,有时将数据直接从内部(即 Snowflake)暂存区的文件传输到流是有意义的。(暂存区 可以是任何内部暂存区类型:表暂存区、用户暂存区或命名暂存区。JDBC 驱动程序不支持下载到外部暂存区。)下面是 SnowflakeConnection 类中公开的方法:

/**
 * Download file from the given stage and return an input stream
 *
 * @param stageName      stage name
 * @param sourceFileName file path in stage
 * @param decompress     true if file compressed
 * @return an input stream
 * @throws SnowflakeSQLException if any SQL error occurs.
 */
InputStream downloadStream(String stageName,
                           String sourceFileName,
                           boolean decompress) throws SQLException;
Copy

示例用法:

Connection connection = DriverManager.getConnection(url, prop);
InputStream out = connection.unwrap(SnowflakeConnection.class).downloadStream(
    "~",
    DEST_PREFIX + "/" + TEST_DATA_FILE + ".gz",
    true);
Copy

为 3.9.2 之前的 JDBC 驱动程序版本编写的代码可能会转换 SnowflakeConnectionV1 而不是展开 SnowflakeConnection.class。例如:

...

// For versions prior to 3.9.2:
// download file stream to user stage
((SnowflakeConnectionV1) connection.downloadStream(...));
Copy

多语句支持

本节介绍如何使用 JDBC 驱动程序 在单个请求中执行多条语句。

备注

默认情况下,Snowflake 会为使用多个语句发出的查询返回错误。这样做的部分原因是为了防止 ` SQL 注入 <https://en.wikipedia.org/wiki/SQL_injection (link removed)>`_。使用多语句功能可能会导致 SQL 注入,因此应谨慎使用。使用 SnowflakeStatement 类的 setParameter() 方法来指定要执行的语句数,会使通过追加语句来注入语句变得更加困难,从而可以降低风险。有关详细信息,请参阅 接口:SnowflakeStatement

发送多个语句并处理结果

包含多个语句的查询的执行方式与具有单个语句的查询相同,但查询字符串包含多个用分号分隔的语句。

有两种方法可以允许多个语句:

  • 调用 Statement.setParameter(“MULTI_STATEMENT_COUNT”,n)指定一次应允许执行多少条语句。有关详细信息,请参见下文。

  • 通过执行以下命令之一,在会话级别或账户级别设置 MULTI_STATEMENT_COUNT 参数:

    alter session set MULTI_STATEMENT_COUNT = 0;
    
    Copy

    或者:

    alter account set MULTI_STATEMENT_COUNT = 0;
    
    Copy

    将该参数设置为 0 时,语句数量不受限制。将参数设置为 1 时,一次只能执行一条语句。

为了增加 SQL 注入攻击的难度,用户可以调用 setParameter 方法来指定单次调用中要执行的语句数,如下所示。在此示例中,单次调用中要执行的语句数为 3:

// Specify the number of statements that we expect to execute.
statement.unwrap(SnowflakeStatement.class).setParameter(
        "MULTI_STATEMENT_COUNT", 3);
Copy

默认语句数为 1;换言之,多语句模式处于关闭状态。

若要在不指定确切数字的情况下执行多个语句,请传递值 0。

MULTI_STATEMENT_COUNT 参数不是 JDBC 标准的一部分;而是 Snowflake 扩展。此参数会影响多个 Snowflake 驱动程序/连接器。

在单个 execute() 调用中执行多条语句时,第一条语句的结果可通过标准 getResultSet()getUpdateCount() 方法获得。若要访问后续语句的结果,请使用 getMoreResults() 方法。当有更多语句可供迭代时,此方法将返回 true,否则返回 false

下面的示例设置了 MULTI_STATEMENT_COUNT 参数,执行了 3 条语句,并检索更新计数和结果集:

// Create a string that contains multiple SQL statements.
String command_string = "create table test(n int); " +
                        "insert into test values (1), (2); " +
                        "select * from test order by n";
Statement stmt = connection.createStatement();
// Specify the number of statements (3) that we expect to execute.
stmt.unwrap(SnowflakeStatement.class).setParameter(
        "MULTI_STATEMENT_COUNT", 3);

// Execute all of the statements.
stmt.execute(command_string);                       // false

// --- Get results. ---
// First statement (create table)
stmt.getUpdateCount();                              // 0 (DDL)

// Second statement (insert)
stmt.getMoreResults();                              // true
stmt.getUpdateCount();                              // 2

// Third statement (select)
stmt.getMoreResults();                              // true
ResultSet rs = stmt.getResultSet();
rs.next();                                          // true
rs.getInt(1);                                       // 1
rs.next();                                          // true
rs.getInt(1);                                       // 2
rs.next();                                          // false

// Past the last statement executed.
stmt.getMoreResults();                              // false
stmt.getUpdateCount();                              // 0 (no more results)
Copy

对于多语句查询,Snowflake 建议使用 execute()executeQuery()executeUpdate() 方法也支持多语句查询,但如果第一个结果不是预期的结果类型(分别为结果集和更新计数),则会引发异常。

失败的语句

如果任何 SQL 语句编译或执行失败,则执行将中止。之前运行的任何语句均不受影响。

例如,如果以下语句作为单个多语句查询运行,则查询将在进行到第三个语句时失败,并且将引发异常。

CREATE OR REPLACE TABLE test(n int);
INSERT INTO TEST VALUES (1), (2);
INSERT INTO TEST VALUES ('not_an_int');  -- execution fails here
INSERT INTO TEST VALUES (3);
Copy

如果您要查询表 test 中的内容,则值 12 将会出现。

不支持的功能

多语句查询不支持 PUT 和 GET 语句。

多语句查询也不支持准备语句和使用绑定变量。

将变量绑定到语句

绑定 允许 SQL 语句使用存储在 Java 变量中的值。

简单绑定

在没有绑定的情况下,SQL 语句通过在语句中指定字面量来指定值。例如,以下语句在 UPDATE 语句中使用字面量值 42

stmt.execute("UPDATE table1 SET integer_column = 42 WHERE ID = 1000");
Copy

通过绑定,可以使用变量中的值执行 SQL 语句。例如:

int my_integer_variable = 42;
PreparedStatement pstmt = connection.prepareStatement("UPDATE table1 SET integer_colum = ? WHERE ID = 1000");
pstmt.setInt(1, my_integer_variable);
pstmt.executeUpdate();
Copy

VALUES 子句中的 ? 表示 SQL 语句使用变量的值。setInt() 方法指定用名为 my_integer_variable 的变量中的值替换 SQL 语句中的第一个问号。请注意,setInt() 使用的是从 1 开始的值,而不是从 0 开始的值(即第一个问号的引用值是 1,而不是 0)。

将变量绑定到时间戳列

Snowflake 支持三种不同的时间戳变体:TIMESTAMP_LTZ、TIMESTAMP_NTZ、TIMESTAMP_TZ。当您调用 PreparedStatement.setTimestamp 将变量绑定到时间戳列时,JDBC 驱动程序会根据本地时区 (TIMESTAMP_LTZ) 或作为实参传递的 Calendar 对象的时区来解释时间戳值:

// The following call interprets the timestamp in terms of the local time zone.
insertStmt.setTimestamp(1, myTimestamp);
// The following call interprets the timestamp in terms of the time zone of the Calendar object.
insertStmt.setTimestamp(1, myTimestamp, Calendar.getInstance(TimeZone.getTimeZone("America/New_York")));
Copy

如果希望驱动程序使用不同的变体(例如 TIMESTAMP_NTZ)来解释时间戳,请使用以下方法之一:

  • 将会话参数 CLIENT_TIMESTAMP_TYPE_MAPPING 设置为变体。

    请注意,该参数会影响当前会话的所有绑定操作。如果需要更改变体(例如改回 TIMESTAMP_LTZ),则必须重新设置此会话参数。

  • (在 JDBC 驱动程序 3.13.3 及更高版本中)调用 PreparedStatement.setObject 方法,并使用 targetSqlType 参数指定以下 Snowflake 时间戳变体之一:

    • SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_LTZ

    • SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_TZ

    • SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_NTZ

    例如:

    import net.snowflake.client.jdbc.SnowflakeUtil;
    ...
    insertStmt.setObject(1, myTimestamp, SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_NTZ);
    
    Copy

批处理插入

在 Java 应用程序代码中,可以通过在 INSERT 语句中绑定参数并调用 addBatch()executeBatch() 来在单个批处理中插入多行。

例如,下面的代码在包含 INTEGER 列和 VARCHAR 列的表中插入两行。该示例将值与 INSERT 语句中的参数绑定,并调用 addBatch()executeBatch() 执行批处理插入。

Connection connection = DriverManager.getConnection(url, prop);
connection.setAutoCommit(false);

PreparedStatement pstmt = connection.prepareStatement("INSERT INTO t(c1, c2) VALUES(?, ?)");
pstmt.setInt(1, 101);
pstmt.setString(2, "test1");
pstmt.addBatch();

pstmt.setInt(1, 102);
pstmt.setString(2, "test2");
pstmt.addBatch();

int[] count = pstmt.executeBatch(); // After execution, count[0]=1, count[1]=1
connection.commit();
Copy

使用此技术插入大量值时,驱动程序可以通过将数据流传输(而无需在本地计算机上创建文件)到临时暂存区进行引入来提高性能。当值数超过阈值时,驱动程序会自动执行此操作。

此外,您必须设置会话的当前数据库和架构。如果未设置这些值,则驱动程序执行的 CREATE TEMPORARY STAGE 命令可能会失败,并出现以下错误:

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Copy

备注

有关将数据加载到 Snowflake 数据库的其他方法(包括使用 COPY 命令进行批量加载),请参阅 将数据载入 Snowflake

Java 示例程序

对于用 Java 编写的工作示例,请右键单击文件的名称 (SnowflakeJDBCExample.java),然后将链接/文件保存到本地文件系统。

故障排除

I/O 错误:连接重置

在某些情况下,JDBC 驱动程序可能会在一段时间不活动后失败并显示以下错误消息:

I/O error: Connection reset
Copy

您可以通过为连接设置特定的“生效时间”来解决此问题。如果连接处于空闲状态的时间超过“生效时间”,则 JDBC 驱动程序将从连接池中删除该连接,并创建新连接。

若要设置生效时间,请将名为 net.snowflake.jdbc.ttl 的 Java 系统属性设置为连接应生效的秒数:

  • 若要以编程方式设置此属性,请调用 System.setProperty

    // Set the "time to live" to 60 seconds.
    System.setProperty("net.snowflake.jdbc.ttl", "60")
    
    Copy
  • 若要在运行 java 命令时设置此属性,请使用 -D 标志:

    # Set the "time to live" to 60 seconds.
    java -cp .:snowflake-jdbc-<version>.jar -Dnet.snowflake.jdbc.ttl=60 <ClassName>
    
    Copy

net.snowflake.jdbc.ttl 属性的默认值为 -1,这意味着不会从连接池中移除空闲连接。

处理错误

在处理 JDBC 应用程序的错误和异常时,您可以使用 Snowflake 提供的 ` ErrorCode.java <https://github.com/snowflakedb/snowflake-jdbc/blob/master/src/main/java/net/snowflake/client/jdbc/ErrorCode.java (https://github.com/snowflakedb/snowflake-jdbc/blob/master/src/main/java/net/snowflake/client/jdbc/ErrorCode.java)>`_ 文件来确定问题的原因。JDBC 驱动程序特有的错误代码以 2 开头,形式如下:2NNNNN

备注

公共 snowflake-jdbc git 存储库中的 ErrorCode.java 的链接指向该文件的最新版本,该版本可能与您当前使用的 JDBC 驱动程序版本不同。

语言: 中文