使用 Python 管理 Snowflake 函数和存储过程

您可以使用 Python 来管理 Snowflake 中的用户定义函数 (UDFs) 和存储过程。创建 UDF 或过程时,使用一种受支持的处理程序语言编写其逻辑,然后使用 Snowflake Python APIs 创建它。有关 UDFs 和存储过程的更多信息,请参阅 使用函数和过程扩展 Snowflake

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python APIs 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake

管理用户定义的函数 (UDFs)

您可以管理用户定义函数 (UDFs);您可以编写用户定义的函数来扩展系统,以执行 Snowflake 提供的内置系统定义函数无法执行的操作。创建 UDF 之后,便可以多次重复使用。有关更多信息,请参阅 用户定义的函数概述

备注

当前不支持使用 API 调用 UDFs。

Snowflake Python APIs 使用两种不同类型来表示 UDFs:

  • UserDefinedFunction:显示 UDF 的属性,例如其名称、实参列表、返回类型和函数定义。

  • UserDefinedFunctionResource:显示可用于获取相应 UserDefinedFunction 对象、重命名 UDF 和删除 UDF 的方法。

创建 UDF

要创建 UDF,请先创建 UserDefinedFunction 对象,然后根据 API Root 对象创建 UserDefinedFunctionCollection 对象。使用 UserDefinedFunctionCollection.create 将新 UDF 添加到 Snowflake。

创建 UDF 时,指定一个处理程序,该处理程序的代码是用支持的语言之一编写的。

Python

以下示例中的代码会创建一个 UserDefinedFunction 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_python_function 且具有指定的实参、返回类型、语言和 UDF Python 定义的 UDF:

from snowflake.core.user_defined_function import (
    PythonFunction,
    ReturnDataType,
    UserDefinedFunction
)

function_of_python = UserDefinedFunction(
    "my_python_function",
    arguments=[],
    return_type=ReturnDataType(datatype="VARIANT"),
    language_config=PythonFunction(runtime_version="3.9", packages=[], handler="udf"),
    body="""
def udf():
    return {"key": "value"}
    """,
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_python)
Copy

Java

以下示例中的代码会创建一个 UserDefinedFunction 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_java_function 且具有指定的实参、返回类型、语言和 UDF Java 定义的 UDF:

from snowflake.core.user_defined_function import (
    Argument,
    JavaFunction,
    ReturnDataType,
    UserDefinedFunction
)

function_body = """
    class TestFunc {
        public static String echoVarchar(String x) {
            return x;
        }
    }
"""

function_of_java = UserDefinedFunction(
    name="my_java_function",
    arguments=[Argument(name="x", datatype="STRING")],
    return_type=ReturnDataType(datatype="VARCHAR", nullable=True),
    language_config=JavaFunction(
        handler="TestFunc.echoVarchar",
        runtime_version="11",
        target_path=target_path,
        packages=[],
        called_on_null_input=True,
        is_volatile=True,
    ),
    body=function_body,
    comment="test_comment",
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_java)
Copy

JavaScript

以下示例中的代码会创建一个 UserDefinedFunction 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_javascript_function 且具有指定的实参、返回类型、语言和 UDF JavaScript 定义的 UDF:

from snowflake.core.user_defined_function import (
    Argument,
    ReturnDataType,
    JavaScriptFunction,
    UserDefinedFunction
)

function_body = """
    if (D <= 0) {
        return 1;
    } else {
        var result = 1;
        for (var i = 2; i <= D; i++) {
            result = result * i;
        }
        return result;
    }
"""

function_of_javascript = UserDefinedFunction(
    name="my_javascript_function",
    arguments=[Argument(name="d", datatype="DOUBLE")],
    return_type=ReturnDataType(datatype="DOUBLE"),
    language_config=JavaScriptFunction(),
    body=function_body,
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_javascript)
Copy

Scala

以下示例中的代码会创建一个 UserDefinedFunction 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_scala_function 且具有指定的实参、返回类型、语言和 UDF Scala 定义的 UDF:

from snowflake.core.user_defined_function import (
    Argument,
    ReturnDataType,
    ScalaFunction,
    UserDefinedFunction
)

function_body = """
    class Echo {
        def echoVarchar(x : String): String = {
            return x
        }
    }
"""

function_of_scala = UserDefinedFunction(
    name="my_scala_function",
    arguments=[Argument(name="x", datatype="VARCHAR")],
    return_type=ReturnDataType(datatype="VARCHAR"),
    language_config=ScalaFunction(
        runtime_version="2.12", handler="Echo.echoVarchar", target_path=target_path, packages=[]
    ),
    body=function_body,
    comment="test_comment",
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_scala)
Copy

SQL

以下示例中的代码会创建一个 UserDefinedFunction 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_sql_function 且具有指定的实参、返回类型、语言和 UDF SQL 定义的 UDF:

from snowflake.core.user_defined_function import (
    ReturnDataType,
    SQLFunction,
    UserDefinedFunction
)

function_body = """3.141592654::FLOAT"""

function_of_sql = UserDefinedFunction(
    name="my_sql_function",
    arguments=[],
    return_type=ReturnDataType(datatype="FLOAT"),
    language_config=SQLFunction(),
    body=function_body,
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_sql)
Copy

获取 UDF 详细信息

您可以通过调用 UserDefinedFunctionResource.fetch 方法来获取有关 UDF 的信息,该方法会返回一个 UserDefinedFunction 对象。

以下示例中的代码会获取有关 my_db 数据库和 my_schema 模式中的 my_javascript_function(DOUBLE) UDF 的信息:

备注

获取 UDF 资源对象时,必须指定完整签名(UDF 名称及其参数数据类型),因为 UDFs 可以重载。

my_udf = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].fetch()
print(my_udf.to_dict())
Copy

列出 UDFs

您可以使用 UserDefinedFunctionCollection.iter 方法列出 UDFs,该方法会返回 UserDefinedFunction 对象的 PagedIter 迭代器。

以下示例中的代码会列出 my_db 数据库和 my_schema 架构中名称以 my_java 开头的 UDFs,并打印每个自定义函数的名称:

udf_iter = root.databases["my_db"].schemas["my_schema"].user_defined_functions.iter(like="my_java%")
for udf_obj in udf_iter:
    print(udf_obj.name)
Copy

重命名 UDF

您可以使用 UserDefinedFunctionResource 对象重命名 UDF。

以下示例中的代码会获取 my_db 数据库和 my_schema 架构中的 my_javascript_function(DOUBLE) UDF 资源对象,然后将 UDF 重命名为 my_other_js_function,同时将其移动到 my_other_db 数据库和 my_other_schema 架构:

root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].rename(
    "my_other_js_function",
    target_database = "my_other_database",
    target_schema = "my_other_schema"
)
Copy

删除 UDF

您可以使用 UserDefinedFunctionResource 对象删除 UDF。

以下示例中的代码会获取 my_javascript_function(DOUBLE) UDF 资源对象,然后删除 UDF:

my_udf_res = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"]
my_udf_res.drop()
Copy

管理存储过程

您可以管理存储过程;您可以编写这些存储过程以使用执行 SQL 的过程代码来扩展系统。在存储过程中,可以使用程序化结构来执行分支和循环。创建存储过程后,便可以多次重复使用。有关更多信息,请参阅 存储过程概述

Snowflake Python APIs 使用两种不同类型来表示过程:

  • Procedure:显示过程的属性,例如其名称、实参列表、返回类型和过程定义。

  • ProcedureResource:显示可用于提取相应 Procedure 对象、调用过程和删除过程的方法。

创建过程

要创建过程,请先创建 Procedure 对象,然后根据 API Root 对象创建 ProcedureCollection 对象。使用 ProcedureCollection.create 将新过程添加到 Snowflake。

以下示例中的代码会创建一个 Procedure 对象,其表示 my_db 数据库和 my_schema 架构中名为 my_procedure 且具有指定的实参、返回类型和 SQL 过程定义的过程:

from snowflake.core.procedure import Argument, ColumnType, Procedure, ReturnTable, SQLFunction

procedure = Procedure(
    name="my_procedure",
    arguments=[Argument(name="id", datatype="VARCHAR")],
    return_type=ReturnTable(
        column_list=[
            ColumnType(name="id", datatype="NUMBER),
            ColumnType(name="price", datatype="NUMBER"),
        ]
    ),
    language_config=SQLFunction(),
    body="
        DECLARE
            res RESULTSET DEFAULT (SELECT * FROM invoices WHERE id = :id);
        BEGIN
            RETURN TABLE(res);
        END;
    ",
)

procedures = root.databases["my_db"].schemas["my_schema"].procedures
procedures.create(procedure)
Copy

调用过程

您可以使用 ProcedureResource 对象调用过程。

以下示例中的代码会获取 my_procedure(NUMBER, NUMBER) 过程资源对象,创建一个 CallArgumentList 对象,然后使用该实参列表调用该过程。

备注

在获取过程资源对象时,必须指定完整签名(过程名称及其参数数据类型),因为过程可能会重载。

from snowflake.core.procedure import CallArgument, CallArgumentList

procedure_reference = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
call_argument_list = CallArgumentList(call_arguments=[
    CallArgument(name="id", datatype="NUMBER", value=1),
])
procedure_reference.call(call_argument_list)
Copy

获取过程详细信息

您可以通过调用 ProcedureResource.fetch 方法获取有关过程的信息,该方法会返回一个 Procedure 对象。

以下示例中的代码会获取有关 my_db 数据库和 my_schema 架构中的 my_procedure(NUMBER, NUMBER) 过程的信息:

my_procedure = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"].fetch()
print(my_procedure.to_dict())
Copy

列出程序

您可以使用 ProcedureCollection.iter 方法列出过程,该方法会返回 Procedure 对象的 PagedIter 迭代器。

以下示例中的代码会列出 my_db 数据库和 my_schema 架构中名称以 my 开头的过程,然后打印每个过程的名称:

procedure_iter = root.databases["my_db"].schemas["my_schema"].procedures.iter(like="my%")
for procedure_obj in procedure_iter:
    print(procedure_obj.name)
Copy

删除过程

您可以使用 ProcedureResource 对象删除过程。

以下示例中的代码会获取 my_db 数据库和 my_schema 架构中的 my_procedure(NUMBER, NUMBER) 过程资源对象,然后删除该过程。

my_procedure_res = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
my_procedure_res.drop()
Copy
语言: 中文