Snowflake Data Clean Room:安全的 Snowpark 过程

本主题介绍了以编程方式建立 Clean Room、与使用者共享 Clean Room 以及在 Clean Room 上运行分析所需的提供商和使用者流程,这些过程使用从提供商账户加载到 Clean Room 的安全 Snowpark 过程。在此流程中,提供商使用将底层 Python 代码对使用者完全保密的 API 将安全的 Snowpark 过程加载到 Clean Room 中。

在此流程中,Snowpark 过程是对到达率与展示次数进行线性回归分析,以估计斜率。它接收输入表,其中包括提供商账户的展示次数 IDs、用户 IDs 和时间戳,还可以选择性地接收使用者的用户表。如果提供了使用者的用户数据,Snowpark 过程会动态创建 SQL,将展示次数数据连接到使用者的用户数据上,并在 Clean Room 中创建一个中间表,其中包含按天计算的展示次数和到达率。

接下来,在 Snowpark 过程内处理这些来自中间表的数据,并进行回归分析,以估计截距、斜率和一些其他参数。然后将这些数据写入 Clean Room 内的结果表,并将该表的 ID 作为输出提供给用户。最后,使用者可以使用具有此 ID 的 get_results 模板从 Clean Room 获取数据。在 Snowpark 过程结束之前,它会清理在 Clean Room 中创建的所有中间表。

注意:所有中间表都是在 Clean Room 内创建的,因此除了 Snowpark 过程本身外,任何人都无法访问。

除上述内容外,该流程的关键方面还有:

  1. 提供商:

    a.将 Snowpark 过程安全地添加到 Clean Room 中。

    b.添加一个运行 Snowpark 过程的自定义模板和另一个检索结果的模板。

    c.与使用者共享 Clean Room。

  2. 使用者:

    a.运行执行回归分析的模板。

    b.检索分析结果。

先决条件

您需要两个独立的 Snowflake 账户才能完成此流程。使用第一个账户执行提供商的命令,然后切换到第二个账户执行使用者的命令。

提供商

备注

应在提供商账户的 Snowflake 工作表中运行以下命令。

设置环境

在利用开发者 APIs 使用 Snowflake Data Clean Room 之前,请执行以下命令来设置 Snowflake 环境。如果您没有 SAMOOHA_APP_ROLE 角色,请联系账户管理员。

use role samooha_app_role;
use warehouse app_wh;
Copy

创建 Clean Room

为 Clean Room 创建名称。输入新的 Clean Room 名称,避免与现有 Clean Room 名称冲突。请注意,Clean Room 名称只能是 字母数字。除空格和下划线外,Clean Room 名称不能包含特殊字符。

set cleanroom_name = 'Snowpark Demo clean room';
Copy

您可以使用上面设置的 Clean Room 名称创建新的 Clean Room。如果上面设置的 Clean Room 名称已经作为现有 Clean Room 存在,则此过程将失败。

运行此过程大约需要 45 秒。

provider.cleanroom_init 的第二个实参是 Clean Room 的分发。它可以是 INTERNAL 或 EXTERNAL。出于测试目的,如果您将 Clean Room 共享给同一组织中的账户,则可以使用 INTERNAL 绕过自动安全扫描,该扫描必须在将应用程序包发布给协作者前进行。但是,如果要将此 Clean Room 共享给其他组织中的账户,则必须使用 EXTERNAL Clean Room 分发。

call samooha_by_snowflake_local_db.provider.cleanroom_init($cleanroom_name, 'INTERNAL');
Copy

要查看安全扫描的状态,请使用:

call samooha_by_snowflake_local_db.provider.view_cleanroom_scan_status($cleanroom_name);
Copy

创建 Clean Room 之后,必须先设置其发布指令,然后才能与协作者共享。但是,如果您的分发设置为 EXTERNAL,则必须先等待安全扫描完成,然后再设置发布指令。您可以在扫描运行时继续运行其余步骤,并在执行 provider.create_cleanroom_listing 步骤之前返回此处。

要设置发布指令,请调用:

call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '0');
Copy

跨区域共享

如果 Snowflake 客户的账户与您的账户位于不同区域,则必须启用 Cross-Cloud Auto-Fulfillment 才能与该客户共享 Clean Room。有关与其他区域的使用者协作的额外成本的信息,请参阅 Cross-Cloud Auto-Fulfillment 成本

使用开发者 APIs 时,启用跨区域共享分为两步:

  1. 具有 ACCOUNTADMIN 角色的 Snowflake 管理员为 Snowflake 账户启用 Cross-Cloud Auto-Fulfillment。有关说明,请参阅 与不同区域的账户协作

  2. 您执行 provider.enable_laf_for_cleanroom 命令,为 Clean Room 启用 Cross-Cloud Auto-Fulfillment例如:

    call samooha_by_snowflake_local_db.provider.enable_laf_for_cleanroom($cleanroom_name);
    
    Copy

在为 Clean Room 启用 Cross-Cloud Auto-Fulfillment 后,您可以使用 provider.create_cleanroom_listing 命令照常将使用者添加到列表中。列表会根据需要自动复制到远程云和区域。

将 Snowpark 过程机密地加载到 Clean Room 中。

本节将向您展示如何将 Snowpark 过程加载到 Clean Room 中。该过程执行以下步骤:

  1. 预处理展示次数数据:创建动态 SQL,将提供商的展示次数数据与使用者的用户数据(如果提供了使用者表)联接起来,按日期计算不同的展示次数计数和到达率,并将其存储在 Clean Room 内的中间表中。如果没有提供使用者表,则使用提供商展示次数表的全部内容。

  2. 加载中间表:中间表作为 Pandas Dataframe 加载到 Snowpark 过程中。

  3. 执行回归分析:使用 statsmodels 库进行回归计算,并将结果以 Pandas 数据帧的形式返回。

  4. 将结果写入 Snowflake 表:将结果写入 Clean Room 内的结果表,并将该表的 ID 后缀返回给使用者。

    a.由于 Snowpark 过程是在 Clean Room 内运行的,因此它直接写入使用者租户的能力有限。相反,为了保证结果更加安全,会将其写入 Clean Room 内的一个表中,并让使用者能够从表中读取结果。

  5. 删除中间表:在 Clean Room 内计算期间创建的不再需要的中间表将在 Snowpark 过程结束前删除。

以下 API 允许您将 Python 函数直接定义为 Clean Room 中的内联函数。或者,您可以从已上传到 Clean Room 暂存区的暂存文件中加载 Python。请参阅 API 参考指南 中的示例。

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name,
    'reach_impression_regression',
    ['source_table string', 'my_table string'],
    ['snowflake-snowpark-python', 'pandas', 'statsmodels', 'numpy'],
    'string',
    'main',
    $$
import traceback
import pandas as pd
import numpy as np

import statsmodels.formula.api as sm


def drop_tables(session, table_names):
    """
    Drop the tables passed in
    """
    for tbl in table_names:
        session.sql(f'drop table {tbl}').collect()

def preprocess_regression_data(session, source_table, my_table, suffix):
    """
    Preprocess the impressions and customer data into an intermediary table for regression
    """
    table_name = f'cleanroom.intermediary_{suffix}'

    my_table_statement = f'inner join {my_table} c on p.hem = c.hem' if my_table != 'NONE' else ''
    session.sql(f"""
    create or replace table {table_name} as (
        with joint_data as (
            select
                date,
                p.hem as hem,
                impression_id
            from {source_table} p
            {my_table_statement}
        )
        select
            date,
            count(distinct hem) as reach,
            count(distinct impression_id) as num_impressions
        from joint_data
        group by date
        order by date
    );
    """).collect()

    return table_name

def calculate_regression(df):
    """
    Calculate the regression data from the dataframe we put together
    """
    result = sm.ols('REACH ~ 1 + NUM_IMPRESSIONS', data=df).fit()
    retval = pd.concat([
        result.params,
        result.tvalues,
        result.pvalues
    ], keys=['params', 't-stat', 'p-value'], names=['STATISTIC', 'PARAMETER']).rename('VALUE').reset_index()
    return retval

def main(session, source_table, my_table):
    """
    First compute the regression data from an overlap between customer and provider data, and counting
    the number of impressions and reach per day. Next regress these locally and compute the regression
    statistics. Finally write it to a results table which can be queried to get the output.
    """
    suffix = f'regression_results_{abs(hash((source_table, my_table))) % 10000}'

    try:
        # Preprocess impressions and customer data into an intermediary form to use for regression
        intermediary_table_name = preprocess_regression_data(session, source_table, my_table, suffix)

        # Load the data into Python locally
        df = session.table(intermediary_table_name).to_pandas()

        # Carry out the regression and get statistics as an output
        regression_output = calculate_regression(df)

        # Write the statistics to an output table
        # The table and the schema names should be in upper case to quoted identifier related issues.
        table = f'results_{suffix}'.upper()
        retval_df = session.write_pandas(regression_output, table,  schema = 'CLEANROOM', auto_create_table = True)

        # Drop any intermediary tables
        drop_tables(session, [intermediary_table_name])

        # Tell the user the name of the table the results have been written to
        return f'Done, results have been written to the following suffix: {suffix}'
    except:
        return traceback.format_exc()
$$
);
Copy

备注

将 Python 加载到 Clean Room 中会为 Clean Room 创建一个新补丁。如果 Clean Room 分布设置为 EXTERNAL,则需要等待安全扫描完成,然后使用以下方式更新默认发布指令:

-- See the versions available inside the clean room
show versions in application package samooha_cleanroom_Snowpark_Demo_clean_room;

-- Once the security scan is approved, update the release directive to the latest version
call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '1');
Copy

使用 UDFs 添加自定义模板

要将自定义分析模板添加到 Clean Room 中,您需要在提供商端和使用者端提供表名占位符,以及提供者端的联接列。在 SQL Jinja 模板中,这些占位符必须始终为以下值:

  • source_table:来自提供商的表名称 数组

  • my_table:来自使用者的表名称 数组

通过使用这些变量,可以使表名称动态化,但是如果需要,也可以使用链接到 Clean Room 的视图的名称将其硬编码到模板中。如果需要,列名称可以硬编码到模板中,也可以通过参数动态设置。如果通过参数设置列名称,请记住,您需要调用参数 dimensionsmeasure_column,这些参数必须是数组,以便根据列策略进行检查。您可以将其作为 SQL Jinja 参数添加到模板中,之后在查询时由使用者传递。联接策略确保使用者不能联接除授权列之外的列。

另外,还可以使用以下筛选器检查自定义 SQL Jinja 模板中的实参是否符合联接和列策略:

  • join_policy:检查字符串值或筛选器子句是否符合联接策略

  • column_policy:检查字符串值或筛选器子句是否符合列策略

  • join_and_column_policy:检查筛选器子句中用于联接的列是否符合联接策略,以及用作筛选器的列是否符合列策略

例如,在子句 {{ provider_id | sqlsafe | join_policy }} 中,将解析 HEM 的输入以检查联接策略中是否有 HEM。注意:只能谨慎使用 sqlsafe 筛选器,因为它允许合作者将纯 SQL 放入模板。

备注

必须使用这些实参来引用所有提供商/使用者表,因为实际链接到 Clean Room 的安全视图名称与表名不同。重要的是,提供商表别名 MUST 为 p(或 p1)、p2、p3、p4 等。使用者表别名 必须 为 c(或 c1)、c2、c3 等。要在 Clean Room 中执行安全策略,必须满足此条件。

请注意,此函数将替换任何具有相同名称的现有模板。如果您想更新任何现有模板,只需使用更新后的模板再次调用这个函数。

call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
        $cleanroom_name,
        'prod_calculate_regression',
        $$
call cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
$$
);
Copy

最后,还添加了一个自定义模板,使使用者能够根据 calculate_regression 模板返回的结果后缀 ID,检索分析结果。

call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
        $cleanroom_name,
        'prod_get_results',
        $$
select * from cleanroom.results_{{ results_suffix | sqlsafe }};
$$
);
Copy

如果要查看 Clean Room 中当前活跃的模板,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_added_templates($cleanroom_name);
Copy

与使用者共享

最后,通过添加数据使用者的 Snowflake 账户定位器和账户名称,将数据使用者添加到 Clean Room,如下所示。Snowflake 账户名称的格式必须为 <ORGANIZATION>.<ACCOUNT_NAME>。

备注

为了调用以下过程,请确保首先使用 provider.set_default_release_directive 设置发布指令。您可以使用以下方式查看最新的可用版本和补丁:

show versions in application package samooha_cleanroom_Snowpark_Demo_clean_room;
Copy

备注

请注意,此调用大约需要 60 秒才能完成,因为它设置了许多任务来监听和记录来自使用者的请求。

call samooha_by_snowflake_local_db.provider.add_consumers($cleanroom_name, '<CONSUMER_ACCOUNT_LOCATOR>', '<CONSUMER_ACCOUNT_NAME>');
call samooha_By_snowflake_local_db.provider.create_cleanroom_listing($cleanroom_name, '<CONSUMER_ACCOUNT_NAME>');
Copy

对于多个使用者账户定位器,可以用逗号分隔的字符串形式传递到 provider.add_consumers 函数中,也可以分别调用 provider.add_consumers

如果要查看已添加到此 Clean Room 的使用者,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_consumers($cleanroom_name);
Copy

通过以下过程查看最近创建的 Clean Room:

call samooha_by_snowflake_local_db.provider.view_cleanrooms();
Copy

通过以下过程查看最近创建的 Clean Room 的更多洞察信息。

call samooha_by_snowflake_local_db.provider.describe_cleanroom($cleanroom_name);
Copy

创建的任何 Clean Room 也都可以删除。以下命令将完全删除 Clean Room,因此以前有权访问 Clean Room 的任何使用者都将无法再对其进行使用。如果将来需要使用相同名称的 Clean Room,则必须按照上述流程重新初始化。

call samooha_by_snowflake_local_db.provider.drop_cleanroom($cleanroom_name);
Copy

备注

提供商流程现在已完成。切换到使用者账户,继续执行使用者流程。

使用者

备注

应在使用者账户的 Snowflake 工作表中运行以下命令

设置环境

在利用开发者 APIs 使用 Snowflake Data Clean Room 之前,请执行以下命令来设置 Snowflake 环境。如果您没有 SAMOOHA_APP_ROLE 角色,请联系账户管理员。

use role samooha_app_role;
use warehouse app_wh;
Copy

安装 Clean Room

安装 Clean Room 共享后,可以使用以下命令查看可用的 Clean Room 列表。

call samooha_by_snowflake_local_db.consumer.view_cleanrooms();
Copy

为提供商与您共享的 Clean Room 命名。

set cleanroom_name = 'Snowpark Demo clean room';
Copy

以下命令将 Clean Room 安装到具有相关提供商和所选 Clean Room 的使用者账户中。

运行此过程大约需要 45 秒。

call samooha_by_snowflake_local_db.consumer.install_cleanroom($cleanroom_name, '<PROVIDER_ACCOUNT_LOCATOR>');
Copy

安装了 Clean Room 后,提供商必须在启用前完成提供商侧的 Clean Room 设置。您可以通过以下函数查看 Clean Room 的状态。启用 Clean Room 后,您应该能够运行下面的“运行分析”命令。启用 Clean Room 通常需要大约 1 分钟。

运行此函数前,请确保 install_cleanroom 函数已完成。

call samooha_by_snowflake_local_db.consumer.is_enabled($cleanroom_name);
Copy

运行分析

现在,Clean Room 已经安装完毕,您可以使用“run_analysis”命令运行由提供商提供给 Clean Room 的分析模板。您可以在以下各节中看到如何确定每个字段。

可传递的数据集数量受提供商实施的模板的限制。某些模板需要特定数量的表。模板创建者可以实施其希望支持的要求。

备注

在运行分析之前,您可以更改仓库规模;如果表较大,也可以使用新的、更大的仓库规模。

call samooha_by_snowflake_local_db.consumer.run_analysis(
  $cleanroom_name,               -- cleanroom
  'prod_calculate_regression',    -- template name

  ['<USERS_TABLE>'],    -- consumer tables

  ['<IMPRESSSIONS_TABLE>'],     -- provider tables

  object_construct()     -- Rest of the custom arguments needed for the template
);
Copy

该分析的输出结果将是 ID,可用于使用以下模板检索回归结果:

set result_suffix = 'regression_results_<ID>';

call samooha_by_snowflake_local_db.consumer.run_analysis(
    $cleanroom_name,        -- cleanroom
    'prod_get_results',     -- template name
    [],                     -- consumer tables
    [],                     -- provider tables
    object_construct(
        'results_suffix', $result_suffix  -- The suffix with the results
    )
);
Copy

如何确定 run_analysis 的输入

要运行分析,需要向 run_analysis 函数传递一些参数。本节将向您展示如何确定要传入的参数。

模板名称

首先,您可以通过调用以下过程来查看支持的分析模板。

call samooha_by_snowflake_local_db.consumer.view_added_templates($cleanroom_name);
Copy

要运行分析,需要向 run_analysis 函数传递一些参数。本节将向您展示如何确定要传入的参数。

call samooha_by_snowflake_local_db.consumer.view_template_definition($cleanroom_name, 'prod_calculate_regression');
Copy

这通常还可以包含大量不同的 SQL Jinja 参数。以下功能可解析 SQL Jinja 模板,并将需要在 run_analysis 中指定的实参提取到列表中。

call samooha_by_snowflake_local_db.consumer.get_arguments_from_template($cleanroom_name, 'prod_calculate_regression');
Copy

数据集名称

如果要查看提供商已添加到 Clean Room 的数据集名称,请调用以下过程。请注意,由于 Clean Room 的安全属性,您无法查看提供商添加到 Clean Room 的数据集中的数据。

call samooha_by_snowflake_local_db.consumer.view_provider_datasets($cleanroom_name);
Copy

您还可以使用以下调用,查看已关联到 Clean Room 的表:

call samooha_by_snowflake_local_db.consumer.view_consumer_datasets($cleanroom_name);
Copy

建议

  • 尽可能通过动态 SQL 保持所有重要的数据预处理,使用 cleanroom 架构将数据存储在中间表中。它的速度更快,效率更高。例如:

    session.sql("create or replace table cleanroom.intermediary as ...")
    
    Copy
  • 通过 cleanroom 架构中的 session.sql 而不是使用 Snowpark 装饰器执行 SQL,创建 UDFs、UDTFs 和过程。例如:

    session.sql("create or replace function cleanroom.udf(...")
    
    Copy
  • 如果需要加载的数据太大,内存无法容纳,可使用 .to_pandas_batches() 对其进行迭代。例如:

    df_iter = session.table(intermediary_table_name).to_pandas_batches()
    for df in df_iter:
        ...
    
    Copy
语言: 中文