在 Collaboration Clean Room 中上传并使用自定义函数

简介

任何协作者都可以将自定义 Python UDFs 和 UDTFs 上传到协作中。协作中的模板可以运行这些函数以执行复杂的数据操作。常见用途包括机器学习或在查询中进行自定义数据操作。您上传的代码可以从 批准的 Python 包捆绑包 (https://repo.anaconda.com/pkgs/snowflake/) 和 Snowpark API 包 中导入并使用包。

备注

Python 是唯一支持自定义 UDFs 的编码语言。

以下部分向您展示如何上传和使用自定义函数。

定义和使用自定义代码包

以下是上传和使用自定义函数的方法:

代码提交者:

  1. 通过调用 REGISTER_CODE_SPEC 创建并注册代码

    代码可以内联在规范中,也可以 从暂存区链接

  2. 通过在模板的 code_specs 数组中的 ID,创建一个引用代码包 规范的模板。如下例所示,将此字段添加为 template 和 parameters 字段的同级字段:

     parameters:
       - name: <parameter_name>
         description: <parameter_description>
         required: <true_or_false>
         default: <default_value>
         type: <data_type>
    
     code_specs:             # Optional: List of code bundles used by this template
     - <code_spec_id>        # One or more code spec IDs.
    
     template: |
       <template_content>
    
  3. 注册模板,然后 将模板链接到协作中

    备注

    Snowflake 会扫描上传的代码,看看是否存在安全问题。如果发现安全问题,该代码及其包含的模板将不会被添加到协作中。

分析运行器:

  • 通过调用 RUN 以标准方式运行模板。

重要

在将任何上传的捆绑包部署到 Clean Room 之前,Snowflake 会对其执行安全检查。如果安全检查失败,该模板及其捆绑代码将不会被部署,也无法使用。

要确认带有代码捆绑包的模板已部署并可供使用,请执行以下步骤:

  1. 查找您尝试部署代码捆绑包的洁净室应用程序的名称:

    SHOW APPLICATIONS LIKE 'SFDCR_<collaboration name>';
    
  2. 检查 DESCRIBE APPLICATION 响应中的 upgrade_state 值。当升级状态为 COMPLETE 时,表示安全检查已通过,新模板和捆绑包可供使用。使用 SQL(如下例所示)传入上一步中命令返回的应用程序名称:SQL 代码:

    DESCRIBE APPLICATION <application name>
    

创建并注册代码包规范

上传自定义代码的第一步是创建并注册代码包规范。

自定义函数在 YAML 代码包规范中定义。每个代码包都会公开一个或多个可由模板调用的函数。代码包规范既可以包含规范中的内联代码,也可以 链接到位于 Snowflake 暂存区上的代码

协作者通过调用 REGISTRY.REGISTER_CODE_SPEC 来注册规范,这将返回代码包 ID。任何角色的协作者都可以注册并链接代码包。

代码包链接到协作后,协作中任何能够访问链接该代码包的模板的人员都可以看到该代码包。调用 VIEW_CODE_SPECS 以列出协作中可访问的代码包。

协作中任何能看到代码包的人,都可以在该协作的自有模板中查看并使用它。任何内联代码都可以由协作中的任何成员查看,但协作者无法查看暂存的成品代码。

以下代码包规范公开了名为 normalize_value 的单个 Python UDF,它会调用该规范中定义的 normalize 函数:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_CODE_SPEC(
  $$
  api_version: 2.0.0
  spec_type: code_spec
  name: custom_udf
  version: v1
  functions:
    - name: normalize_value
      type: UDF
      language: PYTHON
      handler: normalize
      arguments:
        - name: value
          type: FLOAT
      returns: FLOAT
      code_body: |
        def normalize(value):
            return value / 100.0
  $$
);

创建并注册调用模板

注册代码规范后,协作者将注册一个使用此代码包的模板。要使用代码包,请在模板的 code_specs 字段中添加代码包规范 ID。

模板使用语法 cleanroom.spec_name$function_name 调用自定义函数。请注意字面量 .$ 名称范围标记。

备注

请使用规范名称(而不是规范 ID)来引用模板中的函数。

在以下示例中,模板使用了来自代码包 custom_udf 的函数 normalize_value

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_TEMPLATE(
  $$
  api_version: 2.0.0
  spec_type: template
  name: normalization_template
  version: v1
  type: sql_analysis
  code_specs:
    - custom_udf_v1  -- Imports the code bundle.
  template: |
    SELECT cleanroom.custom_udf$normalize_value(metric, 0, 100)  -- Calls the UDF.
      AS normalized
        FROM {{ source_tables[0] }}
  $$
);

将模板添加到协作

以标准方式将调用函数的模板添加到协作中。有关更多信息,请参阅 模板

当调用模板被添加到协作时,Snowflake 会进行验证并上传到协作中。在安装代码之前,Snowflake 会扫描上传的代码是否存在安全问题。

以下示例展示了向现有协作添加模板的请求:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.COLLABORATION.ADD_TEMPLATE_REQUEST(
  'my_collaboration',
  'normalization_template_v1',
  ['consumer']
);

提交代码包的新版本

在账户的所有注册表中,每个注册的代码规范必须具有唯一的名称 + 版本。模板会加载特定名称和版本的代码规范。如果要创建或使用新版本的代码,则必须提交新版本的模板,并在 code_specs 字段中引用新的代码版本。您不需要更改模板正文。例如:

第 1 步: 使用代码包的版本 1:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_TEMPLATE(
  $$
  api_version: 2.0.0
  spec_type: template
  name: normalization_template
  version: v1
  type: sql_analysis
  code_specs:
    - custom_udf_v1  -- Bundle ID includes the version number.
  template: |
    SELECT cleanroom.custom_udf$normalize_value(metric, 0, 100)  -- Calls the UDF.
      AS normalized
        FROM {{ source_tables[0] }}
  $$
);

第 2 步: 更新并注册代码包的新版本,然后更新模板以使用新版本:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_TEMPLATE(
  $$
  api_version: 2.0.0
  spec_type: template
  name: normalization_template
  version: v2        -- Update the template version.
  type: sql_analysis
  code_specs:
    - custom_udf_v2  -- Use the new code bundle.
  template: |
    SELECT cleanroom.custom_udf$normalize_value(metric, 0, 100)  -- No change needed here.
      AS normalized
        FROM {{ source_tables[0] }}
  $$
);

请注意,函数名称不包含版本,因此在上传函数的新版本时,无需更改模板正文中的调用代码。

代码包规范

此规范定义了一个或多个可由模板调用的代码函数或过程的捆绑包。

一个代码包规范最多包含 5 个函数和过程。

代码包规范中的标识符具有以下一般要求:

  • 名称:必须是以字母开头、仅包含字母数字字符和下划线的有效 Snowflake 标识符

  • 带引号的标识符:包含特殊字符的名称支持使用双引号标识符。

  • 区分大小写:不带引号的标识符不区分大小写;带引号的标识符保留大小写。

api_version: 2.0.0              # Required: Must be "2.0.0"
spec_type: code_spec            # Required: Must be "code_spec"
name: <identifier>              # Required: Unique name of this code bundle.
version: <version_id>           # Required: Alphanumeric with underscores (max 20 chars)
description: <description_text> # Optional: Description (max 1,000 chars)

artifacts:                      # Optional: Staged files for import
  - alias: <identifier>         # One or more artifact items...
    stage_path: <stage_path>    # Required: Full stage path. See below for additional requirements.
    description: <description_text>  # Optional: Description (max 500 chars)
    content_hash: <sha256_hash>      # Optional: SHA-256 hash for integrity verification

functions:                      # Required if no procedures defined
  - name: <identifier>          # One or more functions...
    type: UDF | UDTF            # Required: Function type
    language: PYTHON            # Required: Currently only PYTHON supported
    runtime_version: <python_version>  # Optional: Python runtime (3.10 - 3.14)
    handler: <handler>          # Required: Handler function
    arguments:                  # Optional: One or more function arguments
      - name: <arg_name>        # Argument name
        type: <sql_type>        # Snowflake SQL type of this argument
    returns: <sql_type>         # Required: Snowflake return type
    packages:                   # Optional: Package dependencies
      - <package_name>          # One or more package items...
    imports:                    # Optional: Artifact aliases to import
      - <artifact_alias>        # One or more import items...
    code_body: |                # Optional: Inline Python code (max 12 MB)
      <inline_python_code>
    description: <description_text>  # Optional: Description of this function.

procedures:                     # Required if no functions defined
  - name: <identifier>          # One or more procedure items...
    language: PYTHON            # Required: Currently only PYTHON supported
    runtime_version: <python_version>  # Optional: Python runtime version
    handler: <handler>          # Required: Handler function
    arguments:                  # Optional: One or more procedure arguments
      - name: <arg_name>        # Argument name
        type: <sql_type>        # Snowflake SQL type of this argument
    returns: <sql_type>         # Optional: Return type
    packages:                   # Optional: Package dependencies
      - <package_name>          # One or more package items...
    imports:                    # Optional: Artifact aliases to import
      - <artifact_alias>        # One or more import items...
    code_body: |                # Optional: Inline Python code
      # inline python_code ...
    description: <description_text>  # Optional: Description of this procedure.
api_version

所使用的 Collaboration API 的版本。必须是 2.0.0

spec_type

规范类型标识符。必须是 code_spec

name: identifier

此注册表中该代码包规范的唯一名称。必须是有效的 Snowflake 标识符,且最多 75 个字符。在模板中调用函数时,这将用作名称的最后一段:cleanroom.code_spec_name$function_name

version: version_id

自定义版本标识符。必须是带下划线的字母数字,最多 20 个字符。

:samp:`description: {description_text}`(可选

代码包规范的描述(最多 1,000 个字符)。

``artifacts``(可选

可由您的函数或过程导入的暂存文件或包列表,并 可以选择通过处理程序函数公开。每个规范最多 5 个。

alias: identifier

用于在导入中引用此工件的别名。在本规范中引用此别名时,请使用裸别名而不是 cleanroom.spec_name$alias;也就是说,使用裸函数名称来引用本规范中的另一个函数。

stage_path: stage_path

工件文件的完整暂存区路径;例如 @DB.SCHEMA.STAGE/path/file.whl

  • 暂存区必须是内部暂存区。 不支持外部暂存区。

  • 暂存区必须启用 DIRECTORY:包含工件的暂存区必须设置 DIRECTORY = TRUE

  • 暂存区路径格式:必须遵循 @[DB.]SCHEMA.STAGE/path/to/file.ext 格式。

  • 无路径遍历:暂存区路径不能包含 ..\

  • 此工件必须存在:注册代码包时,该文件必须存在于指定的暂存区路径中。

  • 暂存区必须启用 SNOWFLAKE_SSE 服务端加密。 创建或更改暂存区时,请设置 ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')

  • 如果您推送、删除或更新暂存的代码文件, 则必须调用 ALTER STAGE stage name REFRESH 以确保协作拥有来自暂存区的最新信息。仅在注册代码规范之前支持代码更新,因为此时会分配版本并计算哈希校验和。

:samp:`description: {description_text}`(可选

工件的描述(最多 500 个字符)。

:samp:`content_hash: {sha256_hash}`(可选

用于完整性验证的 SHA-256 哈希值(64 个十六进制字符)。

functions如果未定义过程,则为必填项

UDF 或 UDTF 定义的列表。

name identifier

要向调用模板公开的函数名称。必须是有效的 Snowflake 标识符

type

函数类型。UDFUDTF 之一。

language

函数语言。目前仅支持 PYTHON

:samp:`runtime_version: {python_version}`(可选

要使用的 Python 运行时版本。支持的版本:3.103.14

handler: handler

调用 name 时要在函数代码中调用的处理程序函数名称。

``arguments``(可选

以名称-类型对列表形式表示的函数实参。类型必须是有效的 Snowflake SQL 类型。

returns: sql_type

返回类型。对于 UDFs,请使用 SQL 类型,例如 STRINGFLOAT。对于 UDTFs,请使用 TABLE(column_definitions)

``packages``(可选

此代码使用的包列表。这可以是 这些 Anaconda Python 包 (https://repo.anaconda.com/pkgs/snowflake/) 或 这些 Snowpark API 包 中的任何一个。例如:snowflake-snowpark-pythonnumpy

``imports``(可选

要导入的工件列表。这些必须是本规范中工件列表中的别名。

``code_body``(可选

内联 Python 代码。与暂存导入互斥。最大大小为 12 MB。

:samp:`description: {description_text}`(可选

函数的描述(最多 500 个字符)。

procedures如果未定义函数,则为必填项

存储过程定义列表。字段与 functions 类似,但没有 type 字段。

API 参考

以下过程用于管理协作中的自定义代码包:

REGISTER_CODE_SPEC

架构:

REGISTRY

注册代码包。这会将 Clean Room 环境中的代码存储在 REGISTRY.CODE_SPECS 表中。注册代码规范后,模板即可使用它。

注册的每个代码规范在您账户中的所有注册表中都必须具有唯一的名称和版本。

语法

REGISTER_CODE_SPEC( ['<registry_name>' ,] <code_spec> )

实参

registry_name (可选)

用于注册此代码规范的 自定义注册表 的名称。如果未指定,则在默认账户注册表中注册代码包。

code_spec

以字符串形式表示的 YAML 格式的代码包规范定义。

返回

生成的代码包规范 ID。

示例

在默认注册表中注册代码包:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_CODE_SPEC(
  $$
  api_version: 2.0.0
  spec_type: code_spec
  name: custom_udf
  version: v1
  description: Custom UDF for data normalization

  functions:
    - name: normalize_value
      type: UDF
      language: PYTHON
      runtime_version: "3.10"
      handler: normalize
      arguments:
        - name: value
          type: FLOAT
        - name: min_val
          type: FLOAT
        - name: max_val
          type: FLOAT
      returns: FLOAT
      code_body: |
        def normalize(value, min_val, max_val):
            if max_val == min_val:
                return 0.0
            return (value - min_val) / (max_val - min_val)
  $$
);

在自定义注册表中注册代码包:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_CODE_SPEC(
  'my_custom_registry',
  $$
  api_version: 2.0.0
  spec_type: code_spec
  name: custom_udf
  version: v1
  description: Custom UDF for data normalization

  functions:
    - name: normalize_value
      type: UDF
      language: PYTHON
      runtime_version: "3.10"
      handler: normalize
      arguments:
        - name: value
          type: FLOAT
        - name: min_val
          type: FLOAT
        - name: max_val
          type: FLOAT
      returns: FLOAT
      code_body: |
        def normalize(value, min_val, max_val):
            if max_val == min_val:
                return 0.0
            return (value - min_val) / (max_val - min_val)
  $$
);

访问要求

如果您不使用 SAMOOHA_APP_ROLE 角色,则必须使用被授予以下任何权限的角色。

要在默认注册表中注册代码规范:

  • GRANT_PRIVILEGE_ON_ACCOUNT_TO_ROLE('REGISTER CODE SPEC', 'role name')

要在自定义注册表中注册项目,请执行以下操作:

  • 您对自己创建的任何自定义注册表都具有读写权限。

  • 要访问其他用户创建的自定义注册表,您需要 GRANT_PRIVILEGE_ON_OBJECT_TO_ROLE('REGISTER', 'REGISTRY', 'MY_REGISTRY', 'role name')


VIEW_REGISTERED_CODE_SPECS

架构:

REGISTRY

列出此角色在本地账户注册表中注册的所有代码包规范。

语法

VIEW_REGISTERED_CODE_SPECS( [ '<registry_name>' ] )

实参

registry_name (可选)

要从中列出代码规范的 自定义注册表 的名称。如果未指定,则列出默认账户注册表中的代码规范。

返回

列出您在此账户中注册的所有代码包详细信息的表。该表包括以下各列:

  • CODE_SPEC_ID:代码包规范的 ID。

  • NAME:选择使用 时默认使用的角色和仓库。代码包规范名称。

  • VERSION:选择使用 时默认使用的角色和仓库。代码包规范版本。

  • CODE_SPEC:选择使用 时默认使用的角色和仓库。代码包规范的完整 YAML 规范。

示例

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.VIEW_REGISTERED_CODE_SPECS();

访问要求

如果您不使用 SAMOOHA_APP_ROLE 角色,则必须使用被授予以下任何权限的角色。

要查看默认注册表中的项目,请执行以下操作:

  • GRANT_PRIVILEGE_ON_ACCOUNT_TO_ROLE('VIEW REGISTERED CODE SPECS', 'role name')

  • GRANT_PRIVILEGE_ON_ACCOUNT_TO_ROLE('REVIEW COLLABORATION', 'role name')

  • GRANT_PRIVILEGE_ON_ACCOUNT_TO_ROLE('CREATE COLLABORATION', 'role name')

要查看特定注册表中的项目:

  • 您对自己创建的任何自定义注册表都具有读写权限。

  • 要访问其他用户创建的自定义注册表,您需要 GRANT_PRIVILEGE_ON_OBJECT_TO_ROLE('READ', 'REGISTRY', 'MY_REGISTRY', 'role name')


VIEW_CODE_SPECS

架构:

COLLABORATION

返回由您创建的或可以在指定协作中运行的任何模板引用的所有代码包规范。

语法

VIEW_CODE_SPECS( <collaboration_name> )

实参

collaboration_name

协作的 ID。

返回

列出指定协作中可用的代码包的表。该表包括以下各列:

  • CODE_SPEC_ID:此代码包规范的 ID。

  • CODE_SPEC:选择使用 时默认使用的角色和仓库。代码包规范的完整 YAML 规范。

  • SHARED_BY:选择使用 时默认使用的角色和仓库。共享代码包规范的协作者别名。

示例

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.COLLABORATION.VIEW_CODE_SPECS(
  $collaboration_id
);

访问要求

如果您不使用 SAMOOHA_APP_ROLE 角色,则必须使用被授予以下任意权限的角色:

  • GRANT_PRIVILEGE_ON_OBJECT_TO_ROLE('VIEW CODE SPECS', 'COLLABORATION', 'collaboration name', 'role name')

  • GRANT_PRIVILEGE_ON_ACCOUNT_TO_ROLE('REVIEW COLLABORATION', 'role name')

  • GRANT_PRIVILEGE_ON_ACCOUNT_TO_ROLE('CREATE COLLABORATION', 'role name')

示例规范

带有代码正文的内联 UDF

一个带有内联 Python 代码的简单 UDF:

api_version: 2.0.0
spec_type: code_spec
name: string_utils
version: v1
description: String utility functions

functions:
  - name: clean_string
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: clean
    arguments:
      - name: input_str
        type: STRING
    returns: STRING
    description: Removes leading/trailing whitespace and converts to lowercase
    code_body: |
      def clean(input_str):
          if input_str is None:
              return None
          return input_str.strip().lower()

  - name: extract_domain
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: extract
    arguments:
      - name: email
        type: STRING
    returns: STRING
    description: Extracts domain from email address
    code_body: |
      def extract(email):
          if email is None or '@' not in email:
              return None
          return email.split('@')[1]

UDTF(用户定义的表函数)

此示例 YAML 定义了一个返回多行的 UDTF:

api_version: 2.0.0
spec_type: code_spec
name: tokenizer
version: v1
description: Text tokenization UDTF

functions:
  - name: tokenize_text
    type: UDTF
    language: PYTHON
    runtime_version: "3.10"
    handler: Tokenizer
    arguments:
      - name: text
        type: STRING
      - name: delimiter
        type: STRING
    returns: TABLE(token STRING, position INTEGER)
    description: Splits text into tokens and returns each with its position
    code_body: |
      class Tokenizer:
          def process(self, text, delimiter):
              if text is None:
                  return
              tokens = text.split(delimiter if delimiter else ' ')
              for i, token in enumerate(tokens):
                  yield (token.strip(), i)

带有 wheel 包的暂存工件

请务必阅读 stage_path 文档要求,了解如何在代码规范中链接到暂存代码。

此示例 YAML 使用暂存的 Python wheel 包:

api_version: 2.0.0
spec_type: code_spec
name: ml_scoring
version: v2
description: ML scoring functions using custom library

artifacts:
  - alias: ml_lib
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/libs/ml_scoring_lib-1.0.0-py3-none-any.whl"
    description: Custom ML scoring library
    content_hash: "a1b2c3d4e5f6..."

functions:
  - name: predict_score
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: ml_scoring_lib.predictor.predict
    arguments:
      - name: features
        type: ARRAY
    returns: FLOAT
    packages:
      - numpy
      - scikit-learn
    imports:
      - ml_lib
    description: Predicts score using trained ML model

存储过程

此示例 YAML 定义了用于数据处理的存储过程:

api_version: 2.0.0
spec_type: code_spec
name: data_processor
version: v1
description: Data processing procedures

procedures:
  - name: aggregate_metrics
    language: PYTHON
    runtime_version: "3.10"
    handler: process
    arguments:
      - name: table_name
        type: STRING
      - name: group_column
        type: STRING
    returns: STRING
    packages:
      - snowflake-snowpark-python
    description: Aggregates metrics by specified column
    code_body: |
      def process(session, table_name, group_column):
          df = session.table(table_name)
          result = df.group_by(group_column).count()
          result.write.mode("overwrite").save_as_table("aggregated_results")
          return f"Aggregated {df.count()} rows into aggregated_results"

作为暂存工件的多个 Python 文件

请务必阅读 stage_path 文档要求,了解如何在代码规范中链接到暂存代码。

此示例 YAML 使用多个暂存的 Python 源文件:

api_version: 2.0.0
spec_type: code_spec
name: analytics_suite
version: v3
description: Analytics suite with multiple modules

artifacts:
  - alias: utils
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/analytics/utils.py"
    description: Utility functions
  - alias: transformers
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/analytics/transformers.py"
    description: Data transformation functions
  - alias: validators
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/analytics/validators.py"
    description: Validation functions

functions:
  - name: transform_and_validate
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: transformers.transform_validate
    arguments:
      - name: data
        type: OBJECT
    returns: OBJECT
    imports:
      - utils
      - transformers
      - validators
    description: Transforms and validates input data

代码包故障排除

错误:

CodeSpecAlreadyExistsException

原因:

已注册具有相同名称和版本的代码包规范。

解决方案:

使用其他版本或更新现有版本。


 

错误:

SpecValidationError

原因:

YAML 不符合架构。

解决方案:

检查必填字段和格式。


 

错误:

CodeSpecStageNotAccessibleError

原因:

工件中引用的暂存区不可访问。

解决方案:

授予对暂存区的访问权限或验证暂存区是否存在。


 

错误:

CodeSpecArtifactNotFoundAtStageError

原因:

在指定的暂存区路径中找不到文件。

解决方案:

注册前将文件上传到暂存区。


 

错误:

StageDirectoryNotEnabledError

原因:

暂存区没有启用 DIRECTORY。

解决方案:

在暂存区上启用目录:ALTER STAGE ... SET DIRECTORY = (ENABLE = TRUE)


 

错误:

CodeSpecNotFoundForOwnerException

原因:

模板引用了未注册的代码包规范。

解决方案:

在注册模板之前注册代码包规范。