处理异步子作业¶
本主题介绍如何在 Snowflake Scripting 中使用异步子作业。
异步子作业简介¶
在 Snowflake Scripting 中,异步子作业是指在代码块中的代码继续运行时在后台运行的查询。查询可以是任何有效的 SQL 语句,包括 SELECT 语句和 DML 语句,例如 INSERT 或 UPDATE。
要将查询作为异步子作业运行,请在查询前加上 ASYNC 关键字。当省略此关键字时,Snowflake Scripting 块将按顺序运行子作业,每个子作业都要等待正在运行的子作业完成后才能开始。异步子作业可以并发运行,这样可以提高效率并减少总体运行时间。
您可以通过以下方式使用 ASYNC 关键字:
用于为 RESULTSET 运行的查询。
用于独立于 RESULTSET 运行的查询。
要管理异步子作业,可以使用 AWAIT 和 CANCEL 语句:
AWAIT 语句会等待所有正在运行的异步子作业完成,或等待为 RESULTSET 运行的特定子作业完成,然后分别在所有作业完成或特定作业完成后返回。
CANCEL 语句会取消正在为 RESULTSET 运行的异步子作业。
通过调用 SYSTEM$GET_RESULTSET_STATUS 函数,您可以检查为 RESULTSET 运行的异步子作业的状态。
使用异步子作业的示例¶
以下各节提供了使用异步子作业的示例:
示例:运行并发查询表的子作业¶
以下代码显示如何使用 ASYNC 关键字运行多个并发查询表的子作业。示例为针对 RESULTSETs 运行的查询指定了 ASYNC 关键字。
此示例使用下表中的数据:
CREATE OR REPLACE TABLE orders_q1_2024 (
order_id INT,
order_amount NUMBER(12,2));
INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);
CREATE OR REPLACE TABLE orders_q2_2024 (
order_id INT,
order_amount NUMBER(12,2));
INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);
以下存储程序执行以下操作:
在两个表中查询所有行的
order_amount
值,并将结果返回给不同的 RESULTSETs(每个表一个)。使用 ASYNC 关键字指定查询作为并发子作业来运行。
为每个 RESULTSET 执行 AWAIT 语句,以便程序等待查询完成之后再继续。RESULTSET 的查询结果无法访问,直到为 RESULTSET 运行 AWAIT 为止。
使用游标计算每个表的
order_amount
行的总和。添加表的总计并返回值。
CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
accumulator1 INTEGER DEFAULT 0;
accumulator2 INTEGER DEFAULT 0;
res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
AWAIT res1;
LET cur1 CURSOR FOR res1;
OPEN cur1;
AWAIT res2;
LET cur2 CURSOR FOR res2;
OPEN cur2;
FOR row_variable IN cur1 DO
accumulator1 := accumulator1 + row_variable.order_amount;
END FOR;
FOR row_variable IN cur2 DO
accumulator2 := accumulator2 + row_variable.order_amount;
END FOR;
RETURN accumulator1 + accumulator2;
END;
注意:如果您在 Python Connector 代码中使用 Snowflake CLI、SnowSQL、Classic Console 或者 execute_stream
或 execute_string
方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):
CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
DECLARE
accumulator1 INTEGER DEFAULT 0;
accumulator2 INTEGER DEFAULT 0;
res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
AWAIT res1;
LET cur1 CURSOR FOR res1;
OPEN cur1;
AWAIT res2;
LET cur2 CURSOR FOR res2;
OPEN cur2;
FOR row_variable IN cur1 DO
accumulator1 := accumulator1 + row_variable.order_amount;
END FOR;
FOR row_variable IN cur2 DO
accumulator2 := accumulator2 + row_variable.order_amount;
END FOR;
RETURN accumulator1 + accumulator2;
END;
$$;
调用存储过程:
CALL test_sp_async_child_jobs_query();
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
| 4570 |
+--------------------------------+
示例:运行子作业,并发地将行插入表中¶
以下代码显示如何使用 ASYNC 关键字运行多个子作业,并发地将行插入到表中。示例为针对 RESULTSETs 运行的查询指定了 ASYNC 关键字。
以下存储程序执行以下操作:
如果
orders_q3_2024
表不存在,则创建该表。创建两个 RESULTSETs,即
insert_1
和insert_2
,保存插入表中的结果。存储程序实参指定插入表中的值。指定使用 ASYNC 关键字将插入作为并发子作业来运行。
为每个 RESULTSET 执行 AWAIT 语句,以便程序等待插入完成之后再继续。RESULTSET 的结果无法访问,直到为 RESULTSET 运行 AWAIT 为止。
创建一个新的 RESULTSET
res
,保存orders_q3_2024
表的查询结果。返回查询的结果。
CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
arg1 INT,
arg2 NUMBER(12,2),
arg3 INT,
arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
BEGIN
CREATE TABLE IF NOT EXISTS orders_q3_2024 (
order_id INT,
order_amount NUMBER(12,2));
LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
AWAIT insert_1;
AWAIT insert_2;
LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
RETURN TABLE(res);
END;
注意:如果您在 Python Connector 代码中使用 Snowflake CLI、SnowSQL、Classic Console 或者 execute_stream
或 execute_string
方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):
CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
arg1 INT,
arg2 NUMBER(12,2),
arg3 INT,
arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
BEGIN
CREATE TABLE IF NOT EXISTS orders_q3_2024 (
order_id INT,
order_amount NUMBER(12,2));
LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
AWAIT insert_1;
AWAIT insert_2;
LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
RETURN TABLE(res);
END;
$$;
调用存储过程:
CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
| 1 | 325.00 |
| 2 | 241.00 |
+----------+--------------+
示例:使用 AWAIT ALL 语句在存储过程中运行子作业¶
以下示例使用 ASYNC 关键字在存储过程中并发运行多个子作业。这些示例为未关联到 RESULTSET 的语句指定了 ASYNC 关键字,然后使用 AWAIT ALL 语句,使存储过程代码在继续执行之前等待所有异步子作业完成。
创建可并发插入数值的存储过程¶
以下存储过程使用 ASYNC 关键字运行多个子作业,并发地将行插入到表中。示例为 INSERT 语句指定了 ASYNC 关键字。该示例还使用了 AWAIT ALL 语句,以便存储过程等待所有异步子作业完成。
CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
AWAIT ALL;
END;
注意:如果您在 Python Connector 代码中使用 Snowflake CLI、SnowSQL、Classic Console 或者 execute_stream
或 execute_string
方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):
CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
AWAIT ALL;
END;
$$
;
创建可并发更新值的存储过程¶
以下存储过程使用 ASYNC 关键字运行多个子作业,并发地更新表中的行。示例为 UPDATE 语句指定了 ASYNC 关键字。该示例还使用了 AWAIT ALL 语句,以便存储过程等待所有异步子作业完成。
创建表并插入数据:
CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);
INSERT INTO test_child_job_queries2 VALUES
(1, 100), (2, 101), (3, 102);
创建存储过程:
CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
AWAIT ALL;
END;
注意:如果您在 Python Connector 代码中使用 Snowflake CLI、SnowSQL、Classic Console 或者 execute_stream
或 execute_string
方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):
CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
AWAIT ALL;
END;
$$
;
创建可并发调用其他存储过程的存储过程¶
CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
ASYNC (CALL test_async_child_job_inserts());
ASYNC (CALL test_async_child_job_updates());
AWAIT ALL;
END;
注意:如果您在 Python Connector 代码中使用 Snowflake CLI、SnowSQL、Classic Console 或者 execute_stream
或 execute_string
方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):
CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
ASYNC (CALL test_async_child_job_inserts());
ASYNC (CALL test_async_child_job_updates());
AWAIT ALL;
END;
$$
;
调用 test_async_child_job_calls
存储过程:
CALL test_async_child_job_calls();
查询表以查看结果:
SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
+------+
| COL1 |
|------|
| 1 |
| 2 |
| 3 |
+------+
SELECT * FROM test_child_job_queries2 ORDER BY id;
+----+------+
| ID | COLA |
|----+------|
| 1 | 200 |
| 2 | 201 |
| 3 | 202 |
+----+------+
示例:在循环中为插入运行子作业¶
以下代码显示如何在循环中使用 ASYNC 关键字运行多个子作业,并发地将行插入到表中。
此示例使用下表中的数据:
CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);
INSERT INTO async_loop_test1 VALUES
('child', 0),
('job', 1),
('loop', 2),
('test', 3);
CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);
创建存储过程,在 FOR 循环中使用异步子作业将从 async_loop_test1
获取的值与文本 async_
拼接后插入到 async_loop_test2
中。在每次迭代时,循环都会创建一个单独的异步子作业。AWAIT ALL 语句会阻止存储过程的进程,直到所有子作业完成。
CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
begin
LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);
FOR record IN res DO
LET v VARCHAR := record.col1;
LET x INT := record.col2;
ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
END FOR;
AWAIT ALL;
RETURN 'Success';
END;
注意:如果您在 Python Connector 代码中使用 Snowflake CLI、SnowSQL、Classic Console 或者 execute_stream
或 execute_string
方法,请改用本示例(请参阅 在 Snowflake CLI、SnowSQL、Classic Console 和 Python Connector 中使用 Snowflake Scripting):
CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
begin
LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);
FOR record IN res DO
LET v VARCHAR := record.col1;
LET x INT := record.col2;
ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
END FOR;
AWAIT ALL;
RETURN 'Success';
END;
$$;
调用存储过程:
CALL async_insert();
+--------------+
| ASYNC_INSERT |
|--------------|
| Success |
+--------------+
查询 async_loop_test2
表以查看结果:
SELECT * FROM async_loop_test2 ORDER BY col1;
+------+-------------+
| COL1 | COL2 |
|------+-------------|
| 0 | async_child |
| 1 | async_job |
| 2 | async_loop |
| 3 | async_test |
+------+-------------+