Snowflake Data Clean Room:基于 Python 的安全模板

本主题介绍了以编程方式设置 Clean Room、与使用者共享 Clean Room 以及以使用加载到 Clean Room 中的安全 Python UDFs 的方式对其运行分析所需的提供商和使用者流程。在此流程中,提供者使用 API 将安全的Python代码加载到洁净室中,该 API 使底层Python代码对消费者完全保密。

此流程将两个 Python 函数加载到 Clean Room 中,以进行一些自定义数据处理和聚合。然后,在充当数据流粘合剂的自定义 SQL Jinja 模板中调用这些 Python UDFs。模板本身按 Python UDFs 创建的自定义分组计算聚合。

除上述内容外,该流程的关键方面还有:

  1. 提供商:

    a.将 2 个机密的 Python UDFs 安全加载到一个新的 Clean Room。

    b.使用 Python UDFs 创建自定义 SQL Jinja 分析模板。

    c.与使用者共享。

  2. 使用者:

    a.检查 Clean Room 中提供的模板。

    b.使用该模板在 Clean Room 中运行分析。

先决条件

您需要两个独立的 Snowflake 账户才能完成此流程。使用第一个账户执行提供商的命令,然后切换到第二个账户执行使用者的命令。

提供商

备注

应在提供商账户的 Snowflake 工作表中运行以下命令。

设置环境

在利用开发者 APIs 使用 Snowflake Data Clean Room 之前,请执行以下命令来设置 Snowflake 环境。如果您没有 SAMOOHA_APP_ROLE 角色,请联系账户管理员。

use role samooha_app_role;
use warehouse app_wh;
Copy

创建 Clean Room

为 Clean Room 创建名称。输入新的 Clean Room 名称,避免与现有 Clean Room 名称冲突。请注意,Clean Room 名称只能是 字母数字。除空格和下划线外,Clean Room 名称不能包含特殊字符。

set cleanroom_name = 'Custom Secure Python UDF Demo clean room';
Copy

您可以使用上面设置的 Clean Room 名称创建新的 Clean Room。如果上面设置的 Clean Room 名称已经作为现有 Clean Room 存在,则此过程将失败。

运行此过程大约需要 45 秒。

provider.cleanroom_init 的第二个实参是 Clean Room 的分发。它可以是 INTERNAL 或 EXTERNAL。出于测试目的,如果您将 Clean Room 共享给同一组织中的账户,则可以使用 INTERNAL 绕过自动安全扫描,该扫描必须在将应用程序包发布给协作者前进行。但是,如果要将此 Clean Room 共享给其他组织中的账户,则必须使用 EXTERNAL Clean Room 分发。

call samooha_by_snowflake_local_db.provider.cleanroom_init($cleanroom_name, 'INTERNAL');
Copy

要查看安全扫描的状态,请使用:

call samooha_by_snowflake_local_db.provider.view_cleanroom_scan_status($cleanroom_name);
Copy

创建 Clean Room 之后,必须先设置其发布指令,然后才能与协作者共享。但是,如果您的分发设置为 EXTERNAL,则必须先等待安全扫描完成,然后再设置发布指令。您可以在扫描运行时继续运行其余步骤,并在执行 provider.create_cleanroom_listing 步骤之前返回此处。

要设置发布指令,请调用:

call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '0');
Copy

跨区域共享

如果 Snowflake 客户的账户与您的账户位于不同区域,则必须启用 Cross-Cloud Auto-Fulfillment 才能与该客户共享 Clean Room。有关与其他区域的使用者协作的额外成本的信息,请参阅 Cross-Cloud Auto-Fulfillment 成本

使用开发者 APIs 时,启用跨区域共享分为两步:

  1. 具有 ACCOUNTADMIN 角色的 Snowflake 管理员为 Snowflake 账户启用 Cross-Cloud Auto-Fulfillment。有关说明,请参阅 与不同区域的账户协作

  2. 您执行 provider.enable_laf_for_cleanroom 命令,为 Clean Room 启用 Cross-Cloud Auto-Fulfillment例如:

    call samooha_by_snowflake_local_db.provider.enable_laf_for_cleanroom($cleanroom_name);
    
    Copy

在为 Clean Room 启用 Cross-Cloud Auto-Fulfillment 后,您可以使用 provider.create_cleanroom_listing 命令照常将使用者添加到列表中。列表会根据需要自动复制到远程云和区域。

将自定义 Python 代码以 UDFs 形式机密加载到 Clean Room 中

本节将向您展示如何将以下 Python 函数加载到 Clean Room 中。

  • assign_group -> UDF 逐行分配一个组 ID。

  • group_agg -> UDF 按 ID 分组并汇总数据的某个方面。

以下 API 允许您将 Python 函数直接定义为 Clean Room 中的内联函数。或者,您可以从已上传到 Clean Room 暂存区的暂存文件中加载 Python。请参阅 API 参考指南 中的示例。

以下代码定义并加载 assign_group UDF,逐行分配一个组 ID:

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name, 
    'assign_group',                      -- Name of the UDF
    ['data variant', 'index integer'],   -- Arguments of the UDF, specified as (variable name, variable type)
    ['numpy', 'pandas'],                 -- Packages UDF will use
    'integer',                           -- Return type of UDF
    'main',                              -- Handler
    $$
import numpy as np
import pandas as pd

def main(data, index):
    df = pd.DataFrame(data) # Just as an example of what we could do
    np.random.seed(0)
    
    # First let's combine the data row and the additional index into a string
    data.append(index)
    data_string = ",".join(str(d) for d in data)

    # Hash it 
    encoded_data_string = data_string.encode()
    hashed_string = hash(encoded_data_string)

    # Return the hashed string
    return hashed_string
    $$
);
Copy

以下代码定义并加载 group_agg UDF,按 ID 分组并汇总数据的某个方面:

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name, 
    'group_agg',              -- Name of the UDF
    ['data variant'],         -- Arguments of the UDF, specified as (variable name, variable type)
    ['pandas'],               -- Packages UDF will use
    'integer',                -- Return type of UDF
    'main',                   -- Handler
    $$
import pandas as pd

def main(s):
    s = pd.Series(s)
    return (s == 'SILVER').sum()
    $$
);
Copy

备注

将 Python 加载到 Clean Room 中会为 Clean Room 创建一个新补丁。如果 Clean Room 分布设置为 EXTERNAL,则需要等待安全扫描完成,然后使用以下方式更新默认发布指令:

-- See the versions available inside the clean room
show versions in application package samooha_cleanroom_Custom_Secure_Python_UDF_Demo_clean_room;

-- Once the security scan is approved, update the release directive to the latest version
call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '2');
Copy

从暂存区中的 Python 文件加载 Python 代码

备注

本节是上文中定义 Python 内联的 load_python_into_cleanroom 命令的可选替代方案。这种方案改为从 .py 文件加载 Python,然后加载到 Clean Room 暂存区。

作为定义的替代方案,您还可以从暂存区中的 .py 文件加载 Python。为此,您必须将代码上传到 Clean Room 代码暂存区。重要的是,只有处于 Clean Room 代码暂存区中的文件可供 Clean Room 使用,因此文件不能位于其他地方。文件必须位于以下暂存区中:

ls @samooha_cleanroom_Custom_Secure_Python_UDF_Demo_clean_room.app.code;
Copy

要以这种方式定义 assign_groupgroup_agg UDFs,您可以将以下脚本上传到 Clean Room 暂存区:

在主目录中创建一个名为 ~/assign_group.py 的文件,然后将以下代码粘贴到该文件中:

import numpy as np
import pandas as pd


def main(data, index):
    _ = pd.DataFrame(data)  # Just as an example of what we could do
    np.random.seed(0)

    # First let's combine the data row and the additional index into a string
    data.append(index)
    data_string = ",".join(str(d) for d in data)

    # Hash it
    encoded_data_string = data_string.encode()
    hashed_string = hash(encoded_data_string)

    # Return the hashed string
    return hashed_string
Copy

现在,您需要将代码上传到 Clean Room 暂存区。为此,请将其添加到包含当前发布的 Clean Room 应用程序文件版本的文件夹中。要获取必要的文件夹,可以使用以下步骤:

call samooha_by_snowflake_local_db.provider.get_stage_for_python_files($cleanroom_name);
Copy

这为您提供了要将此文件上传到的暂存区。您可以使用来自 [snowsql] (https://docs.snowflake.com/en/user-guide/snowsql) 的以下命令将此文件上传到暂存区:

PUT file://~/assign_group.py @samooha_cleanroom_Custom_Secure_Python_UDF_Demo_clean_room.app.code/V1_0P1/test_folder overwrite=True auto_compress=False;
Copy

最后,您可以使用以下命令将 Python 加载到 Clean Room 中:

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name,
    'assign_group',                      // Name of the UDF
    ['data variant', 'index integer'],   // Arguments of the UDF, specified as (variable name, variable type)
    ['numpy', 'pandas'],                 // Packages UDF will use
    ['/test_folder/assign_group.py'],                // Name of Python file to import, relative to stage folder uploaded to
    'integer',                           // Return type of UDF
    'assign_group.main'                  // Handler - now scoped to file
);
Copy

同样,您可以使用以下代码创建一个名为 ~/group_agg.py 的文件:

import pandas as pd


def main(s):
    s = pd.Series(s)
    return (s == "SILVER").sum()
Copy

自从上次调用 load_python_into_cleanroom 向 Clean Room 中添加补丁以来,需要上传到的文件夹现已发生了变化。可以通过重新运行以下命令来获取新文件夹:

call samooha_by_snowflake_local_db.provider.get_stage_for_python_files($cleanroom_name);
Copy

然后,将该文件上传到相应的文件夹中:

PUT file://~/group_agg.py @samooha_cleanroom_Custom_Secure_Python_UDF_Demo_clean_room.app.code/V1_0P2 overwrite=True auto_compress=False;
Copy

上传后,可以使用以下命令从该文件创建 Python UDF:

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name,
    'group_agg',                         // Name of the UDF
    ['data variant'],                    // Arguments of the UDF, specified as (variable name, variable type)
    ['pandas'],                          // Packages UDF will use
    ['/group_agg.py'],                   // Name of Python file to import, relative to stage folder uploaded to
    'integer',                           // Return type of UDF
    'group_agg.main'                     // Handler - now scoped to file
);
Copy

使用 UDFs 添加自定义模板

要将自定义分析模板添加到 Clean Room 中,您需要在提供商端和使用者端提供表名占位符,以及提供者端的联接列。在 SQL Jinja 模板中,这些占位符必须始终为以下值:

  • source_table:来自提供商的表名称 数组

  • my_table:来自使用者的表名称 数组

通过使用这些变量,可以使表名称动态化,但是如果需要,也可以使用链接到 Clean Room 的视图的名称将其硬编码到模板中。如果需要,列名称可以硬编码到模板中,也可以通过参数动态设置。如果通过参数设置列名称,请记住,您需要调用参数 dimensionsmeasure_column,这些参数必须是数组,以便根据列策略进行检查。您可以将其作为 SQL Jinja 参数添加到模板中,之后在查询时由使用者传递。联接策略确保使用者不能联接除授权列之外的列。

另外,还可以使用以下筛选器检查自定义 SQL Jinja 模板中的实参是否符合联接和列策略:

  • join_policy:检查字符串值或筛选器子句是否符合联接策略

  • column_policy:检查字符串值或筛选器子句是否符合列策略

  • join_and_column_policy:检查筛选器子句中用于联接的列是否符合联接策略,以及用作筛选器的列是否符合列策略

例如,在子句 {{ provider_id | sqlsafe | join_policy }} 中,将解析 HEM 的输入以检查联接策略中是否有 HEM。注意:只能谨慎使用 sqlsafe 筛选器,因为它允许合作者将纯 SQL 放入模板。

备注

必须使用这些实参来引用所有提供商/使用者表,因为实际关联到 Clean Room 的安全视图名称与表名不同。重要的是,提供商表别名 MUST 为 p(或 p1)、p2、p3、p4 等。使用者表别名 必须 为 c(或 c1)、c2、c3 等。要在 Clean Room 中执行安全策略,必须满足此条件。

请注意,此函数将替换任何具有相同名称的现有模板。如果您想更新任何现有模板,只需使用更新后的模板再次调用这个函数。

该模板首先使用提供商表中一系列列的哈希值来丰富提供商数据。然后,这些丰富后的数据将通过电子邮件内部联接到使用者的数据集中,并传入一个可选的筛选器子句。最后,使用自定义 Python group_agg UDF 作为第一个 UDF 的哈希列的函数来计算一些聚合。

call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name, 
    'prod_custom_udf_template', 
    $$
with enriched_provider_data as (
    select 
        cleanroom.assign_group(array_construct(identifier({{ filter_column | column_policy }}), identifier({{ dimensions[0] | column_policy }})), identifier({{ measure_column[0] | column_policy }})) as groupid,
        {{ filter_column | sqlsafe }},
        hem
    from identifier({{ source_table[0] }}) p
), filtered_data as (
    select 
        groupid,
        identifier({{ filter_column | column_policy }})
    from enriched_provider_data p
    inner join identifier({{ my_table[0] }}) c
    on p.hem = c.hem
    {% if where_clause %}
    where {{ where_clause | sqlsafe }}
    {% endif %}
)
select groupid, cleanroom.group_agg(array_agg({{ filter_column | sqlsafe }})) as count from filtered_data p
group by groupid;
    $$
);
Copy

备注

您可以为上面的 samooha_by_snowflake_local_db.provider.add_custom_sql_template 过程调用添加差分隐私敏感度作为最后一个参数(如果不添加,则默认为 1)

如果要查看 Clean Room 中当前活跃的模板,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_added_templates($cleanroom_name);
Copy

对每个表设置列策略

显示关联的数据以查看表中存在的列。要查看前 10 行,请调用以下过程。

select * from SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS limit 10;
Copy

为每个表和模板组合设置使用者可以分组、汇总(例如 SUM/AVG)和通常用于分析的列。这提供了灵活性,使同一个表可以根据基础模板允许不同的列选择。只有在添加模板后才可调用此函数。

请注意,列策略是 仅替换,因此如果再次调用函数,新的列策略会完全替换之前设置的列策略。

不应将列策略用于 email、HEM、RampID 等身份列,因为您不希望使用者能够按这些列进行分组。在生产环境中,系统会智能推断 PII 列并阻止此操作,但在沙盒环境中此功能不可用。它应该只用于您希望使用者能够汇总和分组的列,如状态、年龄段、地区代码、活动天数等。

请注意,要使“column_policy”和“join_policy”对使用者分析请求执行检查,在 SQL Jinja 模版中,所有列名 MUST 称为 dimensionsmeasure_columns。确保使用这些标签来引用自定义 SQL Jinja 模板中要检查的列。

call samooha_by_snowflake_local_db.provider.set_column_policy($cleanroom_name, [
    'prod_custom_udf_template:SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS:STATUS', 
    'prod_custom_udf_template:SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS:REGION_CODE',
    'prod_custom_udf_template:SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS:AGE_BAND',
    'prod_custom_udf_template:SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS:DAYS_ACTIVE']);
Copy

如果要查看已添加到 Clean Room 的列策略,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_column_policy($cleanroom_name);
Copy

与使用者共享

最后,通过添加数据使用者的 Snowflake 账户定位器和账户名称,将数据使用者添加到 Clean Room,如下所示。Snowflake 账户名称的格式必须为 <ORGANIZATION>.<ACCOUNT_NAME>。

备注

为了调用以下过程,请确保首先使用 provider.set_default_release_directive 设置发布指令。您可以使用以下方式查看最新的可用版本和补丁:

show versions in application package samooha_cleanroom_Custom_Secure_Python_UDF_Demo_clean_room;
Copy

备注

请注意,此调用大约需要 60 秒才能完成,因为它设置了许多任务来监听和记录来自使用者的请求。

call samooha_by_snowflake_local_db.provider.add_consumers($cleanroom_name, '<CONSMUMER_ACCOUNT_LOCATOR>');
Copy

对于多个使用者账户定位器,可以用逗号分隔的字符串形式传递到 provider.add_consumers 函数中,也可以分别调用 provider.add_consumers

如果要查看已添加到此 Clean Room 的使用者,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_consumers($cleanroom_name);
Copy

通过以下过程查看最近创建的 Clean Room:

call samooha_by_snowflake_local_db.provider.view_cleanrooms();
Copy

通过以下过程查看最近创建的 Clean Room 的更多洞察信息。

call samooha_by_snowflake_local_db.provider.describe_cleanroom($cleanroom_name);
Copy

创建的任何 Clean Room 也都可以删除。以下命令将完全删除 Clean Room,因此以前有权访问 Clean Room 的任何使用者都将无法再对其进行使用。如果将来需要使用相同名称的 Clean Room,则必须按照上述流程重新初始化。

call samooha_by_snowflake_local_db.provider.drop_cleanroom($cleanroom_name);
Copy

备注

提供商流程现在已完成。切换到使用者账户,继续执行使用者流程。

使用者

备注

应在使用者账户的 Snowflake 工作表中运行以下命令

设置环境

在利用开发者 APIs 使用 Snowflake Data Clean Room 之前,请执行以下命令来设置 Snowflake 环境。如果您没有 SAMOOHA_APP_ROLE 角色,请联系账户管理员。

use role samooha_app_role;
use warehouse app_wh;
Copy

安装 Clean Room

安装 Clean Room 共享后,可以使用以下命令查看可用的 Clean Room 列表。

call samooha_by_snowflake_local_db.consumer.view_cleanrooms();
Copy

为提供商与您共享的 Clean Room 命名。

set cleanroom_name = 'Custom Secure Python UDF Demo clean room';
Copy

以下命令将 Clean Room 安装到具有相关提供商和所选 Clean Room 的使用者账户中。

运行此过程大约需要 45 秒。

call samooha_by_snowflake_local_db.consumer.install_cleanroom($cleanroom_name, '<PROVIDER_ACCOUNT_LOCATOR>');
Copy

安装了 Clean Room 后,提供商必须在启用前完成提供商侧的 Clean Room 设置。您可以通过以下函数查看 Clean Room 的状态。启用 Clean Room 后,您应该能够运行下面的“运行分析”命令。启用 Clean Room 通常需要大约 1 分钟。

call samooha_by_snowflake_local_db.consumer.is_enabled($cleanroom_name);
Copy

运行分析

现在,Clean Room 已经安装完毕,您可以使用“run_analysis”命令运行由提供商提供给 Clean Room 的分析模板。您可以在以下各节中看到如何确定每个字段。

可传递的数据集数量受提供商实施的模板的限制。某些模板需要特定数量的表。模板创建者可以实施其希望支持的要求。

备注

在运行分析之前,您可以更改仓库规模;如果表较大,也可以使用新的、更大的仓库规模。

call samooha_by_snowflake_local_db.consumer.run_analysis(
  $cleanroom_name,               -- cleanroom
  'prod_custom_udf_template',    -- template name

  ['SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS'],    -- consumer tables

  ['SAMOOHA_SAMPLE_DATABASE_NAV2.DEMO.CUSTOMERS'],     -- provider tables

  object_construct(    -- Rest of the custom arguments needed for the template
    'filter_column', 'p.status',            -- One of the SQL Jinja arguments, the column the UDF filters on

    'dimensions', ['p.DAYS_ACTIVE'],
    
    'measure_column', ['p.AGE_BAND'],

    'where_clause', 'c.status = $$GOLD$$'   -- A boolean filter applied to the data
  )
);
Copy

对于在筛选“where_clause”、“dimensions”或“measure_columns”的数据集中引用的每一列,您可以使用 p. 引用提供商表中的字段,使用 c. 引用使用者表中的字段。对多个提供商表,可以使用 p2、p3 等;对多个使用者表,可以使用 c2、c3 等。

如何确定 run_analysis 的输入

要运行分析,需要向 run_analysis 函数传递一些参数。本节将向您展示如何确定要传入的参数。

模板名称

首先,您可以通过调用以下过程来查看支持的分析模板。

call samooha_by_snowflake_local_db.consumer.view_added_templates($cleanroom_name);
Copy

在使用模板运行分析之前,您需要知道要指定哪些实参以及需要哪些类型。对于自定义模板,您可以执行以下操作。

call samooha_by_snowflake_local_db.consumer.view_template_definition($cleanroom_name, 'prod_custom_udf_template');
Copy

这通常还可以包含大量不同的 SQL Jinja 参数。以下功能可解析 SQL Jinja 模板,并将需要在 run_analysis 中指定的实参提取到方便的列表中。

call samooha_by_snowflake_local_db.consumer.get_arguments_from_template($cleanroom_name, 'prod_custom_udf_template');
Copy

数据集名称

如果要查看提供商已添加到 Clean Room 的数据集名称,请调用以下过程。请注意,由于 Clean Room 的安全属性,您无法查看提供商添加到 Clean Room 的数据集中的数据。

call samooha_by_snowflake_local_db.consumer.view_provider_datasets($cleanroom_name);
Copy

您还可以使用以下调用,查看已关联到 Clean Room 的表:

call samooha_by_snowflake_local_db.consumer.view_consumer_datasets($cleanroom_name);
Copy

维度和度量列

在运行分析时,您可能希望对某些列进行筛选、分组和汇总。如果要查看提供商已添加到 Clean Room 的列策略,请调用以下过程。

call samooha_by_snowflake_local_db.consumer.view_provider_column_policy($cleanroom_name);
Copy

常见错误

如果运行分析的结果是出现 Not approved: unauthorized columns used 错误,您可能需要再次查看提供商设置的联接策略和列策略。

call samooha_by_snowflake_local_db.consumer.view_provider_join_policy($cleanroom_name);
call samooha_by_snowflake_local_db.consumer.view_provider_column_policy($cleanroom_name);
Copy

也有可能您已经耗尽了隐私预算,这样一来系统会阻止您执行更多查询。您可以使用以下命令查看剩余的隐私预算。预算会每天重置;Clean Room 提供商如果需要,也可以将其重置。

call samooha_by_snowflake_local_db.consumer.view_remaining_privacy_budget($cleanroom_name);
Copy

您可以通过以下 API 检查 Clean Room 是否启用了差分隐私:

call samooha_by_snowflake_local_db.consumer.is_dp_enabled($cleanroom_name);
Copy
语言: 中文