Designing multi-step flows
Overview
Most clean room usage involves running a single SQL query against one or more tables in a clean room and displaying the results in the response. However, in many use cases you might want to break your flow into several steps that can run sequentially or individually, and that can call Python code to process (or preprocess) data. One example is a machine learning flow in which a model is trained once on a data set and then run many times against varying input data, either singly or in batches.
Clean rooms provide several mechanisms that support these advanced scenarios:
Template chains: A template chain runs a set of templates in a specific order, using the output of each template as the input to the next. The user provides the input to the first template in the chain; the output of the last template in the chain is returned to the user.
Internal tables: Your templates or custom internal functions can create persistent tables within a clean room. These tables behave like linked tables in that they are accessible to templates or custom uploaded code. Internal tables are useful for maintaining state or data; in the machine learning example, the training data is saved in an internal table that is used by internal functions. Just as with linked tables, these tables can be accessed only by templates or uploaded code inside the clean room. Storing intermediate data in internal tables is more efficient than using templates to pass large blocks of information into and out of the clean room.
Custom internal functions: You can define custom functions in a clean room that can be called by templates in that clean room. To define a function, either upload Python UDFs or UDTFs to the clean room, or create a container service in the clean room that exposes endpoints implementing the functions. These functions can be called only by templates inside the clean room.
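To make the data flow of a template chain concrete, here is a plain-Python model of it. It does not use any clean room API: each "template" is a hypothetical Python function, and run_chain only mirrors the rule described above (user input goes into the first step, each output feeds the next step, and the last output goes back to the user).

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for a clean room template: takes a dict of
# arguments and returns a dict of outputs.
Template = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_chain(templates: List[Template], user_args: Dict[str, Any]) -> Dict[str, Any]:
    """Run templates in order, feeding each template's output into the next one."""
    data = dict(user_args)  # the user supplies the input to the first template
    for template in templates:
        data = template(data)
    return data  # only the last template's output is returned to the user


def build_features(args: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical first step: derive values from the user's input.
    return {"rows": [x * 2 for x in args["values"]]}


def summarize(args: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical last step: aggregate the previous step's output.
    return {"total": sum(args["rows"]), "count": len(args["rows"])}


result = run_chain([build_features, summarize], {"values": [1, 2, 3]})
print(result)  # {'total': 12, 'count': 3}
```

Intermediate outputs (here, rows) never reach the user; only the final dictionary does.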
Note
A unifying principle of all these techniques is that tables and functions are accessed or run through a template. You cannot access a clean room internal table, run a custom clean room function, or call an internal clean room endpoint directly; you can do so only through a template.
Internal clean room tables
You can create tables inside a clean room using SQL or Python to store intermediate results or to provide persistent storage for the user or for your internal functions (for example, to save training data that is used for multiple runs). These tables behave the same as linked tables, with the following notes:
Internal tables are created using a clean room template or a UDF/UDTF, and have no linkage to outside tables.
Internal tables are created in the cleanroom namespace. You can set row and column policies on internal tables after you create them.
If the table name is dynamic and the table is accessed by other templates or code, return the table name to the user so that the user can pass it to any other templates that need to access that table.
Here are some examples of creating internal tables:
A JinjaSQL template can create an internal table, as is done in some types of activation.
This example returns the table name so that it can be passed in as a parameter to other templates.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name,
    $template_name,
    $$
    BEGIN
        CREATE OR REPLACE TABLE cleanroom.analysis_results AS
            SELECT count(*) AS ITEM_COUNT, c.status, c.age_band
            FROM IDENTIFIER({{ my_table[0] }}) AS c
            JOIN IDENTIFIER({{ source_table[0] }}) AS p
                ON {{ c_join_col | sqlsafe | activation_policy }} = {{ p_join_col | sqlsafe | activation_policy }}
            GROUP BY c.status, c.age_band
            ORDER BY c.age_band;
        RETURN 'analysis_results';
    END;
    $$);
A UDF can create an internal table. This is typically done by executing SQL from Python.
# Snippet of a Python UDF that saves results to an internal table.
# `session` and `source_table` are assumed to be defined by the enclosing UDF.
table_name = 'cleanroom.results'
session.sql(f"""
    CREATE OR REPLACE TABLE {table_name} AS (
        WITH joint_data AS (
            SELECT
                date,
                p.hashed_email AS hem,
                impression_id
            FROM {source_table} p
        )
        SELECT
            date,
            COUNT(DISTINCT hem) AS reach,
            COUNT(DISTINCT impression_id) AS num_impressions
        FROM joint_data
        GROUP BY date
        ORDER BY date
    );
""").collect()
Code running in a container service can also create an internal table.
# Snippet of container services Python code that creates an internal results table.
# The 'cleanroom' namespace is applied through the schema parameter when the table is created.
@app.post("/score")
def score():
    # ... omitted content ...
    df = pd.DataFrame({
        "ID": ids,
        "SCORE": scores
    })
    table = "LOOKALIKE_RESULTS"
    session.write_pandas(df, table, schema="CLEANROOM", auto_create_table=True, overwrite=True)
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    response = make_json_response([[0, {"results_table": table, "size": len(ids), "execution_time": round(execution_time, 2)}]])
    return response
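The container snippet calls a make_json_response helper that is not shown. Service functions exchange JSON batches shaped like {"data": [[row_number, ...], ...]}, and the response is expected to echo each row number with one result per row. The stdlib sketch below assumes that format; read_rows, make_json_response, and the sample values are hypothetical, and a real Flask endpoint would return the body through Flask (for example with jsonify) rather than as a plain string.

```python
import json
from typing import Any, List


def read_rows(request_body: str) -> List[List[Any]]:
    """Return the input rows; each row is [row_number, arg1, arg2, ...]."""
    return json.loads(request_body)["data"]


def make_json_response(rows: List[List[Any]]) -> str:
    """Hypothetical stand-in: wrap [row_number, value] pairs as {"data": rows}."""
    return json.dumps({"data": rows})


# One input row, answered with a summary object like the one /score returns.
rows = read_rows('{"data": [[0, "SOURCE_TABLE", "ID"]]}')
body = make_json_response([[rows[0][0], {"results_table": "LOOKALIKE_RESULTS", "size": 3}]])
print(body)
```

Echoing rows[0][0] rather than hard-coding 0 keeps the response correct if the endpoint ever receives more than one row.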
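When you generate an internal table that must be accessed by templates or code, you can either use a constant table name, or name the table dynamically and return the name to the user, who then passes it to a results function.
The following example uses a dynamically named table to store results. The user makes two calls: one to generate the data and get the table name, and another to view the results.
The provider template calls the reach_impression_regression UDF to process the data (the cleanroom prefix indicates that this is a UDF). The UDF returns the internal table name to the template, which returns it to the caller.
-- This template calls a UDF uploaded by a collaborator.
-- The UDF takes two input tables as parameters.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name,
    'prod_calculate_regression',
    $$
    CALL cleanroom.reach_impression_regression({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
    $$
);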
The Python UDF generates the internal table and returns the generated table name to the template caller.
def main(session, source_table, my_table):
    ...
    table = f'results_{suffix}'.upper()
    retval_df = session.write_pandas(regression_output, table, schema='CLEANROOM', auto_create_table=True)
    return f'Done, results have been written to the following table: {table}'
The provider template accepts a table name passed in and displays the contents of that table. Note that the table is always accessed from the cleanroom namespace.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name,
    'prod_get_results',
    $$
    SELECT * FROM cleanroom.{{ results_table | sqlsafe }};
    $$
);
The consumer calls the template, passing in the table name.
CALL samooha_by_snowflake_local_db.consumer.run_analysis(
    $cleanroom_name,
    'prod_get_results',
    [],
    [],
    object_construct(
        'results_table', $table_name
    )
);
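The two calls can be modeled locally to show how the returned name links them and where the risk lies. The prod_get_results template inserts results_table with the sqlsafe filter, which places the value into the SQL text unquoted, so this sketch accepts only a plain identifier before using it. It is a minimal sketch that assumes SQLite as a stand-in (an attached database named cleanroom plays the internal namespace); calculate_regression and get_results are hypothetical names, the rows are placeholders rather than real regression output, and the identifier check is this sketch's own addition, not a documented clean room feature.

```python
import re
import sqlite3
import uuid

# Accept only plain identifiers (an assumption made for this sketch).
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# Local stand-in: SQLite plays the clean room, "cleanroom" the internal namespace.
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS cleanroom")


def calculate_regression(conn):
    """Call 1: write results to a dynamically named internal table, return the name."""
    table = f"results_{uuid.uuid4().hex[:8]}".upper()
    conn.execute(f"CREATE TABLE cleanroom.{table} (term TEXT, value REAL)")
    # Placeholder rows standing in for real regression output.
    conn.executemany(f"INSERT INTO cleanroom.{table} VALUES (?, ?)",
                     [("intercept", 0.0), ("slope", 1.0)])
    return table


def get_results(conn, results_table):
    """Call 2: read the named table, always from the cleanroom namespace."""
    # The name is spliced into SQL unquoted, so reject anything unusual first.
    if not IDENTIFIER.fullmatch(results_table):
        raise ValueError(f"not a plain table name: {results_table!r}")
    return conn.execute(f"SELECT * FROM cleanroom.{results_table}").fetchall()


table_name = calculate_regression(conn)  # the user keeps the returned name...
print(get_results(conn, table_name))     # ...and passes it to the second call
```

Because each run gets a fresh suffix, earlier result tables are not overwritten, at the cost of the caller having to carry the name between calls.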
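Triggering custom functions
Custom functions can be called from templates or from code in the clean room (UDFs, UDTFs, or container service endpoints). A function uploaded by any collaborator can be accessed by the templates or code of any other collaborator.
Always call clean room functions within the appropriate namespace:
cleanroom.function_name when calling a custom UDF or UDTF.
service_functions.function_name when calling a function exposed as an embedded Snowpark Container Services function.
The following examples call a custom UDF and a custom container service endpoint from a template:
The template uses the cleanroom scope to access UDFs or UDTFs.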
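If you generate template text from code, a small helper can keep the namespace rule in one place. This is a hypothetical sketch, not part of any clean room API: FunctionKind and qualified_name are made-up names, and the identifier check is a deliberately rough assumption.

```python
import re
from enum import Enum

# Rough check for an unquoted identifier (an assumption; real naming rules are broader).
_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


class FunctionKind(Enum):
    UDF = "cleanroom"              # custom UDF or UDTF
    SERVICE = "service_functions"  # function exposed by a container service


def qualified_name(kind: FunctionKind, function_name: str) -> str:
    """Prefix a clean room function name with the namespace for its kind."""
    if not _NAME.fullmatch(function_name):
        raise ValueError(f"unexpected function name: {function_name!r}")
    return f"{kind.value}.{function_name}"


print(qualified_name(FunctionKind.UDF, "my_function"))  # cleanroom.my_function
print(qualified_name(FunctionKind.SERVICE, "my_func"))  # service_functions.my_func
```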
-- Template to generate results. Calls the UDF 'my_function', which
-- generates a results table inside the clean room called 'results'.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name,
    'generate_results_template',
    $$
    CALL cleanroom.my_function({{ source_table[0] }}, {{ my_table[0] | default('NONE') }});
    $$
);
The template uses the service_functions scope to access container service functions.
-- Template to trigger training data generation.
CALL samooha_by_snowflake_local_db.provider.add_custom_sql_template(
    $cleanroom_name,
    'lal_train',
    $$
    SELECT service_functions.my_func(
        {{ source_table[0] }},
        {{ provider_join_col }},
        {{ my_table[0] }},
        {{ consumer_join_col }},
        {{ dimensions | sqlsafe }},
        {{ filter_clause }}
    ) AS train_result;
    $$
);
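Common multi-step flow patterns
The Snowpark API example processes data and generates intermediate tables, then generates a results table with one template call and exposes the results directly with a second template call.
The Snowpark Container Services example creates training data with one template call and stores it in an internal table. A second template analyzes user input against the stored training data.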
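Both patterns rely on the same idea: the expensive step runs once and leaves its output in an internal table, and later calls read that table instead of recomputing it. Below is a minimal local sketch of the train-once, analyze-many flow, assuming SQLite as a stand-in (an attached database named cleanroom plays the internal namespace) and a deliberately trivial "model", a per-segment average. The function names and data are hypothetical.

```python
import sqlite3

# Local stand-in: SQLite plays the clean room, the attached "cleanroom"
# database plays the internal namespace, and all rows are made-up samples.
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS cleanroom")
conn.execute("CREATE TABLE labeled (segment TEXT, score REAL)")
conn.executemany("INSERT INTO labeled VALUES (?, ?)",
                 [("A", 1.0), ("A", 3.0), ("B", 5.0)])


def train(conn):
    """First call: build training data once and keep it in an internal table."""
    conn.execute("DROP TABLE IF EXISTS cleanroom.training_data")
    conn.execute(
        "CREATE TABLE cleanroom.training_data AS "
        "SELECT segment, AVG(score) AS avg_score FROM labeled GROUP BY segment"
    )


def score(conn, batch):
    """Later calls: look up each (user_id, segment) pair in the stored training data."""
    lookup = dict(conn.execute("SELECT segment, avg_score FROM cleanroom.training_data"))
    # Segments that never appeared in training get None.
    return [(user_id, lookup.get(segment)) for user_id, segment in batch]


train(conn)                                     # run once
print(score(conn, [("u1", "A"), ("u2", "B")]))  # [('u1', 2.0), ('u2', 5.0)]
print(score(conn, [("u3", "C")]))               # [('u3', None)]
```

Each scoring call reads only the small stored table, which is the efficiency argument made earlier for keeping intermediate data inside the clean room.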