为无服务器任务提供 Python 和 Java 支持

无服务器任务 可以调用以下对象类型和函数:用户定义函数 (UDFs) 和用 Python、Java 和 Scala 编写的存储过程。

您可以通过几种不同的方式在任务中使用 Python 或 Java。要了解这些选项之间的区别,请参阅 选择是编写存储过程还是用户定义函数

用户定义的函数

您可以创建 UDFs 以在任务的 AS 子句中调用。可以使用 UDFs 执行 SQL 中不可用的操作。有关 UDFs 的更多信息,请参阅 用户定义的函数概述

下面的 Python 和 Java 示例创建了一个函数,该函数将输入值加 1。

CREATE OR REPLACE FUNCTION addone(i int)
  RETURNS int
  LANGUAGE python
  RUNTIME_VERSION = '3.8'
  HANDLER = 'addone_py'
  AS
    $$
    def addone_py(i):
      return i+1
    $$;
Copy

以下示例会创建 my_task2,以将 my_task1 的返回值加 1。

CREATE OR REPLACE TASK IF NOT EXISTS my_task2
  AFTER my_task1
  AS
    SELECT addone(SYSTEM$GET_PREDECESSOR_RETURN_VALUE());
Copy

存储过程

可以创建存储过程来在任务的 AS 子句中调用。存储过程通常用于执行 SQL 语句,以便执行管理操作。有关存储过程的更多信息,请参阅 存储过程概述

下面的 Python 和 Java 示例接受表名和角色名,以返回一个过滤后的表,其中包含与指定角色匹配的行。

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
  RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'filter_by_role'
  AS
    $$
    from snowflake.snowpark.functions import col

    def filter_by_role(session, table_name, role):
      df = session.table(table_name)
      return df.filter(col("role") == role)
    $$;
Copy

下面的示例会创建 task2,以使用从 task1 返回的表和 dev 的角色调用存储过程。

CREATE OR REPLACE TASK IF NOT EXISTS my_task2
  AFTER my_task1
  AS
    CALL filterByRole(SYSTEM$GET_PREDECESSOR_RETURN_VALUE(), 'dev');
Copy

SQL AS 子句

您还可以直接在任务定义的 AS 子句中定义 Python 或 Java 代码。

下面的示例使用 Python 将 task2 的返回值设置为字符串。

CREATE OR REPLACE TASK IF NOT EXISTS task2
  SCHEDULE = '1 minute'
  AS
    $$
    print(Task completed successfully.)
    $$
  ;
Copy
语言: 中文