存储过程 Python 处理程序示例¶
使用工作进程运行并发任务¶
您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。
备注
Snowflake 建议您不要使用内置的 Python 多处理模块。
要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。
您可以使用 joblib
库的 Parallel
类对 Snowflake 仓库执行此操作,如以下示例所示。
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
备注
Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel
的默认后端有所不同。
Standard Warehouse 默认值:
threading
Snowpark-Optimized Warehouse 默认值:
loky
(多处理)
您可以通过调用 joblib.parallel_backend
函数来替换默认后端设置,如下例所示。
import joblib
joblib.parallel_backend('loky')
使用 Snowpark APIs 进行异步处理¶
以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。
检查异步子作业的状态¶
在下面的示例中,过程 checkStatus
会执行一个等待 60 秒的异步子作业。然后,过程会在任务完成之前检查任务的状态,因此检查返回 False
。
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
return async_job.is_done()
$$;
以下代码会调用该过程。
CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False |
+-------------+
取消异步子作业¶
在下面的示例中,过程 cancelJob
使用 SQL 将数据插入 test_tb
表,异步子作业需要 10 秒才能完成。然后,它会在子作业完成并插入数据之前取消子作业。
CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
return async_job.cancel()
$$;
CALL cancelJob();
以下代码会查询 test_tb
表,但由于没有插入数据,所以不返回结果。
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
异步子作业运行时的等待和阻塞¶
在下面的示例中,存储过程 blockUntilDone
会执行一项异步子作业,需要 5 秒才能完成。使用 snowflake.snowpark.AsyncJob.result
方法,过程会等待并在任务完成后返回。
CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(5)").collect_nowait()
return async_job.result()
$$;
以下代码会调用 blockUntilDone
过程,等待 5 秒后返回。
CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
在请求未完成异步子作业的结果后返回错误¶
在下面的示例中,过程 earlyReturn
会执行一项异步子作业,需要 60 秒才能完成。然后,在作业完成之前,过程会尝试从作业结果中返回 DataFrame
。结果是返回错误。
CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
df = async_job.to_df()
try:
df.collect()
except Exception as ex:
return 'Error: (02000): Result for query <UUID> has expired'
$$;
以下代码调用 earlyReturn
过程,返回错误。
CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired |
+------------------------------------------------------------+
在子作业完成前完成父作业,取消子作业¶
在下面的示例中,过程 earlyCancelJob
会执行异步子作业,向表中插入数据,需要 10 秒才能完成。但是,父作业 async_handler
会在子作业完成前返回,从而取消子作业。
CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
以下代码会调用 earlyCancelJob
过程。然后查询 test_tb
表,但由于取消的子作业没有插入数据,因此没有返回结果。
CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+