在 Clean Room 中上传和运行自定义函数

概述

您可以将自定义 Python UDFs 和 UDTFs 上传到您的 Clean Room 中,然后通过模板运行它们来执行复杂的数据操作。这些操作包括机器学习或查询中的自定义数据操作,是单步或多步 流程 的一部分。Python 是唯一支持自定义 UDFs 的编码语言。

您上传的代码可以导入和使用 经批准的 Python 包捆绑包 (https://repo.anaconda.com/pkgs/snowflake/) 和 Snowpark API 中的包。

提供商和使用者都可以将自定义 Python 代码上传到 Clean Room,不过两者的流程可能有所不同。每个上传的代码捆绑包都可以定义多个相互调用的函数,但一个捆绑包只能公开一个处理程序函数。这个处理程序函数可以通过使用 Clean Room 的任何人创建或运行的模板来调用。如果代码创建了内部表,则可以按 设计多步流程 中所述访问这些表。

您无法查看上传的代码,甚至无法查看自己的代码。已上传的代码无法删除,但 可以更新

本页向您展示如何以提供商或使用者的身份上传和运行自定义 Python UDFs 和 UDTFs。

小技巧

有关如何在 Clean Room 中开发自己的 Python UDFs 的背景信息,请参阅以下主题:

更新自定义函数

您可以上传或覆盖已上传的现有函数,但不能删除现有函数。

如果上传的函数与先前上传的函数签名完全相同,则会覆盖现有函数。签名由外部处理程序的函数名称(不区分大小写)以及所有参数的数据类型(顺序相同)构成。参数名称无关紧要。您不能覆盖其他账户上传的函数。

由于在更新函数时,签名必须匹配,因此您无法更改现有函数的签名:如果您上传函数 foo(name VARIANT age INTEGER),然后又上传函数 foo(name VARIANT age FLOAT),则除了第一个函数外,第二个函数也会添加到 Clean Room 中,因为实参类型不同。

提供商提交的代码

提供商提交的函数可以作为内联代码上传,也可以从 Snowflake 暂存区上传。本文会介绍这两种技术。

您上传的代码可以原生导入并使用 一组经批准的 Python 包 (https://repo.anaconda.com/pkgs/snowflake/) 中的包。如果您需要非默认包,则必须 在 Clean Room 中使用 Snowpark Container Services 来托管代码。

小技巧

更新提供商编写的代码后,您应该更新默认发布指令,然后调用 provider.create_or_update_cleanroom_listing 将更改传播给使用者。如果您未调用 provider.create_or_update_cleanroom_listing,则对于当前正在使用 Clean Room 的使用者,您的默认版本将不会更新。

下面简要介绍了提供商如何向 Clean Room 添加代码:

  1. 提供商以正常方式创建和配置 Clean Room。

  2. 提供商通过调用 provider.load_python_into_cleanroom 上传代码。您可以直接在该过程中 内联上传代码,也可以 将代码文件上传到某个暂存区,然后将暂存区位置提供给该过程。

    尽管您的代码可以包含多个函数,但每次上传只能公开一个处理程序。如果您向模板公开多个函数,请通过调用 provider.load_python_into_cleanroom 上传每个处理程序。

  3. 每次成功上传代码后,都会生成一个 新的 Clean Room 补丁版本。然后,您必须使用新的补丁号调用 provider.set_default_release_directive,以递增默认版本号。如果 Clean Room 对外公开,则在安装代码之前会运行安全检查,并且在递增默认版本号之前,您必须调用 provider.view_cleanroom_scan_status 以确认安全检查已通过。

  4. 您可以创建并上传一个调用您的代码的自定义模板。模板使用 cleanroom 范围调用处理程序函数,即 cleanroom.my_function。例如,调用您上传的自定义 simple_add 函数的模板可能如下所示:

    SELECT cleanroom.simple_add(1, 2), cleanroom.simple_add({{ price | sqlsafe | int }}, {{ tax | sqlsafe | int }})
    
    Copy
  5. 使用者按照运行其他任何模板的相同方式运行您的模板。

    小技巧

    如果使用者在安装包含自定义代码的 Clean Room 时遇到挂载错误,这可能表明代码中存在语法错误。

您可以在 提供商编写的代码示例部分 中找到演示此流程的代码示例。

关于版本控制的重要注意事项

提供商每次上传函数时,都会递增补丁编号(补丁编号的数量上限为 99 个)。因此,在将代码添加到 Clean Room 之前,应尽可能对其进行彻底的测试和调试,以减少开发期间的版本更新。

如果您确实更新了补丁编号,则使用 Clean Room UI 的客户可能需要刷新页面才能看到更改。使用 API 的客户应该会立即看到更改,但实际上可能会有延迟,具体取决于可用资源。了解有关 Clean Room 版本控制的更多信息。

上传提供商编写的内联函数

您可以在 provider.load_python_into_cleanroomcode 参数中通过内联的方式上传代码。以下是通过内联的方式上传简单函数的示例:

CALL samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
$cleanroom_name,
'simple_add',                         -- Name used to call the UDF from a template.
['first INTEGER', 'second INTEGER'],  -- Arguments of the UDF, specified as '<variable_name> <SQL type>' pairs.
['numpy', 'pandas'],                  -- Packages imported by the UDF.
'INTEGER',                            -- SQL return type of UDF.
'add_two',                            -- Handler function in your code called when external name is called.
$$
import numpy as np   # Not used, but you can load supported packages.
import pandas as pd

def add_two(first, second):
    return first + second
$$
);
Copy

调用方模板会调用 cleanroom.simple_add 以调用此函数。提供商示例 演示如何上传内联代码。

从暂存区上传提供商编写的函数

您可以将 Python 文件上传到 Clean Room 暂存区,并在调用 provider.load_python_into_cleanroom 时引用该暂存区。从暂存区加载代码可以让您在本地系统中使用编辑器开发代码,避免在通过内联方式加载代码时出现复制/粘贴错误,还可以更好地实施版本控制。请注意,您可以在一次过程调用中上传多个文件,但每次上传只能公开一个处理程序函数。

在您调用 load_python_into_cleanroom 时,代码会从一个暂存区加载到 Clean Room 中;后续对暂存区中代码的更改不会传播到 Clean Room。

要将 UDF 上传到暂存区,请执行以下操作:

  1. 创建 .py 文件,并将其置于可以上传到 Snowsight 暂存区的位置。

  2. 要获取 Clean Room 的暂存区名称,请调用 provider.get_stage_for_python_files($cleanroom_name)。Clean Room 可以访问此暂存区,您不能使用您创建的任意暂存区。

  3. 将 .py 文件上传到 Clean Room 的暂存区。有 几种方法可以实现此目的,包括使用 CLI、Snowsight 或特定语言的驱动程序。

  4. 使用暂存区位置、处理程序、外部名称、实参和返回类型调用 provider.load_python_into_cleanroom。现在,Clean Room 中的模板可以调用此函数。

以下示例代码显示如何从暂存区将代码加载到 Clean Room 中。

-- Save the following code as reverser.py:
--import numpy as np
--def main(some_string):
--  '''Return the reverse of a string plus a random number 1-10'''
--  return some_string[::-1] + str(np.random.randint(1,10))

-- Get the stage for your clean room.
CALL samooha_by_snowflake_local_db.provider.get_stage_for_python_files($cleanroom_name);

-- Save the file to the stage. Here is how to do it by using the Snowflake CLI
PUT file://~/reverser.py <STAGE_NAME> overwrite=True auto_compress=False;

-- Load the code from the stage into the clean room.
CALL samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name,
    'reverse', -- Name used to call the function
    ['some_string  STRING'], -- Arguments and SQL types
    ['numpy'],               -- Any required packages
    ['/reverser.py'],        -- Relative path to file on stage
    'STRING',                -- Return type
    'reverser.main'          -- <FILE_NAME>.<FUNCTION_NAME>
);

-- Uploading code, even from a stage, increases the patch number.
CALL samooha_by_snowflake_local_db.provider.set_default_release_directive(
  $cleanroom_name, 'V1_0', <NEW_PATCH_NUMBER>);

-- Upload a template that calls the function.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name,
    $udf_template_name,
    $$
    SELECT
      p.status,
      cleanroom.reverse(p.status)
    FROM SAMOOHA_SAMPLE_DATABASE.DEMO.CUSTOMERS AS p
    LIMIT 100;
    $$
);

-- Switch to the consumer account and run the template to see the results.
Copy

提供商示例 演示了如何从一个暂存区上传代码。

提供商编写的代码示例

以下示例演示如何将提供商编写的 UDFs 和 UDTFs 添加到 Clean Room 中。

下载以下示例,并将其作为工作表文件上传到 Snowflake 账户中。您需要为提供商和使用者开设单独的账户,每个账户都要安装 Clean Room API。按照示例文件中的说明替换信息。

使用者提交的代码

使用者可以提交 UDF 或 UDTF 代码,并通过自定义模板调用它。使用者上传的代码与自定义模板一起捆绑到单个过程中,并在单个过程调用中上传。使用者代码直接绑定到该模板,不能被其他模板调用。

要以使用者身份上传代码,您应该了解 自定义模板语法 以及 如何提交使用者定义的模板

以下是上传自定义使用者代码的步骤概述:

  1. 提供商以标准方式创建 Clean Room,然后邀请使用者。

  2. 使用者以标准方式安装和配置 Clean Room。

  3. 使用者准备一个模板。模板在 cleanroom 命名空间内调用 UDF 或 UDTF。例如,要调用使用者定义的 calculate_tax 函数,一个简单的模板可能如以下代码段所示:

    SELECT {{ cleanroom.calculate_tax(p.cost) }} AS Tax FROM my_db.my_sch.sales AS p;
    
    Copy
  4. 使用者准备 Python 代码。我们建议在代码中使用双引号 ("") 而非单引号 ('),以避免后续需要额外的转义。您的代码可以引用一个 选定的 Python 库捆绑包 (https://repo.anaconda.com/pkgs/snowflake/)。

  5. 使用者将其 Python 代码传递到 consumer.generate_python_request_template 中。该过程以存储过程的形式返回 Python 代码,其中包含自定义 JinjaSQL 模板的占位符。模板中有几个将 $$ 用作多行分隔符的多行字符串。

  6. generate_python_request_template 输出中的模板占位符替换为您的 JinjaSQL 模板。

  7. 在组合模板中,转义任何单引号,如下所示:\'。这是因为单引号将用作整个多行过程字符串最外层的分隔符。例如:

      BEGIN
    
      CREATE OR REPLACE FUNCTION CLEANROOM.custom_compare(min_status STRING, max_status STRING, this_status STRING)
      RETURNS boolean
      LANGUAGE PYTHON
      RUNTIME_VERSION = 3.10
      PACKAGES = (\'numpy\')
    
      HANDLER = \'custom_compare\'
      AS $$
      import numpy as np
    
      def custom_compare(min_status:str, max_status:str, this_status:str):
        statuses = [\'MEMBER\', \'SILVER\', \'GOLD\', \'PLATINUM\']
        return ((statuses.index(this_status) >= statuses.index(min_status)) &
                (statuses.index(this_status) <= statuses.index(max_status)))
      $$;
    
      -- Custom template
      LET SQL_TEXT varchar := $$
      SELECT
        c.status,
        c.hashed_email
      FROM IDENTIFIER( {{ my_table[0] }} ) as c
      WHERE cleanroom.custom_compare({{ min_status }}, {{ max_status }}, c.status);
      $$;
    
      LET RES resultset := (EXECUTE IMMEDIATE :SQL_TEXT);
      RETURN TABLE(RES);
    
      END;
    
    Copy
  8. 使用您的组合模板调用 consumer.create_template_request。在 template_definition 实参中为存储过程提供的代码前后使用单引号 '',而非双美元符号分隔符 $$...$$。例如:

    CALL samooha_by_snowflake_local_db.consumer.create_template_request(
      $cleanroom_name,
      $template_name,
      '
    BEGIN
    
    -- First, define the Python UDF.
    CREATE OR REPLACE FUNCTION CLEANROOM.custom_compare(min_status STRING, max_status STRING, this_status STRING)
    RETURNS boolean
    LANGUAGE PYTHON
    RUNTIME_VERSION = 3.10
    PACKAGES = (\'numpy\')
    
    HANDLER = \'custom_compare\'
    AS $$
    import numpy as np
    
    def custom_compare(min_status:str, max_status:str, this_status:str):
      statuses = [\'MEMBER\', \'SILVER\', \'GOLD\', \'PLATINUM\']
      return ((statuses.index(this_status) >= statuses.index(min_status)) &
              (statuses.index(this_status) <= statuses.index(max_status)))
        $$;
    
    -- Then define and execute the SQL query.
    LET SQL_TEXT varchar := $$
    SELECT
      c.status,
      c.hashed_email
    FROM IDENTIFIER( {{ my_table[0] }} ) as c
    WHERE cleanroom.custom_compare({{ min_status }}, {{ max_status }}, c.status);
    $$;
    
    -- Execute the query and then return the result.
    LET RES resultset := (EXECUTE IMMEDIATE :SQL_TEXT);
    RETURN TABLE(RES);
    
    END;
    ');
    
    Copy
  9. 使用者和提供商继续使用标准 使用者定义的模板流程

    1. 提供商查看 (provider.list_pending_template_requests) 并批准模板请求 (approve_template_request)

    2. 使用者检查请求状态 (consumer.list_template_requests),当状态为 APPROVED 时,运行模板 (consumer.run_analysis)。

使用者编写的代码示例

以下示例演示了如何向 Clean Room 添加由提供商编写的 UDFs。

下载以下示例,并将其作为工作表文件上传到 Snowflake 账户中。您需要为提供商和使用者开设单独的账户,每个账户都要安装 Clean Room API。按照示例文件中的说明替换信息:

语言: 中文