使用 Python 管理 Snowflake 函数和存储过程¶
您可以使用 Python 来管理 Snowflake 中的用户定义函数 (UDFs) 和存储过程。创建 UDF 或过程时,使用一种受支持的处理程序语言编写其逻辑,然后使用 Snowflake Python APIs 创建它。有关 UDFs 和存储过程的更多信息,请参阅 使用函数和过程扩展 Snowflake。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python APIs 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python APIs 连接到 Snowflake。
管理用户定义的函数 (UDFs)¶
您可以管理用户定义函数 (UDFs);您可以编写用户定义的函数来扩展系统,以执行 Snowflake 提供的内置系统定义函数无法执行的操作。创建 UDF 之后,便可以多次重复使用。有关更多信息,请参阅 用户定义的函数概述。
备注
当前不支持使用 API 调用 UDFs。
Snowflake Python APIs 使用两种不同类型来表示 UDFs:
UserDefinedFunction
:显示 UDF 的属性,例如其名称、实参列表、返回类型和函数定义。UserDefinedFunctionResource
:显示可用于获取相应UserDefinedFunction
对象、重命名 UDF 和删除 UDF 的方法。
创建 UDF¶
要创建 UDF,请先创建 UserDefinedFunction
对象,然后根据 API Root
对象创建 UserDefinedFunctionCollection
对象。使用 UserDefinedFunctionCollection.create
将新 UDF 添加到 Snowflake。
创建 UDF 时,指定一个处理程序,该处理程序的代码是用支持的语言之一编写的。
Python¶
以下示例中的代码会创建一个 UserDefinedFunction
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_python_function
且具有指定的实参、返回类型、语言和 UDF Python 定义的 UDF:
from snowflake.core.user_defined_function import (
PythonFunction,
ReturnDataType,
UserDefinedFunction
)
function_of_python = UserDefinedFunction(
"my_python_function",
arguments=[],
return_type=ReturnDataType(datatype="VARIANT"),
language_config=PythonFunction(runtime_version="3.9", packages=[], handler="udf"),
body="""
def udf():
return {"key": "value"}
""",
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_python)
Java¶
以下示例中的代码会创建一个 UserDefinedFunction
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_java_function
且具有指定的实参、返回类型、语言和 UDF Java 定义的 UDF:
from snowflake.core.user_defined_function import (
Argument,
JavaFunction,
ReturnDataType,
UserDefinedFunction
)
function_body = """
class TestFunc {
public static String echoVarchar(String x) {
return x;
}
}
"""
function_of_java = UserDefinedFunction(
name="my_java_function",
arguments=[Argument(name="x", datatype="STRING")],
return_type=ReturnDataType(datatype="VARCHAR", nullable=True),
language_config=JavaFunction(
handler="TestFunc.echoVarchar",
runtime_version="11",
target_path=target_path,
packages=[],
called_on_null_input=True,
is_volatile=True,
),
body=function_body,
comment="test_comment",
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_java)
JavaScript¶
以下示例中的代码会创建一个 UserDefinedFunction
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_javascript_function
且具有指定的实参、返回类型、语言和 UDF JavaScript 定义的 UDF:
from snowflake.core.user_defined_function import (
Argument,
ReturnDataType,
JavaScriptFunction,
UserDefinedFunction
)
function_body = """
if (D <= 0) {
return 1;
} else {
var result = 1;
for (var i = 2; i <= D; i++) {
result = result * i;
}
return result;
}
"""
function_of_javascript = UserDefinedFunction(
name="my_javascript_function",
arguments=[Argument(name="d", datatype="DOUBLE")],
return_type=ReturnDataType(datatype="DOUBLE"),
language_config=JavaScriptFunction(),
body=function_body,
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_javascript)
Scala¶
以下示例中的代码会创建一个 UserDefinedFunction
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_scala_function
且具有指定的实参、返回类型、语言和 UDF Scala 定义的 UDF:
from snowflake.core.user_defined_function import (
Argument,
ReturnDataType,
ScalaFunction,
UserDefinedFunction
)
function_body = """
class Echo {
def echoVarchar(x : String): String = {
return x
}
}
"""
function_of_scala = UserDefinedFunction(
name="my_scala_function",
arguments=[Argument(name="x", datatype="VARCHAR")],
return_type=ReturnDataType(datatype="VARCHAR"),
language_config=ScalaFunction(
runtime_version="2.12", handler="Echo.echoVarchar", target_path=target_path, packages=[]
),
body=function_body,
comment="test_comment",
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_scala)
SQL¶
以下示例中的代码会创建一个 UserDefinedFunction
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_sql_function
且具有指定的实参、返回类型、语言和 UDF SQL 定义的 UDF:
from snowflake.core.user_defined_function import (
ReturnDataType,
SQLFunction,
UserDefinedFunction
)
function_body = """3.141592654::FLOAT"""
function_of_sql = UserDefinedFunction(
name="my_sql_function",
arguments=[],
return_type=ReturnDataType(datatype="FLOAT"),
language_config=SQLFunction(),
body=function_body,
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_sql)
获取 UDF 详细信息¶
您可以通过调用 UserDefinedFunctionResource.fetch
方法来获取有关 UDF 的信息,该方法会返回一个 UserDefinedFunction
对象。
以下示例中的代码会获取有关 my_db
数据库和 my_schema
模式中的 my_javascript_function(DOUBLE)
UDF 的信息:
备注
获取 UDF 资源对象时,必须指定完整签名(UDF 名称及其参数数据类型),因为 UDFs 可以重载。
my_udf = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].fetch()
print(my_udf.to_dict())
列出 UDFs¶
您可以使用 UserDefinedFunctionCollection.iter
方法列出 UDFs,该方法会返回 UserDefinedFunction
对象的 PagedIter
迭代器。
以下示例中的代码会列出 my_db
数据库和 my_schema
架构中名称以 my_java
开头的 UDFs,并打印每个自定义函数的名称:
udf_iter = root.databases["my_db"].schemas["my_schema"].user_defined_functions.iter(like="my_java%")
for udf_obj in udf_iter:
print(udf_obj.name)
重命名 UDF¶
您可以使用 UserDefinedFunctionResource
对象重命名 UDF。
以下示例中的代码会获取 my_db
数据库和 my_schema
架构中的 my_javascript_function(DOUBLE)
UDF 资源对象,然后将 UDF 重命名为 my_other_js_function
,同时将其移动到 my_other_db
数据库和 my_other_schema
架构:
root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].rename(
"my_other_js_function",
target_database = "my_other_database",
target_schema = "my_other_schema"
)
删除 UDF¶
您可以使用 UserDefinedFunctionResource
对象删除 UDF。
以下示例中的代码会获取 my_javascript_function(DOUBLE)
UDF 资源对象,然后删除 UDF:
my_udf_res = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"]
my_udf_res.drop()
管理存储过程¶
您可以管理存储过程;您可以编写这些存储过程以使用执行 SQL 的过程代码来扩展系统。在存储过程中,可以使用程序化结构来执行分支和循环。创建存储过程后,便可以多次重复使用。有关更多信息,请参阅 存储过程概述。
Snowflake Python APIs 使用两种不同类型来表示过程:
Procedure
:显示过程的属性,例如其名称、实参列表、返回类型和过程定义。ProcedureResource
:显示可用于提取相应Procedure
对象、调用过程和删除过程的方法。
创建过程¶
要创建过程,请先创建 Procedure
对象,然后根据 API Root
对象创建 ProcedureCollection
对象。使用 ProcedureCollection.create
将新过程添加到 Snowflake。
以下示例中的代码会创建一个 Procedure
对象,其表示 my_db
数据库和 my_schema
架构中名为 my_procedure
且具有指定的实参、返回类型和 SQL 过程定义的过程:
from snowflake.core.procedure import Argument, ColumnType, Procedure, ReturnTable, SQLFunction
procedure = Procedure(
name="my_procedure",
arguments=[Argument(name="id", datatype="VARCHAR")],
return_type=ReturnTable(
column_list=[
ColumnType(name="id", datatype="NUMBER),
ColumnType(name="price", datatype="NUMBER"),
]
),
language_config=SQLFunction(),
body="
DECLARE
res RESULTSET DEFAULT (SELECT * FROM invoices WHERE id = :id);
BEGIN
RETURN TABLE(res);
END;
",
)
procedures = root.databases["my_db"].schemas["my_schema"].procedures
procedures.create(procedure)
调用过程¶
您可以使用 ProcedureResource
对象调用过程。
以下示例中的代码会获取 my_procedure(NUMBER, NUMBER)
过程资源对象,创建一个 CallArgumentList
对象,然后使用该实参列表调用该过程。
备注
在获取过程资源对象时,必须指定完整签名(过程名称及其参数数据类型),因为过程可能会重载。
from snowflake.core.procedure import CallArgument, CallArgumentList
procedure_reference = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
call_argument_list = CallArgumentList(call_arguments=[
CallArgument(name="id", datatype="NUMBER", value=1),
])
procedure_reference.call(call_argument_list)
获取过程详细信息¶
您可以通过调用 ProcedureResource.fetch
方法获取有关过程的信息,该方法会返回一个 Procedure
对象。
以下示例中的代码会获取有关 my_db
数据库和 my_schema
架构中的 my_procedure(NUMBER, NUMBER)
过程的信息:
my_procedure = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"].fetch()
print(my_procedure.to_dict())
列出程序¶
您可以使用 ProcedureCollection.iter
方法列出过程,该方法会返回 Procedure
对象的 PagedIter
迭代器。
以下示例中的代码会列出 my_db
数据库和 my_schema
架构中名称以 my
开头的过程,然后打印每个过程的名称:
procedure_iter = root.databases["my_db"].schemas["my_schema"].procedures.iter(like="my%")
for procedure_obj in procedure_iter:
print(procedure_obj.name)
删除过程¶
您可以使用 ProcedureResource
对象删除过程。
以下示例中的代码会获取 my_db
数据库和 my_schema
架构中的 my_procedure(NUMBER, NUMBER)
过程资源对象,然后删除该过程。
my_procedure_res = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
my_procedure_res.drop()