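Working with asynchronous child jobs

This topic explains how to use asynchronous child jobs in Snowflake Scripting.

Introduction to asynchronous child jobs

In Snowflake Scripting, an asynchronous child job is a query that runs in the background while the code in a block continues to run. The query can be any valid SQL statement, including SELECT statements and DML statements, such as INSERT or UPDATE.

To run a query as an asynchronous child job, put the ASYNC keyword before the query. When this keyword is omitted, the Snowflake Scripting block runs child jobs sequentially, and each child job waits for the running child job to finish before it starts. Asynchronous child jobs can run concurrently, which can improve efficiency and reduce the overall run time.

You can use the ASYNC keyword in the following ways:

  • For a query that is run for a RESULTSET.
  • For a query that is run independently of a RESULTSET.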

To manage asynchronous child jobs, use the AWAIT and CANCEL statements:

  • The AWAIT statement waits either for all running asynchronous child jobs to finish or for a specific child job that is running for a RESULTSET to finish. It returns when all of the jobs have finished or when the specific job has finished, respectively.
  • The CANCEL statement cancels an asynchronous child job that is running for a RESULTSET.
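As a minimal sketch of how these pieces fit together, the following anonymous block uses both forms of the ASYNC keyword and waits for the jobs with AWAIT ALL and AWAIT. The table name async_demo is hypothetical and is used only for illustration.

```sql
-- Hypothetical scratch table used only for this sketch.
CREATE OR REPLACE TABLE async_demo (id INT);

BEGIN
  -- ASYNC on statements that run independently of a RESULTSET.
  ASYNC (INSERT INTO async_demo (id) VALUES (1));
  ASYNC (INSERT INTO async_demo (id) VALUES (2));
  -- Wait for every running asynchronous child job to finish.
  AWAIT ALL;

  -- ASYNC on a query that runs for a RESULTSET.
  LET res RESULTSET := ASYNC (SELECT id FROM async_demo ORDER BY id);
  -- Wait for this specific child job before reading its results.
  AWAIT res;
  RETURN TABLE(res);
END;
```

If you run the block from Snowflake CLI or SnowSQL, you would likely need to wrap it in EXECUTE IMMEDIATE $$ ... $$, in the same way that the stored procedure examples in this topic use $$ delimiters for those clients.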

You can check the status of an asynchronous child job that is running for a RESULTSET by calling the SYSTEM$GET_RESULTSET_STATUS function.
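The following sketch shows one way that CANCEL and SYSTEM$GET_RESULTSET_STATUS might be combined. It assumes that the function accepts the RESULTSET variable name directly as its argument and returns a string. The SYSTEM$WAIT call only keeps the child job busy long enough to observe it, and the variable names are illustrative.

```sql
BEGIN
  -- Start a child job that stays busy for a while (30 seconds by default unit).
  LET res RESULTSET := ASYNC (SELECT SYSTEM$WAIT(30));
  -- Assumption: the function takes the RESULTSET name and returns a status string.
  LET status_before VARCHAR := SYSTEM$GET_RESULTSET_STATUS(res);
  -- Cancel the child job that is running for the RESULTSET.
  CANCEL res;
  LET status_after VARCHAR := SYSTEM$GET_RESULTSET_STATUS(res);
  -- Return both observations so that they can be compared.
  RETURN status_before || ' -> ' || status_after;
END;
```

The exact status strings aren't listed here; check the function reference for the values that your account returns.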

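Currently, up to 4,000 asynchronous child jobs can run concurrently. If the number of concurrent asynchronous child jobs exceeds this limit, an error is returned.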

Note

When multiple asynchronous child jobs run concurrently in the same session, the job completion order isn’t known until the jobs have finished running. Since the completion order can vary, using the LAST_QUERY_ID function with asynchronous child jobs is non-deterministic.

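Examples of using asynchronous child jobs

The following sections provide examples of using asynchronous child jobs:

Example: Run child jobs that query tables concurrently

The following code shows how to use the ASYNC keyword to run multiple child jobs that query tables concurrently. The example specifies the ASYNC keyword for queries that are run for RESULTSETs.

This example uses the data in the following tables: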

CREATE OR REPLACE TABLE orders_q1_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);

CREATE OR REPLACE TABLE orders_q2_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);

The following stored procedure performs the following actions:

  • Queries both tables for the order_amount values in all rows and returns the results to different RESULTSETs (one for each table).
  • Uses the ASYNC keyword to specify that the queries run as concurrent child jobs.
  • Executes the AWAIT statement for each RESULTSET so that the procedure waits for the queries to finish before proceeding. Query results for a RESULTSET can’t be accessed until AWAIT is run for the RESULTSET.
  • Uses a cursor to calculate the sum of the order_amount rows for each table.
  • Adds the totals for the tables and returns the value.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
  accumulator1 INTEGER DEFAULT 0;
  accumulator2 INTEGER DEFAULT 0;
  res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
  res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
  AWAIT res1;
  LET cur1 CURSOR FOR res1;
  OPEN cur1;
  AWAIT res2;
  LET cur2 CURSOR FOR res2;
  OPEN cur2;
  FOR row_variable IN cur1 DO
      accumulator1 := accumulator1 + row_variable.order_amount;
  END FOR;
  FOR row_variable IN cur2 DO
      accumulator2 := accumulator2 + row_variable.order_amount;
  END FOR;
  RETURN accumulator1 + accumulator2;
END;

Note: If you use Snowflake CLI, SnowSQL, the Classic Console, or the execute_stream or execute_string method in Python Connector code, use this example instead (see Using Snowflake Scripting in Snowflake CLI, SnowSQL, and Python Connector):

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
  DECLARE
    accumulator1 INTEGER DEFAULT 0;
    accumulator2 INTEGER DEFAULT 0;
    res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
    res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
  BEGIN
    AWAIT res1;
    LET cur1 CURSOR FOR res1;
    OPEN cur1;
    AWAIT res2;
    LET cur2 CURSOR FOR res2;
    OPEN cur2;
    FOR row_variable IN cur1 DO
        accumulator1 := accumulator1 + row_variable.order_amount;
    END FOR;
    FOR row_variable IN cur2 DO
        accumulator2 := accumulator2 + row_variable.order_amount;
    END FOR;
    RETURN accumulator1 + accumulator2;
  END;
$$;

Call the stored procedure:

CALL test_sp_async_child_jobs_query();
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
|                           4570 |
+--------------------------------+
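
As a quick cross-check, the same total can be computed with a plain query that sums both tables directly. This is only a sketch for verifying the sample data, and the column alias total_order_amount is illustrative.

```sql
-- Sum each table and add the two totals.
-- For the sample rows above: 2500 (Q1) + 2070 (Q2) = 4570.
SELECT
  (SELECT SUM(order_amount) FROM orders_q1_2024) +
  (SELECT SUM(order_amount) FROM orders_q2_2024) AS total_order_amount;
```

The stored procedure accumulates into INTEGER variables while order_amount is NUMBER(12,2), so the two results might differ if the tables contained fractional amounts.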

Example: Run child jobs that insert rows into tables concurrently

The following code shows how to use the ASYNC keyword to run multiple child jobs that insert rows into a table concurrently. The example specifies the ASYNC keyword for queries that are run for RESULTSETs.

The following stored procedure performs the following actions:

  • Creates the orders_q3_2024 table if it doesn’t exist.
  • Creates two RESULTSETs, insert_1 and insert_2, that hold the results of inserts into the table. The stored procedure arguments specify the values that are inserted into the table.
  • Uses the ASYNC keyword to specify that the inserts run as concurrent child jobs.
  • Executes the AWAIT statement for each RESULTSET so that the procedure waits for the inserts to finish before proceeding. The results of a RESULTSET can’t be accessed until AWAIT is run for the RESULTSET.
  • Creates a new RESULTSET res that holds the results of a query on the orders_q3_2024 table.
  • Returns the results of the query.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
  BEGIN
    CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;

Note: If you use Snowflake CLI, SnowSQL, the Classic Console, or the execute_stream or execute_string method in Python Connector code, use this example instead (see Using Snowflake Scripting in Snowflake CLI, SnowSQL, and Python Connector):

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
  BEGIN
    CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
$$;

Call the stored procedure:

CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
|        1 |       325.00 |
|        2 |       241.00 |
+----------+--------------+

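Example: Run child jobs in stored procedures that use the AWAIT ALL statement

The following examples use the ASYNC keyword to run multiple child jobs concurrently in stored procedures. The examples specify the ASYNC keyword for statements that aren’t associated with a RESULTSET, and then use the AWAIT ALL statement so that the stored procedure code waits for all asynchronous child jobs to finish before proceeding.

Create a stored procedure that inserts values concurrently

The following stored procedure uses the ASYNC keyword to run multiple child jobs that insert rows into a table concurrently. The example specifies the ASYNC keyword for the INSERT statements. The example also uses the AWAIT ALL statement so that the stored procedure waits for all of the asynchronous child jobs to finish.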

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;

Note: If you use Snowflake CLI, SnowSQL, the Classic Console, or the execute_stream or execute_string method in Python Connector code, use this example instead (see Using Snowflake Scripting in Snowflake CLI, SnowSQL, and Python Connector):

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
$$
;

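Create a stored procedure that updates values concurrently

The following stored procedure uses the ASYNC keyword to run multiple child jobs that update rows in a table concurrently. The example specifies the ASYNC keyword for the UPDATE statements. The example also uses the AWAIT ALL statement so that the stored procedure waits for all of the asynchronous child jobs to finish.

Create a table and insert data: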

CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);

INSERT INTO test_child_job_queries2 VALUES
  (1, 100), (2, 101), (3, 102);

Create the stored procedure:

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;

Note: If you use Snowflake CLI, SnowSQL, the Classic Console, or the execute_stream or execute_string method in Python Connector code, use this example instead (see Using Snowflake Scripting in Snowflake CLI, SnowSQL, and Python Connector):

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
$$
;

Create a stored procedure that calls other stored procedures concurrently

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;

Note: If you use Snowflake CLI, SnowSQL, the Classic Console, or the execute_stream or execute_string method in Python Connector code, use this example instead (see Using Snowflake Scripting in Snowflake CLI, SnowSQL, and Python Connector):

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
$$
;

Call the test_async_child_job_calls stored procedure:

CALL test_async_child_job_calls();

Query the tables to see the results:

SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
+------+
| COL1 |
|------|
|    1 |
|    2 |
|    3 |
+------+

SELECT * FROM test_child_job_queries2 ORDER BY id;
+----+------+
| ID | COLA |
|----+------|
|  1 |  200 |
|  2 |  201 |
|  3 |  202 |
+----+------+

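Example: Run child jobs for inserts in a loop

The following code shows how to use the ASYNC keyword in a loop to run multiple child jobs that insert rows into a table concurrently.

This example uses the data in the following tables: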

CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);

INSERT INTO async_loop_test1 VALUES
  ('child', 0),
  ('job', 1),
  ('loop', 2),
  ('test', 3);

CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);

Create a stored procedure that reads the values from async_loop_test1, prefixes each col1 value with the text async_, and inserts the results into async_loop_test2 by using asynchronous child jobs in a FOR loop. The loop creates a separate asynchronous child job on each iteration. The AWAIT ALL statement blocks progress in the stored procedure until all of the child jobs are done.

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
BEGIN
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
    ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
  END FOR;

  AWAIT ALL;
  RETURN 'Success';
END;

Note: If you use Snowflake CLI, SnowSQL, the Classic Console, or the execute_stream or execute_string method in Python Connector code, use this example instead (see Using Snowflake Scripting in Snowflake CLI, SnowSQL, and Python Connector):

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
BEGIN
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
    ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
  END FOR;

  AWAIT ALL;
  RETURN 'Success';
END;
$$;

Call the stored procedure:

CALL async_insert();
+--------------+
| ASYNC_INSERT |
|--------------|
| Success      |
+--------------+

Query the async_loop_test2 table to see the results:

SELECT * FROM async_loop_test2 ORDER BY col1;
+------+-------------+
| COL1 | COL2        |
|------+-------------|
|    0 | async_child |
|    1 | async_job   |
|    2 | async_loop  |
|    3 | async_test  |
+------+-------------+