处理异步子作业

本主题介绍如何在 Snowflake Scripting 中使用异步子作业。

异步子作业简介

在 Snowflake Scripting 中,异步子作业是指在代码块中的代码继续运行时在后台运行的查询。查询可以是任何有效的 SQL 语句,包括 SELECT 语句和 DML 语句,例如 INSERT 或 UPDATE。

要将查询作为异步子作业运行,请在查询前加上 ASYNC 关键字。当省略此关键字时,Snowflake Scripting 块将按顺序运行子作业,每个子作业都要等待正在运行的子作业完成后才能开始。异步子作业可以并发运行,这样可以提高效率并减少总体运行时间。

您可以通过以下方式使用 ASYNC 关键字:

  • 用于为 RESULTSET 运行的查询。

  • 用于独立于 RESULTSET 运行的查询。

要管理异步子作业,可以使用 AWAITCANCEL 语句:

  • AWAIT 语句会等待所有正在运行的异步子作业完成,或等待为 RESULTSET 运行的特定子作业完成,然后分别在所有作业完成或特定作业完成后返回。

  • CANCEL 语句会取消正在为 RESULTSET 运行的异步子作业。

通过调用 SYSTEM$GET_RESULTSET_STATUS 函数,您可以检查为 RESULTSET 运行的异步子作业的状态。

使用异步子作业的示例

以下各节提供了使用异步子作业的示例:

示例:运行并发查询表的子作业

以下代码显示如何使用 ASYNC 关键字运行多个并发查询表的子作业。示例为针对 RESULTSETs 运行的查询指定了 ASYNC 关键字。

此示例使用下表中的数据:

CREATE OR REPLACE TABLE orders_q1_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);

CREATE OR REPLACE TABLE orders_q2_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);
Copy

以下存储程序执行以下操作:

  • 在两个表中查询所有行的 order_amount 值,并将结果返回给不同的 RESULTSETs(每个表一个)。

  • 使用 ASYNC 关键字指定查询作为并发子作业来运行。

  • 为每个 RESULTSET 执行 AWAIT 语句,以便程序等待查询完成之后再继续。RESULTSET 的查询结果无法访问,直到为 RESULTSET 运行 AWAIT 为止。

  • 使用游标计算每个表的 order_amount 行的总和。

  • 添加表的总计并返回值。

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
  accumulator1 INTEGER DEFAULT 0;
  accumulator2 INTEGER DEFAULT 0;
  res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
  res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
  AWAIT res1;
  LET cur1 CURSOR FOR res1;
  OPEN cur1;
  AWAIT res2;
  LET cur2 CURSOR FOR res2;
  OPEN cur2;
  FOR row_variable IN cur1 DO
      accumulator1 := accumulator1 + row_variable.order_amount;
  END FOR;
  FOR row_variable IN cur2 DO
      accumulator2 := accumulator2 + row_variable.order_amount;
  END FOR;
  RETURN accumulator1 + accumulator2;
END;
Copy

注意:如果您在 Python Connector 代码中使用 Snowflake CLISnowSQLClassic Console 或者 execute_streamexecute_string 方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
  DECLARE
    accumulator1 INTEGER DEFAULT 0;
    accumulator2 INTEGER DEFAULT 0;
    res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
    res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
  BEGIN
    AWAIT res1;
    LET cur1 CURSOR FOR res1;
    OPEN cur1;
    AWAIT res2;
    LET cur2 CURSOR FOR res2;
    OPEN cur2;
    FOR row_variable IN cur1 DO
        accumulator1 := accumulator1 + row_variable.order_amount;
    END FOR;
    FOR row_variable IN cur2 DO
        accumulator2 := accumulator2 + row_variable.order_amount;
    END FOR;
    RETURN accumulator1 + accumulator2;
  END;
$$;
Copy

调用存储过程:

CALL test_sp_async_child_jobs_query();
Copy
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
|                           4570 |
+--------------------------------+

示例:运行子作业,并发地将行插入表中

以下代码显示如何使用 ASYNC 关键字运行多个子作业,并发地将行插入到表中。示例为针对 RESULTSETs 运行的查询指定了 ASYNC 关键字。

以下存储程序执行以下操作:

  • 如果 orders_q3_2024 表不存在,则创建该表。

  • 创建两个 RESULTSETs,即 insert_1insert_2,保存插入表中的结果。存储程序实参指定插入表中的值。

  • 指定使用 ASYNC 关键字将插入作为并发子作业来运行。

  • 为每个 RESULTSET 执行 AWAIT 语句,以便程序等待插入完成之后再继续。RESULTSET 的结果无法访问,直到为 RESULTSET 运行 AWAIT 为止。

  • 创建一个新的 RESULTSET res,保存 orders_q3_2024 表的查询结果。

  • 返回查询的结果。

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
Copy

注意:如果您在 Python Connector 代码中使用 Snowflake CLISnowSQLClassic Console 或者 execute_streamexecute_string 方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
$$;
Copy

调用存储过程:

CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
Copy
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
|        1 |       325.00 |
|        2 |       241.00 |
+----------+--------------+

示例:使用 AWAIT ALL 语句在存储过程中运行子作业

以下示例使用 ASYNC 关键字在存储过程中并发运行多个子作业。这些示例为未关联到 RESULTSET 的语句指定了 ASYNC 关键字,然后使用 AWAIT ALL 语句,使存储过程代码在继续执行之前等待所有异步子作业完成。

创建可并发插入数值的存储过程

以下存储过程使用 ASYNC 关键字运行多个子作业,并发地将行插入到表中。示例为 INSERT 语句指定了 ASYNC 关键字。该示例还使用了 AWAIT ALL 语句,以便存储过程等待所有异步子作业完成。

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
Copy

注意:如果您在 Python Connector 代码中使用 Snowflake CLISnowSQLClassic Console 或者 execute_streamexecute_string 方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
$$
;
Copy

创建可并发更新值的存储过程

以下存储过程使用 ASYNC 关键字运行多个子作业,并发地更新表中的行。示例为 UPDATE 语句指定了 ASYNC 关键字。该示例还使用了 AWAIT ALL 语句,以便存储过程等待所有异步子作业完成。

创建表并插入数据:

CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);

INSERT INTO test_child_job_queries2 VALUES
  (1, 100), (2, 101), (3, 102);
Copy

创建存储过程:

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
Copy

注意:如果您在 Python Connector 代码中使用 Snowflake CLISnowSQLClassic Console 或者 execute_streamexecute_string 方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
$$
;
Copy

创建可并发调用其他存储过程的存储过程

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
Copy

注意:如果您在 Python Connector 代码中使用 Snowflake CLISnowSQLClassic Console 或者 execute_streamexecute_string 方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
$$
;
Copy

调用 test_async_child_job_calls 存储过程:

CALL test_async_child_job_calls();
Copy

查询表以查看结果:

SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
Copy
+------+
| COL1 |
|------|
|    1 |
|    2 |
|    3 |
+------+
SELECT * FROM test_child_job_queries2 ORDER BY id;
Copy
+----+------+
| ID | COLA |
|----+------|
|  1 |  200 |
|  2 |  201 |
|  3 |  202 |
+----+------+

示例:在循环中为插入运行子作业

以下代码显示如何在循环中使用 ASYNC 关键字运行多个子作业,并发地将行插入到表中。

此示例使用下表中的数据:

CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);

INSERT INTO async_loop_test1 VALUES
  ('child', 0),
  ('job', 1),
  ('loop', 2),
  ('test', 3);

CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);
Copy

创建存储过程,在 FOR 循环中使用异步子作业将从 async_loop_test1 获取的值与文本 async_ 拼接后插入到 async_loop_test2 中。在每次迭代时,循环都会创建一个单独的异步子作业。AWAIT ALL 语句会阻止存储过程的进程,直到所有子作业完成。

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
Copy

注意:如果您在 Python Connector 代码中使用 Snowflake CLISnowSQLClassic Console 或者 execute_streamexecute_string 方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
$$;
Copy

调用存储过程:

CALL async_insert();
Copy
+--------------+
| ASYNC_INSERT |
|--------------|
| Success      |
+--------------+

查询 async_loop_test2 表以查看结果:

SELECT * FROM async_loop_test2 ORDER BY col1;
Copy
+------+-------------+
| COL1 | COL2        |
|------+-------------|
|    0 | async_child |
|    1 | async_job   |
|    2 | async_loop  |
|    3 | async_test  |
+------+-------------+
语言: 中文