存储过程 Python 处理程序示例

使用工作进程运行并发任务

您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。

备注

Snowflake 建议您不要使用内置的 Python 多处理模块。

要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。

您可以使用 joblib 库的 Parallel 类对 Snowflake 仓库执行此操作,如以下示例所示。

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.12
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;

备注

Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel 的默认后端有所不同。

  • Standard Warehouse 默认值:threading

  • Snowpark-Optimized Warehouse 默认值:loky (多处理)

您可以通过调用 joblib.parallel_backend 函数来替换默认后端设置,如下例所示。

import joblib
joblib.parallel_backend('loky')

使用 Snowpark APIs 进行异步处理

以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。

检查异步子作业的状态

在下面的示例中,过程 checkStatus 会执行一个等待 60 秒的异步子作业。然后,过程会在任务完成之前检查任务的状态,因此检查返回 False

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;

以下代码会调用该过程。

CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

取消异步子作业

在下面的示例中,过程 cancelJob 使用 SQL 将数据插入 test_tb 表,异步子作业需要 10 秒才能完成。然后,它会在子作业完成并插入数据之前取消子作业。

CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();

以下代码会查询 test_tb 表,但由于没有插入数据,所以不返回结果。

SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

异步子作业运行时的等待和阻塞

在下面的示例中,存储过程 blockUntilDone 会执行一项异步子作业,需要 5 秒才能完成。使用 snowflake.snowpark.AsyncJob.result 方法,过程会等待并在任务完成后返回。

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;

以下代码会调用 blockUntilDone 过程,等待 5 秒后返回。

CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

在请求未完成异步子作业的结果后返回错误

在下面的示例中,过程 earlyReturn 会执行一项异步子作业,需要 60 秒才能完成。然后,在作业完成之前,过程会尝试从作业结果中返回 DataFrame。结果是返回错误。

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;

以下代码调用 earlyReturn 过程,返回错误。

CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

在子作业完成前完成父作业,取消子作业

在下面的示例中,过程 earlyCancelJob 会执行异步子作业,向表中插入数据,需要 10 秒才能完成。但是,父作业 async_handler 会在子作业完成前返回,从而取消子作业。

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;

以下代码会调用 earlyCancelJob 过程。然后查询 test_tb 表,但由于取消的子作业没有插入数据,因此没有返回结果。

CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

读取文件和资产

使用 IMPORTS 读取静态指定的文件

您可以通过在 IMPORTS 命令的 CREATE PROCEDURE 子句中指定文件名和暂存区名来读取文件。

在 IMPORTS 子句中指定文件时,Snowflake 会将文件从暂存区复制到存储过程的 * 主目录 (也称为 *导入目录),存储过程正是从该目录中读取文件。

Snowflake 将导入的文件复制到单个目录。目录中的所有文件都必须具有唯一的名称,因此 IMPORTS 子句中的每个文件都必须具有不同的名称。即使文件在不同暂存区或暂存区内的不同子目录中启动,这点也适用。

以下示例使用内联 Python 处理程序,从名为 file.txt 的暂存区读取名为 my_stage 的文件。处理程序使用带 snowflake_import_directory 系统选项的 Python sys._xoptions 方法来检索存储过程主目录的位置。

Snowflake 在存储过程创建期间仅读取一次文件,如果是在目标处理程序外部读取文件,则在存储过程执行期间不会再次读取该文件。

使用内联处理程序创建存储过程:

CREATE OR REPLACE PROCEDURE test_file_import_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/dir/file.txt')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys

def run(session):
  with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    return f.read()
$$;
CALL test_file_import_sp();
// return file content

使用 IMPORTS 导入目录

您可以使用 CREATE PROCEDURE 命令的 IMPORTS 子句导入目录。

备注

  • 目录的导入路径必须以斜杠 (/) 结尾。例如 IMPORTS = ('@my_stage/my_dir/')

  • 要在导入时重命名目录,请将 /=custom_name/ 追加到暂存区路径。自定义名称必须是单个目录名称,而不是路径。例如 IMPORTS = ('@my_stage/my_dir/=custom_name/')

  • 原生应用不支持目录导入。

以下示例从名为 my_stage 的暂存区导入名为 my_dir 的目录,并列出其中包含的文件。

CREATE OR REPLACE PROCEDURE my_directory_import_list_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/my_dir/')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys
def list_files(directory):
  files = []
  # Walk through the directory and its subdirectories
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      # Append the relative path to each file to the list
      full_path = os.path.join(dirpath, filename)
      files.append(os.path.relpath(full_path, directory))
  return files
def run(session):
  directory_path = sys._xoptions["snowflake_import_directory"]
  file_list = list_files(directory_path)
  file_list_str = ' '.join(file_list)
  return file_list_str
$$;
CALL my_directory_import_list_sp();