结合使用 Clean Room 与 Snowpark

简介

当您需要查询或处理大规模数据时,Snowflake Data Clean Room 可以与 Snowpark 合作,为您的 Clean Room 提供更高的计算能力。

Clean Room 可以通过两种方式使用 Snowpark:

  • Snowpark UDFs:在 Clean Room 代码中使用 Snowpark API 来创建利用 Snowpark 扩展和处理能力的 Snowpark UDFs。

  • Snowpark Container Services:如果您想更好地控制 Snowpark 环境,或者想使用 Snowpark API 不提供的库,则可以在 Clean Room 中配置和托管容器。这使您能够根据特定计算和存储需求配置环境,并自定义适用于您环境的库。

如果需要加载的数据太大,内存无法容纳,可使用 to_pandas_batches() 对其进行迭代。例如:

df_iter = session.table(intermediary_table_name).to_pandas_batches()
for df in df_iter:
  ...
Copy

复杂使用流程的总体设计

尽管您可以通过调用一个模板来生成数据并显示所有数据,但在许多情况下,最好将数据生成步骤与结果查看步骤分开。这样,使用者可以多次查看结果,而无需每次都触发重新计算,也可以查看过程中各个点的数据。要将流程分成多个用户可访问的暂存区,请创建单独的模板来触发数据生成或处理以及查看存储的结果。阅读有关设计复杂使用流程的更多信息

在 Clean Room 中使用 Snowpark UDFs

您可以在上传的 Python 代码中使用 Snowpark API 来加快大型数据加载的处理速度。Clean Room 仅支持 Snowpark Python API。提供商和使用者都可以在上传的 Python 代码中使用 Snowpark Python API。

先决条件

在 Clean Room 中使用 Snowpark API

在 Clean Room Python 代码中使用 Snowpark API 与上传和运行其他 Python UDF 相同,只是需要链接 snowflake-snowpark-python 库。

通过使用 cleanroom 架构中的 session.sql 而非 Snowpark 装饰器执行 SQL 来创建 UDFs、UDTFs 和过程。例如:

session.sql("CREATE OR REPLACE FUNCTION cleanroom.udf(...")
Copy

基本步骤

以下是通过 UDF 或 UDTF 在 Clean Room 中使用 Snowpark API 的基本步骤:

提供商

  1. 创建 Clean Room,设置默认发布指令,并以标准方式链接数据。

  2. 因为您可能已经为代码设计了一个非常具体的用例,所以可能不需要向 Clean Room 添加联接或列策略,但您可以这样做。

  3. 通过调用 provider.load_python_into_cleanroom 将您的自定义 Snowpark 处理程序代码加载到 Clean Room 中。该代码应至少加载 snowflake-snowpark-python 包,以及您需要的任何其他包。UDFs 可以逐行处理和返回数据,但是 Snowpark 用例通常会生成通过调用单独的结果模板读取的输出表。

  4. 更新默认发布指令(因为添加的代码会生成新的补丁版本)。

  5. 创建并上传 自定义模板 来运行您的 Snowpark 代码。运行 UDF 的唯一方法是通过调用 UDF 的模板触发。有关 UDF 调用模板的一些详细信息:

    • 它应使用您在 provider.load_python_into_cleanroom 中指定的别名和参数调用该函数。模板必须使用 cleanroom 命名空间来调用函数的别名。

    • 如果 UDF 将结果写入 Clean Room 中的表,并且每次运行的表名称都不同,则结果生成模板应返回结果表的名称,并且结果模板应将表名称作为用户的实参。

  6. 如果您生成了中间结果表,请上传自定义 SQL 模板以访问您的 Snowpark UDF 生成的结果表。要么使用硬编码的结果表名称,要么让用户传入由您的代码生成并由结果生成模板返回的表名称。

  7. 添加协作者并以标准方式发布 Clean Room。

使用者

使用者安装 Clean Room 并以标准方式运行分析。如果将数据生成和结果读取分成不同的模板,则使用者将需要按顺序调用每个模板。

示例代码

以下示例代码演示如何上传和运行“展示次数覆盖率”的线性回归来估算斜率。

  1. 使用者首先运行 prod_calculate_regression 模板以生成结果,该模板运行提供商 UDF。提供商 UDF 执行以下操作:

    1. 预处理展示次数数据。 创建动态 SQL,将提供商的展示次数数据与使用者的数据联接起来,按日期计算不同的展示次数计数和覆盖率,并将其存储在 Clean Room 内的中间表中。如果使用者不提供表,则代码将针对提供商的整个展示次数表运行。

    2. 加载中间表。 中间表作为 Pandas DataFrame 加载到 Snowpark 过程中。

    3. 执行回归分析。 回归使用 statsmodels 库计算,以 Pandas DataFrame 的形式返回结果。

    4. 将结果写入内部 Clean Room 表。 将结果写入 Clean Room 内的结果表,并将该表名称的 ID 后缀返回给使用者。由于 Snowpark 过程在 Clean Room 内运行,因此它激活使用者账户数据的能力有限。为了使结果更加安全,应将其写入 Clean Room 内的表,然后使用者运行另一个模板来读取结果数据。

    5. 删除中间表。 在 Clean Room 内计算期间创建的不再需要的中间表将在 Snowpark 过程结束前删除。

    6. 返回结果表的名称。 在运行模板以获取结果时必须指定返回给使用者的名称,因为所有先前运行的结果都会被保留。

  2. 然后,使用者运行 get_results 模板,传入第一个模板返回的结果表后缀以查看结果。

要运行以下示例,您需要同一云托管区域内的两个账户(除非已实现 Cross-Cloud Auto-Fulfillment):一个提供商账户和一个使用者账户。

示例代码应在 Snowflake 工作表中运行,无需任何额外的 Snowpark 配置。如果您在其他环境中运行,可能需要安装和配置 Snowpark Python API。

更多信息

在 Clean Room 中使用 Snowpark Container Service

如果您想更好地控制执行 Python 代码的环境,可以在 Clean Room 中运行 Snowpark Container Service。这样您可以精细控制代码的执行环境,非常适合需要专门的计算、存储或其他资源以最大限度地提高性能和最小化成本,或者引入自定义包或其他环境功能的用例。

当您在 Clean Room 中托管容器服务时,您的模板和任何自定义 Python 代码都可以调用服务公开的函数。使用 Snowpark Container Service 与在 Snowpark 中使用 UDFs 类似,不同之处在于 UDFs 可以作为 HTTP 端点公开供模板调用。您将定义服务和端点并将其上传到Clean Room。

内部托管的端点只能通过 Clean Room 内的模板访问,不能由 Clean Room 协作者直接调用。

先决条件

您需要了解以下主题才能在 Clean Room 中使用 Snowpark Container Service:

基本步骤

提供商

  1. 创建服务规范、代码和处理请求的端点。

  2. 创建镜像仓库并授予对该仓库的 SAMOOHA_APP_ROLE 访问权限。

  3. 捕获仓库 URL 以供下一步使用。

  4. 构建镜像并将其上传到仓库 URL。

  5. 以标准方式创建 Clean Room、链接数据、添加联接策略以及使用者。

  6. 定义调用服务点的模板 并将其上传到您的 Clean Room。服务函数是在命名空间 service_functions 中创建和调用的(与在 cleanroom 命名空间中创建和调用的 UDFs 不同)。

    -- Template to call an SPCS function named train.
    SELECT service_functions.train(
          {{ source_table[0] }},
          {{ provider_join_col }},
          {{ my_table[0] }},
          {{ consumer_join_col }},
          {{ dimensions | sqlsafe }},
    
    Copy
  7. 调用 provider.load_service_into_cleanroom 将您的服务详细信息上传到 Clean Room。这定义了图像 URL、端点

    ) AS train_result;

  8. 调用 provider.load_service_into_cleanroom 将您的服务详细信息上传到 Clean Room。这定义了图像 URL、端点和其他服务选项。此处定义的端点名称必须与您的服务规范相匹配,并且是您的模板调用函数时使用的名称。

    CALL samooha_by_snowflake_local_db.provider.load_service_into_cleanroom(
    $cleanroom_name,
    $$
    spec:
      containers:
      - name: lal
        image: /dcr_spcs/repos/lal_example/lal_service_image:latest
        env:
          SERVER_PORT: 8000
      endpoints:
      - name: lalendpoint
        port: 8000
        public: false
    $$,
    $$
    functions:
      - name: train
        args: PROVIDER_TABLE VARCHAR, PROVIDER_JOIN_COL VARCHAR, CONSUMER_TABLE VARCHAR, CONSUMER_JOIN_COL VARCHAR, DIMENSIONS ARRAY, FILTER VARCHAR
        returns: VARCHAR
        endpoint: lalendpoint
        path: /train
    $$);
    
    Copy
  9. 为您的 Clean Room 设置默认发布指令。每次上传或修改服务时,它都会创建一个新的补丁版本。

  10. 发布您的 Clean Room。

  11. 对图像、函数或代码进行任何更改时,您和使用者都必须 更新您的实例

使用者

  1. 安装 Clean Room 并以标准方式链接所需的任何数据。

  2. 创建计算池 并授予对 Clean Room 的访问权限。

  3. 如果您要运行查询(几乎肯定会运行查询),您还必须为正在使用的仓库中的 Clean Room 授予 USAGE 权限。

  4. 通过调用 samooha_by_snowflake_local_db.consumer.start_or_update_service,传入 Clean Room 名称、计算池名称和仓库名称(如使用了仓库)开启服务。

  5. 运行 SHOW ENDPOINTS IN SERVICE SAMOOHA_CLEANROOM_APP_clean_room_name.services.service; 检查服务的可用端点

  6. 当服务启动并运行时,您可以开始运行任何通过标准方式调用 consumer.run_analysis 来访问服务端点的 Clean Room 模板。

创建计算池

根据谁应该拥有和配置池,提供商可以在 Clean Room 内创建计算池,或者使用者可以在 Clean Room 之外创建计算池。

如果计算池是在 Clean Room 之外创建的,则必须向 Clean Room 授予适当的权限才能访问该池并创建服务,如下所示:

-- Grant access to a warehouse to run queries. Needed only if the service queries Snowflake accounts.
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON WAREHOUSE APP_WH TO APPLICATION SAMOOHA_CLEANROOM_APP_<CLEANROOM_NAME>;

-- Grant SAMOOHA_APP_ROLE privileges to create compute pools and create services
GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE SAMOOHA_APP_ROLE WITH GRANT OPTION;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE SAMOOHA_APP_ROLE WITH GRANT OPTION;


USE ROLE SAMOOHA_APP_ROLE;
-- Create the compute pool
CREATE COMPUTE POOL DCR_LAL_POOL
  FOR APPLICATION SAMOOHA_CLEANROOM_APP_<CLEANROOM_NAME>
  min_nodes = 1 max_nodes = 1
  instance_family = highmem_x64_l
  auto_resume = true;

-- Grant the clean room the privileges to access a pool running outside the clean room.
-- Grant the clean room access to the compute pool
GRANT USAGE ON COMPUTE POOL DCR_LAL_POOL TO APPLICATION SAMOOHA_CLEANROOM_<CLEANROOM_NAME>;

-- Allow the clean room to create the service
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO APPLICATION SAMOOHA_CLEANROOM_APP_<CLEANROOM_NAME>;
Copy

更新您的服务代码或配置

如果提供商更新图像、服务规范、端点名称或源代码,则提供商和使用者都必须采取以下步骤。

1.提供商:

  1. 根据需要更新图像或源代码。

  2. 调用 provider.load_service_into_cleanroom,这会返回一个新补丁号。

  3. 使用新补丁号调用 provider.set_default_release_directive

2.使用者:

  • 调用 consumer.start_or_update_service

监控您的服务

默认情况下,使用者可以监控他们的服务。可以使用 provider.load_service_into_cleanroomservice_config 实参中的 allow_monitoring 值来更改此行为。

如果启用了使用者监控功能,则使用者可以访问给定 Clean Room 服务(格式 SAMOOHA_CLEANROOM_APP_SPCS_cleanroom_name.services.service)、服务 ID 和容器的监控日志,如下所示:

SELECT VALUE AS log_line
  FROM TABLE(
    SPLIT_TO_TABLE(SYSTEM$GET_SERVICE_LOGS(
        'SAMOOHA_CLEANROOM_APP_SPCS_Lookalike_Demo.services.service', 0, 'lal'), '\n')
  );
Copy

使用者还可以使用如下所示的 DESCRIBESERVICE 命令查看其服务状态:

-- See the state of the service.
DESCRIBE SERVICE SAMOOHA_CLEANROOM_APP_SPCS_Lookalike_Demo.services.service;
Copy

您可以通过运行 SHOW ENDPOINTS IN SERVICE SAMOOHA_CLEANROOM_APP_clean_room_name.services.service; 来查看服务端点。例如:

SHOW ENDPOINTS IN SERVICE SAMOOHA_CLEANROOM_APP_SPCS_Lookalike_Demo.services.service;
Copy

示例代码

以下笔记本和 zip 文件演示了如何在 Clean Room 中使用 Snowflake Container Service。您需要两个安装了 Clean Room 的账户:一个给提供商,一个给使用者。它们应该在同一个云托管区域中。使用压缩的配置文件来定义服务。

语言: 中文