使用自定义逻辑跨分区处理数据¶
使用分布式分区函数 (DPF) 在计算池中的一个或多个节点上并行处理数据。该工具可以自动处理分布式编排、错误、可观察性及成果持久化。
DPF 按指定列对 Snowpark DataFrame 进行分区,并在每个分区上并行执行 Python 函数。您只需专注于处理逻辑,DPF 将自动处理基础设施复杂性并实现自动扩展。
您可以利用 DPF 在不同数据分段间高效处理大型数据集。该工具非常适合以下场景:按区域分析销售数据、按地理分区处理客户数据,或对需要相同处理逻辑的每个数据分区执行数据转换。DPF 可自动处理分布式数据处理,消除了管理分布式计算基础设施的复杂性。
DPF 允许您在具有 GPU 访问权限的容器化基础设施上使用开源库编写自定义 Python 代码。
重要
使用 DPF 之前,请确保满足以下条件:
容器运行时环境:DPF 需要 Snowflake ML 容器运行时环境。
暂存区访问权限:DPF 会自动将结果和工件存储在 Snowflake 暂存区中。请确保您拥有访问指定命名暂存区的相应权限。
以下章节将引导您在假设的销售数据集上使用 DPF。
按地区处理销售数据¶
以下示例演示如何使用 DPF 并行处理按地区划分的销售数据:
定义处理函数 – 创建一个 Python 函数,指定应用于每个数据分区(区域)的转换逻辑。
使用 DPF 处理数据 – 使用您的处理函数对 DPF 进行初始化,然后在所有分区上同时执行该函数。
监控进度并等待完成 – 跟踪处理状态并等待所有分区完成执行。
检索每个分区的结果 – 成功完成后,收集并合并所有区域的处理结果。
可选:稍后恢复结果 – 无需重新运行整个过程即可访问先前完成的结果。
如果您有销售数据集,则可能会将区域数据作为数据框中的一列。您可能需要将处理逻辑应用于每个区域的数据。您可以使用分布式分区函数 (DPF) 并行处理不同地区的销售数据。
您可以有一个名为 sales_data
的数据框,它包含以下列:
region
:选择使用 时默认使用的角色和仓库。“北”、“南”、“东”、“西”customer_id
:唯一的客户标识符amount
:交易金额order_date
:交易日期
定义处理函数¶
以下代码定义了按地区处理销售数据的处理函数。它指定应用于每个数据分区的转换逻辑,并将结果保存到一个暂存区。
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus
# Define function to process each region's data
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
import json
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
process_sales_data
函数接收两个支持分布式处理的关键参数:
data_connector
:选择使用 时默认使用的角色和仓库。用于提供sales_data
DataFrame 的访问权限context
:选择使用 时默认使用的角色和仓库。用于将结果写入暂存区
对于每个区域,该函数创建以下列:
total_sales
avg_order_value
customer_count
record_count
然后,它将结果作为名为 sales_summary.json
的 JSON 文件写入暂存区。
利用函数来使用 DPF 处理数据¶
创建处理函数后,您可以使用以下代码使用 DPF 函数跨分区并行处理数据。
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region", # Creates separate partitions for North, South, East, West
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
您可以使用以下代码在多个节点上运行 DPF:
from snowflake.ml.runtime_cluster import scale_cluster
# Scale cluster before running tuner
scale_cluster(2) # Scale to 2 nodes for parallel trials
监控进度并等待完成¶
您可以使用以下代码来监控 DPF 运行进度。
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
以下是代码的输出:
Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS
检索每个分区的结果¶
如果运行成功完成,则可以检索每个分区的结果。以下代码按地区获取销售结果。
if final_status == RunStatus.SUCCESS:
# Get results from each region
import json
all_results = []
for partition_id, details in run.partition_details.items():
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
else:
# Handle failures - check logs for failed partitions
for partition_id, details in run.partition_details.items():
if details.status != "DONE":
error_logs = details.logs
可选:恢复已完成运行的结果¶
您可以从暂存区恢复已完成的运行并访问相同的结果,而无需重新运行该进程。以下代码演示了如何执行此操作:
# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
后续步骤¶
探索 跨数据分区训练模型,瞭解如何使用 DPF 作为底层基础设施训练多个 ML 模型
有关更高级的使用模式、错误处理策略和性能优化技术,请参阅 Snowflake ML Python 包中的完整 API 文档