为无服务器任务提供 Python 和 Java 支持¶
无服务器任务 可以调用以下对象类型和函数:用户定义函数 (UDFs) 和用 Python、Java 和 Scala 编写的存储过程。
您可以通过几种不同的方式在任务中使用 Python 或 Java。要了解这些选项之间的区别,请参阅 选择是编写存储过程还是用户定义函数。
用户定义的函数¶
您可以创建 UDFs 以在任务的 AS 子句中调用。可以使用 UDFs 执行 SQL 中不可用的操作。有关 UDFs 的更多信息,请参阅 用户定义的函数概述。
下面的 Python 和 Java 示例创建了一个函数,该函数将输入值加 1。
CREATE OR REPLACE FUNCTION addone(i int)
RETURNS int
LANGUAGE python
RUNTIME_VERSION = '3.8'
HANDLER = 'addone_py'
AS
$$
def addone_py(i):
return i+1
$$;
CREATE OR REPLACE FUNCTION add_one(i int)
RETURNS int
LANGUAGE java
CALLED ON NULL INPUT
HANDLER = 'TestFunc.addOne'
TARGET_PATH = '@~/testfunc.jar'
AS
'class TestFunc {
public static int addOne(int i) {
return i+1;
}
}';
以下示例会创建 my_task2
,以将 my_task1
的返回值加 1。
CREATE OR REPLACE TASK IF NOT EXISTS my_task2
AFTER my_task1
AS
SELECT addone(SYSTEM$GET_PREDECESSOR_RETURN_VALUE());
CREATE OR REPLACE TASK IF NOT EXISTS my_task2
AFTER my_task1
AS
SELECT add_one(SYSTEM$GET_PREDECESSOR_RETURN_VALUE());
存储过程¶
可以创建存储过程来在任务的 AS 子句中调用。存储过程通常用于执行 SQL 语句,以便执行管理操作。有关存储过程的更多信息,请参阅 存储过程概述。
下面的 Python 和 Java 示例接受表名和角色名,以返回一个过滤后的表,其中包含与指定角色匹配的行。
CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col
def filter_by_role(session, table_name, role):
df = session.table(table_name)
return df.filter(col("role") == role)
$$;
CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'FilterClass.filterByRole'
AS
$$
import com.snowflake.snowpark_java.*;
public class FilterClass {
public DataFrame filterByRole(Session session, String tableName, String role) {
DataFrame table = session.table(tableName);
DataFrame filteredRows = table.filter(Functions.col("role").equal_to(Functions.lit(role)));
return filteredRows;
}
}
$$;
下面的示例会创建 task2
,以使用从 task1 返回的表和 dev
的角色调用存储过程。
CREATE OR REPLACE TASK IF NOT EXISTS my_task2
AFTER my_task1
AS
CALL filterByRole(SYSTEM$GET_PREDECESSOR_RETURN_VALUE(), 'dev');
CREATE OR REPLACE TASK IF NOT EXISTS my_task2
AFTER my_task1
AS
CALL filter_by_role(SYSTEM$GET_PREDECESSOR_RETURN_VALUE(), 'dev');
SQL AS 子句¶
您还可以直接在任务定义的 AS 子句中定义 Python 或 Java 代码。
下面的示例使用 Python 将 task2
的返回值设置为字符串。
CREATE OR REPLACE TASK IF NOT EXISTS task2
SCHEDULE = '1 minute'
AS
$$
print(Task completed successfully.)
$$
;