设计多步流程¶
概述¶
大多数 Clean Room 的使用都涉及对 Clean Room 中的一个或多个表运行单次 SQL 查询,并在响应中显示结果。但是,在许多用例中,您可能希望将流程分成几个步骤,这些步骤可以按顺序依次运行,也可以按任意顺序运行,并且可能涉及调用 Python 代码来处理(或预处理)数据。示例包括机器学习流程,在该流程中,对数据集进行一次训练,然后针对不同的输入数据多次运行,可以单独或成批运行。
Clean Room 有多种机制可以实现这些高级场景:
模板链: 模板链 以特定顺序运行一组模板,使用每个模板的输出作为下一个模板的输入。链中第一个模板的输入由用户提供;链中最后一个模板的输出将返回给用户。
内部表: 模板或自定义内部函数可以在 Clean Room 中创建表。这些表的行为类似于链接表,因为它们可供模板或自定义上传的代码访问。内部表对于维护状态或数据很有用;在机器学习示例中,训练数据将保存在内部表中,供内部函数使用。这些表只能通过模板或在 Clean Room 中上传的代码进行访问。将中间数据存储在内部表中比使用模板将大量信息传入和传出 Clean Room 要有效得多。
自定义内部函数: 您可以在 Clean Room 中定义自定义函数,这些函数可由该 Clean Room 中的模板调用。可以在 Clean Room 中定义函数,方法是上传 Python UDFs 或UDTFs 到 Clean Room,也可以 在 Clean Room 中创建容器服务,公开实现函数的端点。这些函数只能由 Clean Room 内的模板调用。
备注
所有机制的统一原则是使用模板访问或运行表和函数。只能通过模板访问 Clean Room 内部表、运行自定义 Clean Room 函数或直接访问内部 Clean Room 端点。
内部 Clean Room 表¶
You can create tables inside a clean room using SQL or Python to store intermediary results, or for persistent storage for the user or your internal functions (for example, to save training data that is used for multiple runs). These tables behave the same as linked tables, with the following notes:
Internal tables are created using a clean room template or a UDF/UDTF, and have no linkage to outside tables.
Internal tables must be created in the
cleanroomnamespace.在创建内部表后,您可以为手动创建的内部表设置行策略和列策略。
If the table name is dynamic, and the table is accessed by other templates or code, you can return the name of the table to the user, so the user can pass in the dynamic table name to any other templates that need to access that table.
以下是一些创建内部表的示例:
JinjaSQL 模板可以创建内部表,这是通过某些类型的 激活 完成的。
This example returns the table name so that it can be passed in as a parameter to other templates.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
$template_name,
$$
BEGIN
CREATE OR REPLACE TABLE cleanroom.data_analysis_results AS
SELECT count(*) AS ITEM_COUNT, c.status, c.age_band
FROM IDENTIFIER({{ my_table[0] }}) AS c
JOIN IDENTIFIER({{ source_table[0] }}) AS p
ON {{ c_join_col | sqlsafe | activation_policy }} = {{ p_join_col | sqlsafe | activation_policy }}
GROUP BY c.status, c.age_band
ORDER BY c.age_band;
RETURN 'analysis_results';
END;
$$);
UDF 可以创建内部表。这通常是通过在 Python 中执行 SQL 来完成的。
# Snippet of Python UDF to save results to an internal table.
table_name = f'cleanroom.results'
session.sql(f"""
CREATE OR REPLACE TABLE {table_name} AS (
WITH joint_data AS (
SELECT
date,
p.hashed_email AS hem,
impression_id
FROM {source_table} p
)
SELECT
date,
COUNT(DISTINCT hem) AS reach,
COUNT(DISTINCT impression_id) AS num_impressions
FROM joint_data
GROUP BY date
ORDER BY date
);
""").collect()
# Snippet of container services Python code to create an internal results table.
# 'cleanroom' table name prefix is added using the schema parameter when the table is created.
@app.post("/score")
def score():
... omitted content ...
df = pd.DataFrame({
"ID": ids,
"SCORE": scores
})
table = "LOOKALIKE_RESULTS"
session.write_pandas(df, table, schema="CLEANROOM", auto_create_table=True, overwrite=True)
end_time = time.perf_counter()
execution_time = end_time - start_time
response = make_json_response([[0, {"results_table": table, "size": len(ids), "execution_time": round(execution_time, 2)}]])
return response
生成必须通过模板或代码访问的内部表时,可以使用常量表名,也可以动态命名该表并将表的名称返回给用户,然后用户将表名传递给结果函数。
以下是用于存储结果的动态命名表的示例。用户进行了两次调用:一次用于生成数据并获取表名,另一次用于查看结果。
提供商模板调用
reach_impression_regressionUDF 以处理数据(cleanroom前缀表示这是 UDF)。UDF 将内部表前缀名称返回给模板,模板将其返回给调用者。CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template( $cleanroom_name, 'prod_calculate_regression', $$ CALL cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }}); $$ );
Python UDF 将表名后缀名返回给模板调用者。
def main(session, source_table, my_table): ... table = f'results_{suffix}'.upper() retval_df = session.write_pandas(regression_output, table, schema = 'CLEANROOM', auto_create_table = True) return f'Done, results have been written to the following suffix: {suffix}'
提供商模板接受传入的表名后缀并显示该表的内容。
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template( $cleanroom_name, 'prod_get_results', $$ SELECT * FROM cleanroom.results_{{ results_suffix | sqlsafe }}; $$ );
使用者调用模板,传入表名后缀。
CALL samooha_by_snowflake_local_db.consumer.run_analysis( $cleanroom_name, 'prod_get_results', [], [], object_construct( 'results_suffix', $result_suffix -- Table name suffix to identify the results table. ) );
触发自定义函数¶
自定义函数可以通过模板调用,也可以通过 Clean Room 中的代码(UDFs、UDTFs、或容器服务端点)调用。任何协作者上传的函数都可以通过任何其他协作者的模板或代码来访问。
应始终在相应的命名空间范围内调用 Clean Room 函数:
:samp:`cleanroom.{function_name}`(调用自定义 UDF/UDTF 函数时)
:samp:`service_functions.{function_name}`(调用以嵌入式 Snowpark 容器服务函数形式公开的函数时)。
以下是从模板调用自定义 UDF 和自定义容器服务端点的示例:
模板使用 cleanroom 范围来访问 UDF 或 UDTFs。
-- Template to generate results. Calls the UDF 'my_function', which
-- generates a results table inside the clean room called 'results'.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'generate_results_template',
$$
CALL cleanroom.my_function({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
$$
);
模板使用 service_functions 范围来访问容器服务函数。
-- Template to trigger training data generation.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
$cleanroom_name,
'lal_train',
$$
SELECT service_functions.my_func(
{{ source_table[0] }},
{{ provider_join_col }},
{{ my_table[0] }},
{{ consumer_join_col }},
{{ dimensions | sqlsafe }},
{{ filter_clause }}
) AS train_result;
$$
常见的多步流程模式¶
Snowpark API 示例 处理数据,生成中间表,然后通过一次模板调用生成结果表,再通过第二次模板调用直接公开结果。
Snowpark Container Services 示例 通过一次模板调用创建训练数据,并将训练数据存储到内部表中。第二个模板根据存储的训练数据分析用户输入。