Snowflake Data Clean Room:机器学习

本主题介绍以编程方式设置 Clean Room、与使用者共享 Clean Room 并通过其中的高级机器学习算法运行分析所需的提供商和使用者流。提供商将实现基于随机森林的 XGBoost 机器学习算法的安全 Python 代码加载到 Clean Room。这是完全保密的,只有提供商可见。使用者看不到 Python 机器学习代码加载到 Clean Room。

该流程包括以下内容:

  1. 提供商:

    a.添加运行相似建模分析的自定义模板。

    b.利用 XGBoost 安全添加基于 Python 代码的机器学习模板。

    c.使用自定义模板调用 Clean Room 中的机器学习 UDFs。

  2. 使用者:

    a.运行使用提供商定义的 ML 函数的自定义模板。

相似建模 是一种分析方法,其中使用者试图通过对提供商的“高价值”客户进行统计模型训练,从供应商的数据中找到高价值客户。该模型使用使用者指定的标志来表示使用者数据集中的高价值用户,如支出超过某一阈值的用户。然后,利用训练好的模型来推断提供商数据中的哪些客户可能对使用者具有“高价值”。

先决条件

您需要两个独立的 Snowflake 账户才能完成此流程。使用第一个账户执行提供商的命令,然后切换到第二个账户执行使用者的命令。

提供商

备注

应在提供商账户的 Snowflake 工作表中运行以下命令。

设置环境

在利用开发者 APIs 使用 Snowflake Data Clean Room 之前,请执行以下命令来设置 Snowflake 环境。如果您没有 SAMOOHA_APP_ROLE 角色,请联系账户管理员。

use role samooha_app_role;
use warehouse app_wh;
Copy

创建 Clean Room

为 Clean Room 创建名称。输入新的 Clean Room 名称,避免与现有 Clean Room 名称冲突。请注意,Clean Room 名称只能是 字母数字。除空格和下划线外,Clean Room 名称不能包含特殊字符。

set cleanroom_name = 'Machine Learning Demo Clean room';
Copy

您可以使用上面设置的 Clean Room 名称创建新的 Clean Room。如果上面设置的 Clean Room 名称已经作为现有 Clean Room 存在,则此过程将失败。

运行此过程所需时间可能较长,通常约为半分钟。

provider.cleanroom_init 的第二个实参是 Clean Room 的分发。它可以是 INTERNAL 或 EXTERNAL。出于测试目的,如果您将 Clean Room 共享给同一组织中的账户,则可以使用 INTERNAL 绕过自动安全扫描,该扫描必须在将应用程序包发布给协作者前进行。但是,如果要将此 Clean Room 共享给其他组织中的账户,则必须使用 EXTERNAL Clean Room 分发。

call samooha_by_snowflake_local_db.provider.cleanroom_init($cleanroom_name, 'INTERNAL');
Copy

要查看安全扫描的状态,请使用:

call samooha_by_snowflake_local_db.provider.view_cleanroom_scan_status($cleanroom_name);
Copy

创建 Clean Room 之后,必须先设置其发布指令,然后才能与协作者共享。但是,如果您的分发设置为 EXTERNAL,则必须先等待安全扫描完成,然后再设置发布指令。您可以在扫描运行时继续运行其余步骤,并在执行 provider.create_cleanroom_listing 步骤之前返回此处。

要设置发布指令,请调用:

call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '0');
Copy

跨区域共享

如果 Snowflake 客户的账户与您的账户位于不同区域,则必须启用 Cross-Cloud Auto-Fulfillment 才能与该客户共享 Clean Room。有关与其他区域的使用者协作的额外成本的信息,请参阅 Cross-Cloud Auto-Fulfillment 成本

使用开发者 APIs 时,启用跨区域共享分为两步:

  1. 具有 ACCOUNTADMIN 角色的 Snowflake 管理员为 Snowflake 账户启用 Cross-Cloud Auto-Fulfillment。有关说明,请参阅 与不同区域的账户协作

  2. 您执行 provider.enable_laf_for_cleanroom 命令,为 Clean Room 启用 Cross-Cloud Auto-Fulfillment例如:

    call samooha_by_snowflake_local_db.provider.enable_laf_for_cleanroom($cleanroom_name);
    
    Copy

在为 Clean Room 启用 Cross-Cloud Auto-Fulfillment 后,您可以使用 provider.create_cleanroom_listing 命令照常将使用者添加到列表中。列表会根据需要自动复制到远程云和区域。

为 Clean Room 添加机密的机器学习 Python 代码

本节向您展示如何在 Clean Room 中加载一些 Python 函数,以便相似 ML 运行。Clean Room 中安装的所有 Python 函数都是完全机密的。使用者无法看到。

以下 API 允许您将 Python 函数直接定义为 Clean Room 中的内联函数。或者,您可以从已上传到 Clean Room 暂存区的暂存文件中加载 Python。请参阅 API 参考指南 中的示例。

备注

需要注意的是,这种实施方式受总 Snowflake 大小对于 ARRAY_AGG 可汇总的数据量(即 16MB)约束的限制。利用批处理和流模型的实施方案可通过批处理扩展到任意大小的数据集,这种实施方案 可应要求提供

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name, 
    'lookalike_train',
    ['input_data variant', 'labels variant'],
    ['pandas', 'numpy', 'xgboost'],
    'variant',
    'train',
    $$
import numpy as np
import pandas as pd
import xgboost
from sklearn import preprocessing
import sys
import os
import pickle
import codecs
import threading


class TrainXGBoostClassifier(object):
    def __init__(self):
        self.model = None
        self._params = {
            "objective": "binary:logistic",
            "max_depth": 3,
            "nthread": 1,
            "eval_metric": "auc",
        }
        self.num_boosting_rounds = 10

    def get_params(self):
        if self.model is not None and "updater" not in self._params:
            self._params.update(
                {"process_type": "update", "updater": "refresh", "refresh_leaf": True}
            )
        return self._params

    def train(self, X, y):
        """
        Train the model in a threadsafe way
        """
        # pick only the categorical attributes
        categorical = X.select_dtypes(include=[object])

        # fit a one-hot-encoder to convert categorical features to binary features (required by XGBoost)
        ohe = preprocessing.OneHotEncoder()
        categorical_ohe = ohe.fit_transform(categorical)
        self.ohe = ohe

        # get the rest of the features and add them to the binary features
        non_categorical = X.select_dtypes(exclude=[object])
        train_x = np.concatenate((categorical_ohe.toarray(), non_categorical.to_numpy()), axis=1)

        xg_train = xgboost.DMatrix(train_x, label=y)

        params = self.get_params()
        params["eval_metric"] = "auc"
        evallist = [(xg_train, "train")]
        evals_result = {}

        self.model = xgboost.train(
            params, xg_train, self.num_boosting_rounds, evallist, evals_result=evals_result
        )

        self.evals_result = evals_result

    def __dump_model(self, model):
        """
        Save down the model as a json string to load up for scoring/inference
        """
        pickle_jar = codecs.encode(pickle.dumps([model, self.ohe]), "base64").decode()
        return pickle_jar

    def dump_model(self):
        """
        Save down the model as a json string to load up for scoring/inference
        """
        if self.model is not None:
            return self.__dump_model(self.model)
        else:
            raise ValueError("Model needs to be trained first")


def train(d1, l1):

    # get take training features and put them in a pandas dataframe
    X = pd.DataFrame(d1)

    # get the labels into a Numpy array
    y = np.array(l1)

    trainer = TrainXGBoostClassifier()
    trainer.train(X, y)

    # return training stats, accuracy, and the pickled model and pickled one-hot-encoder
    return {
        "total_rows": len(d1),
        "total_bytes_in": sys.getsizeof(d1),
        "model": trainer.dump_model(),
        "iteration": trainer.num_boosting_rounds,
        "auc": np.max(trainer.evals_result["train"]["auc"]),
        "error": 1 - np.max(trainer.evals_result["train"]["auc"])
    }    
    $$
);
Copy

现在,我们在 Clean Room 中安装评分函数

call samooha_by_snowflake_local_db.provider.load_python_into_cleanroom(
    $cleanroom_name, 
    'lookalike_score',
    ['pickle_jar variant', 'emails variant', 'features variant'],
    ['pandas', 'numpy', 'xgboost'],
    'string',
    'score',
    $$
import numpy as np
import pandas as pd
import xgboost as xgb
import pickle
import codecs
import json


def score(model, emails, features):
    # load model
    model = model[0] if not isinstance(model, str) else model
    model = pickle.loads(codecs.decode(model.encode(), "base64"))

    # retrieve the XGBoost trainer from the pickle jar
    bst = model[0]

    # retrieve the fitted one-hot-encoder from the pickle jar
    ohe2 = model[1]

    # create pandas dataframe from the inference features
    Y = pd.DataFrame(features)

    # select the categorical attributes and one-hot-encode them
    Y1 = Y.select_dtypes(include=[object])
    Y2 = ohe2.transform(Y1)

    # select the non-categorical attributes
    Y3 = Y.select_dtypes(exclude=[object])

    # join the results of the one-hot encoding to the rest of the attributes
    Y_pred = np.concatenate((Y2.toarray(), Y3.to_numpy()), axis=1)

    # inference
    dscore = xgb.DMatrix(Y_pred)
    pred = bst.predict(dscore)

    retval = list(zip(np.array(emails), list(map(str, pred))))
    retval = [{"email": r[0], "score": r[1]} for r in retval]
    return json.dumps(retval)  
    $$
);
Copy

备注

将 Python 加载到 Clean Room 中会为 Clean Room 创建一个新补丁。如果 Clean Room 分布设置为 EXTERNAL,则需要等待安全扫描完成,然后使用以下方式更新默认发布指令:

-- See the versions available inside the cleanroom
show versions in application package samooha_cleanroom_Machine_Learning_Demo_clean_room;

-- Once the security scan is approved, update the release directive to the latest version
call samooha_by_snowflake_local_db.provider.set_default_release_directive($cleanroom_name, 'V1_0', '2');
Copy

添加自定义相似建模模板

要将自定义分析模板添加到 Clean Room 中,您需要在提供商端和使用者端提供表名占位符,以及提供者端的联接列。在 SQL Jinja 模板中,这些占位符必须始终为以下值:

  • source_table:来自提供商的表名称 数组

  • my_table:来自使用者的表名称 数组

通过使用这些变量,可以使表名称动态化,但是如果需要,也可以使用链接到 Clean Room 的视图的名称将其硬编码到模板中。如果需要,列名称可以硬编码到模板中,也可以通过参数动态设置。如果通过参数设置列名称,请记住,您需要调用参数 dimensionsmeasure_column,这些参数必须是数组,以便根据列策略进行检查。您可以将其作为 SQL Jinja 参数添加到模板中,之后在查询时由使用者传递。联接策略确保使用者不能联接除授权列之外的列。

另外,还可以使用以下筛选器检查自定义 SQL Jinja 模板中的实参是否符合联接和列策略:

  • join_policy:检查字符串值或筛选器子句是否符合联接策略

  • column_policy:检查字符串值或筛选器子句是否符合列策略

  • join_and_column_policy:检查筛选器子句中用于联接的列是否符合联接策略,以及用作筛选器的列是否符合列策略

例如,在子句 {{ provider_id | sqlsafe | join_policy }} 中,将解析 HEM 的输入以检查联接策略中是否有 HEM。注意:只能谨慎使用 sqlsafe 筛选器,它允许协作者将纯 SQL 放入模板。

备注

必须使用这些实参来引用所有提供商/使用者表,因为实际链接到 Clean Room 的安全视图名称与表名不同。重要的是,提供商表别名 MUST 为 p(或 p1)、p2、p3、p4 等。使用者表别名 必须 为 c(或 c1)、c2、c3 等。要在 Clean Room 中执行安全策略,必须满足此条件。

请注意,此函数将替换任何具有相同名称的现有模板。如果您想更新任何现有模板,只需使用更新后的模板再次调用这个函数。

从提供商数据集中选取一组特征,从使用者数据集中选出一组标签以及一个“高价值”标志(称为 label_value)。然后将这 2 个表内部联接在电子邮件上,并传递给随机森林训练算法。最后,将模型训练步骤的输出结果传递给推理函数,该函数利用训练好的模型来“推断”NOT 使用者数据集中的哪些提供商客户可能是“高价值”客户。然后返回此类个体的 数量 以及模型误差。

确定客户是否“可能是高价值”客户的分数阈值在模板中手动设置为 0.5。在将模板添加到 Clean Room 时,可以轻松更改此设置。

call samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name, 
    'prod_custom_lookalike_template', 
    $$
WITH
features AS (
    SELECT
        p.hashed_email,
        array_construct(identifier({{ dimensions[0] | column_policy }}) {% for feat in dimensions[1:] %} , identifier({{ feat | column_policy }}) {% endfor %}) as features
    FROM
        identifier({{ source_table[0] }}) as p
),
labels AS (
    SELECT
        c.hashed_email,
        {{ filter_clause | sqlsafe | column_policy }} as label_value
    FROM
        identifier({{ my_table[0] }}) as c
),
trained_model AS (
    SELECT
        train_out:model::varchar as model,
        train_out:error::float as error
    FROM (
      SELECT
        cleanroom.lookalike_train(array_agg(f.features), array_agg(l.label_value)) as train_out
      FROM features f, labels l
      WHERE f.hashed_email = l.hashed_email
    )
),
inference_output AS (
    SELECT
        MOD(seq4(), 100) as batch,
        cleanroom.lookalike_score(
            array_agg(distinct t.model), 
            array_agg(p.hashed_email), 
            array_agg(array_construct( identifier({{ dimensions[0] | column_policy }}) {% for feat in dimensions[1:] %} , identifier({{ feat | column_policy }}) {% endfor %}) )
        ) as scores
    FROM trained_model t, identifier({{ source_table[0] }}) p
    WHERE p.hashed_email NOT IN (SELECT c.hashed_email FROM identifier({{ my_table[0] }}) c)
    GROUP BY batch
),
processed_output AS (
    SELECT value:email::string as email, value:score::float as score FROM (select scores from inference_output), lateral flatten(input => parse_json(scores))
)
SELECT p.audience_size, t.error from (SELECT count(distinct email) as audience_size FROM processed_output WHERE score > 0.5) p, trained_model t;
    $$
);  
Copy

备注

您可以为上面的 samooha_by_snowflake_local_db.provider.add_custom_sql_template 过程调用添加差分隐私敏感度作为最后一个参数(如果不添加,则默认为 1)

如果要查看 Clean Room 中当前活跃的模板,请调用以下过程。您可以进行修改,以便在分析中启用差分隐私保证。您选择编写的任何自定义模板都可以采用类似的模式。

call samooha_by_snowflake_local_db.provider.view_added_templates($cleanroom_name);
Copy

对每个表设置列策略

显示关联的数据以查看表中存在的列。要查看前 10 行,请调用以下过程。

select * from samooha_provider_sample_database.lookalike_modeling.customers limit 10;
Copy

为每个表和模板组合设置要分组、汇总(例如 SUM/AVG)和通常用于分析的列。这提供了灵活性,使同一个表可以根据基础模板允许不同的列选择。只有在添加模板后才可调用此过程。

请注意,列策略是 仅替换,因此如果再次调用函数,新的列策略会完全替换之前设置的列策略。

不应将列策略用于 email、HEM、RampID 等身份列,因为您不希望使用者能够按这些列进行分组。在生产环境中,系统会智能推断 PII 列并阻止此操作,但在沙盒环境中此功能不可用。它应该只用于您希望使用者能够汇总和分组的列,如状态、年龄段、地区代码、活动天数等。

请注意,要使“column_policy”和“join_policy”对使用者分析请求执行检查,在 SQL Jinja 模版中,所有列名 MUST 称为 dimensionsmeasure_columns。确保使用这些标签来引用自定义 SQL Jinja 模板中要检查的列。

call samooha_by_snowflake_local_db.provider.set_column_policy($cleanroom_name, [
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:status', 
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:age', 
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:region_code', 
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:days_active', 
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:income_bracket', 
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:household_size', 
    'prod_custom_lookalike_template:samooha_provider_sample_database.lookalike_modeling.customers:gender'
]);
Copy

如果要查看已添加到 Clean Room 的列策略,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_column_policy($cleanroom_name);
Copy

与使用者共享

最后,通过添加数据使用者的 Snowflake 账户定位器和账户名称,将数据使用者添加到 Clean Room,如下所示。Snowflake 账户名称的格式必须为 <ORGANIZATION>.<ACCOUNT_NAME>。

备注

为了调用以下过程,请确保首先使用 provider.set_default_release_directive 设置发布指令。您可以使用以下方式查看最新的可用版本和补丁:

show versions in application package samooha_cleanroom_Machine_Learning_Demo_clean_room;
Copy
call samooha_by_snowflake_local_db.provider.add_consumers($cleanroom_name, '<CONSUMER_ACCOUNT_LOCATOR>', '<CONSUMER_ACCOUNT_NAME>');
call samooha_By_snowflake_local_db.provider.create_cleanroom_listing($cleanroom_name, '<CONSUMER_ACCOUNT_NAME>');
Copy

对于多个使用者账户定位器,可以用逗号分隔的字符串形式传递到 provider.add_consumers 函数中,也可以分别调用 provider.add_consumers

如果要查看已添加到此 Clean Room 的使用者,请调用以下过程。

call samooha_by_snowflake_local_db.provider.view_consumers($cleanroom_name);
Copy

如果想查看最近创建的 Clean Room,请使用以下过程。

call samooha_by_snowflake_local_db.provider.view_cleanrooms();
Copy

如果想更深入地了解您创建的 Clean Room,请使用以下过程。

call samooha_by_snowflake_local_db.provider.describe_cleanroom($cleanroom_name);
Copy

创建的任何 Clean Room 也都可以删除。以下命令将完全删除 Clean Room,因此以前有权访问 Clean Room 的任何使用者都将无法再对其进行使用。如果将来需要使用相同名称的 Clean Room,则必须按照上述流程重新初始化。

call samooha_by_snowflake_local_db.provider.drop_cleanroom($cleanroom_name);
Copy

备注

提供商流程现在已完成。切换到使用者账户,继续执行使用者流程。

使用者

备注

应在使用者账户的 Snowflake 工作表中运行以下命令

设置环境

在利用开发者 APIs 使用 Snowflake Data Clean Room 之前,请执行以下命令来设置 Snowflake 环境。如果您没有 SAMOOHA_APP_ROLE 角色,请联系账户管理员。

use role samooha_app_role;
use warehouse app_wh;
Copy

安装 Clean Room

安装 Clean Room 共享后,可以使用以下命令查看可用的 Clean Room 列表。

call samooha_by_snowflake_local_db.consumer.view_cleanrooms();
Copy

为提供商与您共享的 Clean Room 命名。

set cleanroom_name = 'Machine Learning Demo Clean room';
Copy

以下命令将 Clean Room 安装到具有相关提供商和所选 Clean Room 的使用者账户中。

运行此过程所需时间可能较长,通常约为半分钟。

call samooha_by_snowflake_local_db.consumer.install_cleanroom($cleanroom_name, '<PROVIDER_ACCOUNT_LOCATOR>');
Copy

安装了 Clean Room 后,提供商必须在启用前完成提供商侧的 Clean Room 设置。您可以通过以下函数查看 Clean Room 的状态。启用 Clean Room 后,您应该能够运行下面的“运行分析”命令。启用 Clean Room 通常需要大约 1 分钟。

call samooha_by_snowflake_local_db.consumer.is_enabled($cleanroom_name);
Copy

运行分析

现在 Clean Room 已经安装完毕,您可以使用“run_analysis”命令运行由提供商添加到 Clean Room 的分析模板。您可以在以下各节中看到如何确定每个字段。

“高值”用户用下面的查询中的 filter_clause 标识。如果 c.SALES_DLR 代表每个用户的销售额,那么有效的筛选器可能看起来像 c.HIGH_VALUE > 4000

备注

在运行分析之前,您可以更改仓库规模;如果表较大,也可以使用新的、更大的仓库规模。

call samooha_by_snowflake_local_db.consumer.run_analysis(
    $cleanroom_name,                     -- cleanroom
    'prod_custom_lookalike_template',    -- template name

    ['samooha_consumer_sample_database.lookalike_modeling.customers'],                -- consumer tables

    ['samooha_provider_sample_database.lookalike_modeling.customers'],                -- provider tables

    object_construct(                    -- Rest of the custom arguments needed for the template
        'dimensions', ['p.STATUS', 'p.AGE', 'p.REGION_CODE', 'p.DAYS_ACTIVE', 'p.INCOME_BRACKET'], -- Features used in training

        'filter_clause', 'c.SALES_DLR > 2000' -- Consumer flag for which customers are considered high value
    )
);
Copy

如何确定 run_analysis 的输入

要运行分析,需要向 run_analysis 函数传递一些参数。本节将向您展示如何确定要传入的参数。

模板名称

首先,您可以通过调用以下过程来查看支持的分析模板。

call samooha_by_snowflake_local_db.consumer.view_added_templates($cleanroom_name);
Copy

在使用模板运行分析之前,您需要知道要指定哪些实参以及需要哪些类型。对于自定义模板,您可以执行以下操作。

call samooha_by_snowflake_local_db.consumer.view_template_definition($cleanroom_name, 'prod_custom_lookalike_template');
Copy

这通常还可以包含大量不同的 SQL Jinja 参数。以下功能可解析 SQL Jinja 模板,并将需要在 run_analysis 中指定的实参提取到列表中。

call samooha_by_snowflake_local_db.consumer.get_arguments_from_template($cleanroom_name, 'prod_custom_lookalike_template');
Copy

数据集名称

如果要查看提供商已添加到 Clean Room 的数据集名称,请调用以下过程。请注意,由于 Clean Room 的安全属性,您无法查看提供商添加到 Clean Room 的数据集中的数据。

call samooha_by_snowflake_local_db.consumer.view_provider_datasets($cleanroom_name);
Copy

您还可以使用以下调用,查看已关联到 Clean Room 的表:

call samooha_by_snowflake_local_db.consumer.view_consumer_datasets($cleanroom_name);
Copy

维度和度量列

在运行分析时,您可能希望对某些列进行筛选、分组和汇总。如果要查看提供商已添加到 Clean Room 的列策略,请调用以下过程。

call samooha_by_snowflake_local_db.consumer.view_provider_column_policy($cleanroom_name);
Copy

常见错误

如果运行分析的结果是出现 Not approved: unauthorized columns used 错误,您可能需要再次查看提供商设置的联接策略和列策略。

call samooha_by_snowflake_local_db.consumer.view_provider_join_policy($cleanroom_name);
call samooha_by_snowflake_local_db.consumer.view_provider_column_policy($cleanroom_name);
Copy

也有可能您已经耗尽了隐私预算,这样一来系统会阻止您执行更多查询。您可以使用以下命令查看剩余的隐私预算。预算会每天重置;Clean Room 提供商如果需要,也可以将其重置。

call samooha_by_snowflake_local_db.consumer.view_remaining_privacy_budget($cleanroom_name);
Copy

您可以通过以下 API 检查 Clean Room 是否启用了差分隐私:

call samooha_by_snowflake_local_db.consumer.is_dp_enabled($cleanroom_name);
Copy
语言: 中文