代码捆绑包

任何合作者都可以将自定义的 Python 过程、UDFs 或 UDTFs 与协作模板捆绑在一起。模板随后会引用这些捆绑的代码,以在协作中执行复杂的数据操作。常见用途包括机器学习或在查询中进行自定义数据操作。您上传的代码可以导入和使用 经批准的 Python 包捆绑包 (https://repo.anaconda.com/pkgs/snowflake/) 和 Snowpark API 中的包。

自定义代码只能通过模板调用,不能直接调用。

备注

代码捆绑包仅支持 Python 编程语言。

以下部分向您展示如何上传和使用代码捆绑包。

实施自定义代码捆绑包

以下是上传和使用代码捆绑包的方法:

代码提交者:

  1. 通过调用 REGISTER_CODE_SPEC 创建并注册代码

    代码可以内联在规范中,也可以 从暂存区链接

  2. 通过在模板的 code_specs 数组中的 ID,创建一个引用代码包 规范的模板。如下例所示,将此字段添加为 template 和 parameters 字段的同级字段:

     parameters:
       - name: <parameter_name>
         description: <parameter_description>
         required: <true_or_false>
         default: <default_value>
         type: <data_type>
    
     code_specs:             # Optional: List of code bundles used by this template
     - <code_spec_id>        # One or more code spec IDs.
    
     template: |
       <template_content>
    
  3. 注册模板,然后 将模板链接到协作中

分析运行器:

  • 通过调用 RUN 以标准方式运行模板。

重要

在将任何上传的捆绑包部署到 Clean Room 之前,Snowflake 会对其执行安全检查。如果安全检查失败,该模板及其捆绑代码将不会被部署,也无法使用。

要确认带有代码捆绑包的模板已部署并可供使用,请执行以下步骤:

  1. 查找您尝试部署代码捆绑包的洁净室应用程序的名称:

    SHOW APPLICATIONS LIKE 'SFDCR_<collaboration name>';
    
  2. 检查 DESCRIBE APPLICATION 响应中的 upgrade_state 值。当升级状态为 COMPLETE 时,表示安全检查已通过,新模板和捆绑包可供使用。使用 SQL(如下例所示)传入上一步中命令返回的应用程序名称:SQL 代码:

    DESCRIBE APPLICATION <application name>
    

创建并注册代码包规范

上传自定义代码的第一步是创建并注册代码包规范。

自定义函数在 YAML 代码包规范中定义。每个代码包都会公开一个或多个可由模板调用的函数。代码包规范既可以包含规范中的内联代码,也可以 链接到位于 Snowflake 暂存区上的代码

协作者通过调用 REGISTRY.REGISTER_CODE_SPEC 来注册规范,这将返回代码包 ID。

在引用代码捆绑包的模板链接到协作后,协作中任何能够访问链接该代码包的模板的人员都可以看到该代码包。调用 VIEW_CODE_SPECS 以列出协作中可访问的代码包。

协作中任何能看到代码包的人,都可以在该协作的自有模板中查看并使用它。任何内联代码都可以由协作中的任何成员查看,但协作者无法查看暂存的成品代码。协作者需要确保所引用工件的 content_hash 相匹配,以进行代码完整性验证。

以下代码包规范公开了名为 normalize_value 的单个 Python UDF,它会调用该规范中定义的 normalize 函数:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_CODE_SPEC(
  $$
  api_version: 2.0.0
  spec_type: code_spec
  name: custom_udf
  version: v1
  functions:
    - name: normalize_value
      type: UDF
      language: PYTHON
      handler: normalize
      arguments:
        - name: value
          type: FLOAT
      returns: FLOAT
      code_body: |
        def normalize(value):
            return value / 100.0
  $$
);

创建并注册调用模板

注册代码规范后,协作者将注册一个使用此代码包的模板。要使用代码包,请在模板的 code_specs 字段中添加代码包规范 ID。将此模板添加到协作中也将导致捆绑的代码在协作中可用。

模板使用语法 cleanroom.spec_name$function_name 调用自定义函数。请注意字面量 .$ 名称范围标记。

备注

请使用规范名称(而不是规范 ID)来引用模板中的函数。这样您就可以快速更新代码捆绑包的版本,而无需更改模板中对该代码捆绑包的所有引用。

在以下示例中,模板使用了来自代码包 custom_udf 的函数 normalize_value

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_TEMPLATE(
  $$
  api_version: 2.0.0
  spec_type: template
  name: normalization_template
  version: v1
  type: sql_analysis
  code_specs:
    - custom_udf_v1  -- Imports the code bundle.
  template: |
    SELECT cleanroom.custom_udf$normalize_value(100)  -- Calls the UDF.
      AS normalized
        FROM {{ source_tables[0] }}
  $$
);

将模板添加到协作

以标准方式将调用函数的模板添加到协作中。有关更多信息,请参阅 模板

当调用模板被添加到协作时,Snowflake 会进行验证并上传到协作中。以下示例展示了向现有协作添加模板的请求:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.COLLABORATION.ADD_TEMPLATE_REQUEST(
  'my_collaboration',
  'normalization_template_v1',
  ['consumer']
);

备注

使用代码捆绑包安装模板会触发 Snowflake 安全检查,并发布底层 Clean Room 的新补丁。在此过程完成且补丁安装成功之前,该模板将不可用、也无法使用。

要查看补丁安装进度,请执行以下步骤:

  1. 查找 Clean Room 应用程序的名称。通常,该名称会是 SFDCR_<clean room name>,但您可以通过搜索来确认:

    -- Find the exact name of the clean room application.
    SHOW APPLICATIONS LIKE 'SFDCR_%';
    
  2. 检查补丁安装的状态。在以下查询中等待 upgrade_state 变为 COMPLETE:

    DESCRIBE APPLICATION SFDCR_<application name>;
    

代码版本管理

在账户的所有注册表中,每个注册的代码规范必须具有唯一的名称 + 版本。模板会加载特定名称和版本的代码规范。如果要创建或使用新版本的代码,则必须提交新版本的模板,并在 code_specs 字段中引用新的代码版本。您不需要更改模板正文。例如:

第 1 步: 使用代码包的版本 1:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_TEMPLATE(
  $$
  api_version: 2.0.0
  spec_type: template
  name: normalization_template
  version: v1
  type: sql_analysis
  code_specs:
    - custom_udf_v1  -- Bundle ID includes the version number.
  template: |
    SELECT cleanroom.custom_udf$normalize_value(100)  -- Calls the UDF.
      AS normalized
        FROM {{ source_tables[0] }}
  $$
);

第 2 步: 更新并注册代码包的新版本,然后更新模板以使用新版本:

CALL SAMOOHA_BY_SNOWFLAKE_LOCAL_DB.REGISTRY.REGISTER_TEMPLATE(
  $$
  api_version: 2.0.0
  spec_type: template
  name: normalization_template
  version: v2        -- Update the template version.
  type: sql_analysis
  code_specs:
    - custom_udf_v2  -- Use the new code bundle.
  template: |
    SELECT cleanroom.custom_udf$normalize_value(100)  -- No change needed here.
      AS normalized
        FROM {{ source_tables[0] }}
  $$
);

请注意,函数名称不包含版本,因此在上传函数的新版本时,无需更改模板正文中的调用代码。

示例规范

带有代码正文的内联 UDF

一个带有内联 Python 代码的简单 UDF:

api_version: 2.0.0
spec_type: code_spec
name: string_utils
version: v1
description: String utility functions

functions:
  - name: clean_string
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: clean
    arguments:
      - name: input_str
        type: STRING
    returns: STRING
    description: Removes leading/trailing whitespace and converts to lowercase
    code_body: |
      def clean(input_str):
          if input_str is None:
              return None
          return input_str.strip().lower()

  - name: extract_domain
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: extract
    arguments:
      - name: email
        type: STRING
    returns: STRING
    description: Extracts domain from email address
    code_body: |
      def extract(email):
          if email is None or '@' not in email:
              return None
          return email.split('@')[1]

UDTF(用户定义的表函数)

此示例 YAML 定义了一个返回多行的 UDTF:

api_version: 2.0.0
spec_type: code_spec
name: tokenizer
version: v1
description: Text tokenization UDTF

functions:
  - name: tokenize_text
    type: UDTF
    language: PYTHON
    runtime_version: "3.10"
    handler: Tokenizer
    arguments:
      - name: text
        type: STRING
      - name: delimiter
        type: STRING
    returns: TABLE(token STRING, position INTEGER)
    description: Splits text into tokens and returns each with its position
    code_body: |
      class Tokenizer:
          def process(self, text, delimiter):
              if text is None:
                  return
              tokens = text.split(delimiter if delimiter else ' ')
              for i, token in enumerate(tokens):
                  yield (token.strip(), i)

带有 wheel 包的暂存工件

请务必阅读 stage_path 文档要求,了解如何在代码规范中链接到暂存代码。

此示例 YAML 使用暂存的 Python wheel 包:

api_version: 2.0.0
spec_type: code_spec
name: ml_scoring
version: v2
description: ML scoring functions using custom library

artifacts:
  - alias: ml_lib
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/libs/ml_scoring_lib-1.0.0-py3-none-any.whl"
    description: Custom ML scoring library
    content_hash: "a1b2c3d4e5f6..."

functions:
  - name: predict_score
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: ml_scoring_lib.predictor.predict
    arguments:
      - name: features
        type: ARRAY
    returns: FLOAT
    packages:
      - numpy
      - scikit-learn
    imports:
      - ml_lib
    description: Predicts score using trained ML model

存储过程

此示例 YAML 定义了用于数据处理的存储过程:

api_version: 2.0.0
spec_type: code_spec
name: data_processor
version: v1
description: Data processing procedures

procedures:
  - name: aggregate_metrics
    language: PYTHON
    runtime_version: "3.10"
    handler: process
    arguments:
      - name: table_name
        type: STRING
      - name: group_column
        type: STRING
    returns: STRING
    packages:
      - snowflake-snowpark-python
    description: Aggregates metrics by specified column
    code_body: |
      def process(session, table_name, group_column):
          df = session.table(table_name)
          result = df.group_by(group_column).count()
          result.write.mode("overwrite").save_as_table("aggregated_results")
          return f"Aggregated {df.count()} rows into aggregated_results"

作为暂存工件的多个 Python 文件

请务必阅读 stage_path 文档要求,了解如何在代码规范中链接到暂存代码。

此示例 YAML 使用多个暂存的 Python 源文件:

api_version: 2.0.0
spec_type: code_spec
name: analytics_suite
version: v3
description: Analytics suite with multiple modules

artifacts:
  - alias: utils
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/analytics/utils.py"
    description: Utility functions
  - alias: transformers
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/analytics/transformers.py"
    description: Data transformation functions
  - alias: validators
    stage_path: "@MY_DB.PUBLIC.CODE_STAGE/analytics/validators.py"
    description: Validation functions

functions:
  - name: transform_and_validate
    type: UDF
    language: PYTHON
    runtime_version: "3.10"
    handler: transformers.transform_validate
    arguments:
      - name: data
        type: OBJECT
    returns: OBJECT
    imports:
      - utils
      - transformers
      - validators
    description: Transforms and validates input data