执行语句

可以通过调用 connection.execute() 方法来执行语句。execute() 方法接受 options 对象,该对象可用于指定 SQL 文本和 complete 回调。当语句执行完成并且结果可供使用时,将调用 complete 回调:

const statement = connection.execute({
  sqlText: 'CREATE DATABASE testdb',
  complete: function (err, stmt, rows) {
    if (err) {
      console.error(`Failed to execute statement due to the following error: ${err.message}`);
    } else {
      console.log(`Successfully executed statement: ${stmt.getSqlText()}`);
    }
  }
});

备注

单个请求的最大有效负载大小为 128 MB。

异步执行查询

The Snowflake Node.js Driver supports asynchronous queries (that is, queries that return control to the user before the query completes). You can start a query, then use polling to determine when the query has completed. After the query completes, you can read the result set.

通过在 connection.execute 方法中包括 asyncExec: true 来启用异步查询。

下面的示例展示了如何使用 Promise 异步执行查询。

let queryId;

// 1. Execute query with asyncExec set to true
await new Promise((resolve) => {
  connection.execute({
    sqlText: "CALL SYSTEM$WAIT(3, 'SECONDS')",
    asyncExec: true,
    complete: async function (err, stmt, rows) {
      queryId = stmt.getQueryId(); // Get the query ID
      resolve();
    }
  });
});

// 2. Get results using the query ID
const statement = await connection.getResultsFromQueryId({ queryId: queryId });
await new Promise((resolve, reject) => {
  const stream = statement.streamRows();
  stream.on('error', err => {
    reject(err);
  });
  stream.on('data', row => {
    console.log(row);
  });
  stream.on('end', () => {
    resolve();
  });
});

您还可以使用回调来监控异步查询,如以下示例所示。

  1. 通过在 connection.execute 方法中包括 asyncExec: true 来启用异步查询。

    // 1. Execute query with asyncExec set to true
    connection.execute({
      sqlText: "CALL SYSTEM$WAIT(3, 'SECONDS')",
      asyncExec: true,
      complete: async function (err, stmt, rows) {
        const queryId = stmt.getQueryId();
    
        // 2. Get results using the query ID
        connection.getResultsFromQueryId({
          queryId: queryId,
          complete: async function (err, _stmt, rows) {
            console.log(rows);
          }
        });
      }
    });
    
  2. 检查查询的状态,该查询已提交以异步执行。

    let queryId;
    
    // 1. Execute query with asyncExec set to true
    await new Promise((resolve, reject) => {
      const statement = connection.execute({
        sqlText: "CALL SYSTEM$WAIT(3, 'SECONDS')",
        asyncExec: true,
        complete: async function (err, stmt, rows) {
          queryId = statement.getQueryId();
          resolve();
        }
      });
    });
    
    // 2. Check query status until it's finished executing
    const seconds = 2;
    let status = await connection.getQueryStatus(queryId);
    while (connection.isStillRunning(status)) {
      console.log(`Query status is ${status}, timeout for ${seconds} seconds`);
    
      await new Promise((resolve) => {
        setTimeout(() => resolve(), 1000 * seconds);
      });
    
      status = await connection.getQueryStatus(queryId);
    }
    
    console.log(`Query has finished executing, status is ${status}`);
    

执行一批 SQL 语句(多语句支持)

在 Node.js 连接器版本 1.6.18 及更高版本中,您可以发送一批 SQL 语句(用分号分隔),以在单个请求中执行。

备注

  • 在单个查询中执行多个语句要求会话中有一个可用的有效仓库。

  • 默认情况下,Snowflake 会针对使用多个语句发出的查询返回错误,以防止 SQL 注入攻击。在单个查询中执行多个语句会增加 SQL 注入。Snowflake 建议谨慎使用。您可以通过使用 MULTI_STATEMENT_COUNT 参数指定要执行的语句数来降低风险,这使得通过追加语句来注入语句变得更加困难。

您可以像使用单个语句执行查询一样批量执行多个语句,只是查询字符串包含多个用分号分隔的语句。请注意,多个语句按顺序执行,而不是并行执行。MULTI_STATEMENT_COUNT 参数指定批量语句包含的确切语句数量。

例如,如果设置 MULTI_STATEMENT_COUNT=3,则批量语句必须精确包含三个语句。如果提交的批量语句包含任何其他数量的语句,Node.js 驱动程序将拒绝该请求。您可以设置 MULTI_STATEMENT_COUNT=0 以允许批量查询包含任意数量的语句。但是,请注意,使用此值会降低对 SQL 注入攻击的防护。

您可以使用以下命令在会话级别设置此参数,也可以在每次提交查询时单独设置该值。

ALTER SESSION SET multi_statement_count = <n>

通过设置会话级别的值,每次执行批量语句时就不需要再设置。以下示例将会话级别的语句数设置为 3,然后执行 3 条 SQL 语句:

const statement = connection.execute({
  sqlText: 'ALTER SESSION SET multi_statement_count=0',
  complete: function (err, stmt, rows) {
    if (err) {
      console.error(`Failed to execute statement due to the following error: ${err.message}`);
    } else {
      testMulti();
    }
  }
});

function testMulti() {
  console.log('select bind execute.');
  const selectStatement = connection.execute({
    sqlText: 'create or replace table test(n int); insert into test values(1), (2); select * from test order by n',
    complete: function (err, stmt, rows) {
      if (err) {
        console.error(`Failed to execute statement due to the following error: ${err.message}`);
      } else {
        console.log('==== complete');
        console.log(`==== sqlText=${stmt.getSqlText()}`);
        if (stmt.hasNext()) {
          stmt.NextResult();
        } else {
          // do something else, for example close the connection
        }
      }
    }
  });
}

通过将 MULTI_STATEMENT_COUNT 设置为 connection.execute 函数的参数,您还可以在每次执行多语句查询时设置批量语句的数量。以下示例将批量语句数设置为 3,并在批量查询中包含 3 条 SQL 语句:

// connection needs to be already set up
connection.connect((err, conn) => {
  if (err) {
    console.error(`Unable to connect: ${err.message}`);
  } else {
    console.log(`Successfully connected to Snowflake, connection id ${conn.getId()}`);
    testMulti();
  }
});

function testMulti() {
  console.log('execute multi-statement query');
  connection.execute({
    sqlText: 'create or replace table test(n int); insert into test values(1), (2); select * from test order by n',
    parameters: { MULTI_STATEMENT_COUNT: 3 },
    complete: function (err, stmt, rows) {
      if (err) {
        console.error(`Failed to execute statement: ${err.message}`);
      } else {
        console.log('==== complete');
        console.log(`==== sqlText=${stmt.getSqlText()}`);
        if (rows) {
          const stream = stmt.streamRows();
          console.log(`====QueryId=${stmt.getQueryId()}`);

          stream.on('data', row => {
            console.log(row);
          });
          stream.on('end', () => {
            console.log('done');
          });
        }

        if ('hasNext' in stmt && stmt.hasNext()) {
          stmt.NextResult();
        } else {
          connection.destroy(err1 => {
            if (err1) {
              console.error(`Unable to disconnect: ${err1.message}`);
            } else {
              console.log(`Disconnected connection with id: ${connection.getId()}`);
            }
          });
        }
      }
    }
  });
}

绑定语句参数

有时,您可能希望将语句中的数据与占位符 绑定。以这种方式执行语句很有用,因为它有助于防止 SQL 注入攻击。请考虑以下语句:

connection.execute({
  sqlText: 'SELECT c1 FROM (SELECT 1 AS c1 UNION ALL SELECT 2 AS c1) WHERE c1 = 1;'
});

您可以使用以下绑定实现相同的结果:

connection.execute({
  sqlText: 'SELECT c1 FROM (SELECT :1 AS c1 UNION ALL SELECT :2 AS c1) WHERE c1 = :1;',
  binds: [1, 2]
});

还支持绑定的 ? 语法:

connection.execute({
  sqlText: 'SELECT c1 FROM (SELECT ? AS c1 UNION ALL SELECT ? AS c1) WHERE c1 = ?;',
  binds: [1, 2, 1]
});

备注

可以绑定的数据大小或可以批量合并的数据大小存在上限。有关详细信息,请参阅 查询文本大小的限制

绑定数组以进行批量插入

批量 INSERT 操作支持绑定数据数组。传递众多数组中的一个数组,如下所示:

connection.execute({
  sqlText: 'INSERT INTO t(c1, c2, c3) values(?, ?, ?)',
  binds: [[1, 'string1', 2.0], [2, 'string2', 4.0], [3, 'string3', 6.0]]
});

备注

绑定大型数组会影响性能,如果数据规模太大而无法由服务器处理,则可能会被拒绝。

您还可以绑定 VARIANT 数据的数组。为了说明问题,假设您创建了一个包含 VARIANT 数据列的表,如下所示:

create or replace table test(id int, foo variant);

然后,您可以执行以下脚本:

// standard stuff like defining connection, etc
const statement = connection.execute({
  // table columns are id: int, foo: variant
  sqlText: 'insert into test_db.public.test select value:id, value:foo from table(flatten(parse_json(?)))',
  binds: [JSON.stringify([
    { id: 1, foo: [{ a: '1', b: '2' }] },
    { id: 2, foo: [{ c: '3', d: '4' }] }
  ])],
  complete: function (err, stmt, rows) {
    if (err) {
      console.error(`Failed to execute statement due to the following error: ${err.message}`);
    } else {
      console.log(`[queryID ${statement.getStatementId()}, requestId ${statement.getRequestId()}] Number of rows produced: ${rows.length}`);
      // rest of the code
    }
  }
});

取消语句

可以通过调用 statement.cancel() 方法取消语句:

statement.cancel((err, stmt) => {
  if (err) {
    console.error(`Unable to abort statement due to the following error: ${err.message}`);
  } else {
    console.log('Successfully aborted statement');
  }
});

重新提交请求

如果您不确定 Snowflake 是否成功执行了 SQL 语句(可能是由于网络错误或超时),您可以使用其请求 ID 重新提交相同的语句。例如,假设您提交了一条 INSERT 命令来添加数据,但没有及时收到确认,因此您不知道该命令发生了什么。在这种情况下,您不希望只执行与新命令相同的命令,因为这可能会导致执行该命令两次,从而产生数据重复。

通过在 SQL 语句中包含请求 ID,可以避免数据重复的可能性。使用初始请求中的请求 ID 重新提交请求,可确保仅在初始请求失败时才执行重新提交的命令。有关更多信息,请参阅 重新提交执行 SQL 语句的请求

备注

要使用请求 ID 重新提交查询,必须使用生成请求 ID 的相同连接。如果要通过其他连接检索查询结果,请参阅 RESULT_SCAN

以下代码示例演示了如何保存和使用请求 ID 重新提交语句。执行语句时,可以使用 getRequestId() 函数检索所提交请求的 ID。然后,可以使用该 ID 稍后执行相同的语句。以下示例执行 INSERT 语句并将其请求 ID 保存在 requestId 变量中。

let requestId;
connection.execute({
  sqlText: 'INSERT INTO testTable VALUES (1);',
  complete: function (err, stmt, rows) {
    const stream = stmt.streamRows();
    requestId = stmt.getRequestId(); // Retrieves the request ID
    stream.on('data', row => {
      console.log(row);
    });
    stream.on('end', () => {
      console.log('done');
    });
  }
});

如果没有收到命令成功执行的确认,可以使用保存的请求 ID 重新提交请求,如下所示。

connection.execute({
  sqlText: 'INSERT INTO testTable VALUES (1);',  // optional
  requestId: requestId,  // Uses the request ID from before
  complete: function (err, stmt, rows) {
    const stream = stmt.streamRows();
    stream.on('data', row => {
      console.log(row);
    });
    stream.on('end', () => {
      console.log('done');
    });
  }
});

如果选择使用 requestIdsqlText 重新提交请求,请注意以下交互:

  • 如果 requestId 已存在,这意味着它与以前的请求匹配,则该命令将忽略 sqlText 查询,并通过原始命令重新提交查询。

  • 如果 requestId 不存在,这意味着它与以前的请求不匹配,则该命令将执行 sqlText 查询。