存储过程 Python 处理程序示例

使用工作进程运行并发任务

您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。

备注

Snowflake 建议您不要使用内置的 Python 多处理模块。

要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。

您可以使用 joblib 库的 Parallel 类对 Snowflake 仓库执行此操作,如以下示例所示。

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

备注

Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel 的默认后端有所不同。

  • Standard Warehouse 默认值:threading

  • Snowpark-Optimized Warehouse 默认值:loky (多处理)

您可以通过调用 joblib.parallel_backend 函数来替换默认后端设置,如下例所示。

import joblib
joblib.parallel_backend('loky')
Copy

使用 Snowpark APIs 进行异步处理

以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。

检查异步子作业的状态

在下面的示例中,过程 checkStatus 会执行一个等待 60 秒的异步子作业。然后,过程会在任务完成之前检查任务的状态,因此检查返回 False

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

以下代码会调用该过程。

CALL checkStatus();
Copy
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

取消异步子作业

在下面的示例中,过程 cancelJob 使用 SQL 将数据插入 test_tb 表,异步子作业需要 10 秒才能完成。然后,它会在子作业完成并插入数据之前取消子作业。

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

以下代码会查询 test_tb 表,但由于没有插入数据,所以不返回结果。

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

异步子作业运行时的等待和阻塞

在下面的示例中,存储过程 blockUntilDone 会执行一项异步子作业,需要 5 秒才能完成。使用 snowflake.snowpark.AsyncJob.result 方法,过程会等待并在任务完成后返回。

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

以下代码会调用 blockUntilDone 过程,等待 5 秒后返回。

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

在请求未完成异步子作业的结果后返回错误

在下面的示例中,过程 earlyReturn 会执行一项异步子作业,需要 60 秒才能完成。然后,在作业完成之前,过程会尝试从作业结果中返回 DataFrame。结果是返回错误。

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

以下代码调用 earlyReturn 过程,返回错误。

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

在子作业完成前完成父作业,取消子作业

在下面的示例中,过程 earlyCancelJob 会执行异步子作业,向表中插入数据,需要 10 秒才能完成。但是,父作业 async_handler 会在子作业完成前返回,从而取消子作业。

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

以下代码会调用 earlyCancelJob 过程。然后查询 test_tb 表,但由于取消的子作业没有插入数据,因此没有返回结果。

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+
语言: 中文