使用自定义逻辑跨分区处理数据

使用分布式分区函数 (DPF) 在计算池中的一个或多个节点上并行处理数据。该工具可以自动处理分布式编排、错误、可观察性及成果持久化。

DPF 按指定列对 Snowpark DataFrame 进行分区,并在每个分区上并行执行 Python 函数。您只需专注于处理逻辑,DPF 将自动处理基础设施复杂性并实现自动扩展。

您可以利用 DPF 在不同数据分段间高效处理大型数据集。该工具非常适合以下场景:按区域分析销售数据、按地理分区处理客户数据,或对需要相同处理逻辑的每个数据分区执行数据转换。DPF 可自动处理分布式数据处理,消除了管理分布式计算基础设施的复杂性。

DPF 允许您在具有 GPU 访问权限的容器化基础设施上使用开源库编写自定义 Python 代码。

重要

使用 DPF 之前,请确保满足以下条件:

  • 容器运行时环境:DPF 需要 Snowflake ML 容器运行时环境。

  • 暂存区访问权限:DPF 会自动将结果和工件存储在 Snowflake 暂存区中。请确保您拥有访问指定命名暂存区的相应权限。

以下章节将引导您在假设的销售数据集上使用 DPF。

按地区处理销售数据

以下示例演示如何使用 DPF 并行处理按地区划分的销售数据:

  1. 定义处理函数 – 创建一个 Python 函数,指定应用于每个数据分区(区域)的转换逻辑。

  2. 使用 DPF 处理数据 – 使用您的处理函数对 DPF 进行初始化,然后在所有分区上同时执行该函数。

  3. 监控进度并等待完成 – 跟踪处理状态并等待所有分区完成执行。

  4. 检索每个分区的结果 – 成功完成后,收集并合并所有区域的处理结果。

  5. 可选:稍后恢复结果 – 无需重新运行整个过程即可访问先前完成的结果。

如果您有销售数据集,则可能会将区域数据作为数据框中的一列。您可能需要将处理逻辑应用于每个区域的数据。您可以使用分布式分区函数 (DPF) 并行处理不同地区的销售数据。

您可以有一个名为 sales_data 的数据框,它包含以下列:

  • region:选择使用 时默认使用的角色和仓库。“北”、“南”、“东”、“西”

  • customer_id:唯一的客户标识符

  • amount:交易金额

  • order_date:交易日期

定义处理函数

以下代码定义了按地区处理销售数据的处理函数。它指定应用于每个数据分区的转换逻辑,并将结果保存到一个暂存区。

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus

# Define function to process each region's data
Copy
def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access
    import json
    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

process_sales_data 函数接收两个支持分布式处理的关键参数:

  • data_connector:选择使用 时默认使用的角色和仓库。用于提供 sales_data DataFrame 的访问权限

  • context:选择使用 时默认使用的角色和仓库。用于将结果写入暂存区

对于每个区域,该函数创建以下列:

  • total_sales

  • avg_order_value

  • customer_count

  • record_count

然后,它将结果作为名为 sales_summary.json 的 JSON 文件写入暂存区。

利用函数来使用 DPF 处理数据

创建处理函数后,您可以使用以下代码使用 DPF 函数跨分区并行处理数据。

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",  # Creates separate partitions for North, South, East, West
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

您可以使用以下代码在多个节点上运行 DPF:

from snowflake.ml.runtime_cluster import scale_cluster

# Scale cluster before running tuner
scale_cluster(2)  # Scale to 2 nodes for parallel trials
Copy

监控进度并等待完成

您可以使用以下代码来监控 DPF 运行进度。

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

以下是代码的输出:

Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS

检索每个分区的结果

如果运行成功完成,则可以检索每个分区的结果。以下代码按地区获取销售结果。

if final_status == RunStatus.SUCCESS:
    # Get results from each region
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
else:
    # Handle failures - check logs for failed partitions
    for partition_id, details in run.partition_details.items():
        if details.status != "DONE":
            error_logs = details.logs
Copy

可选:恢复已完成运行的结果

您可以从暂存区恢复已完成的运行并访问相同的结果,而无需重新运行该进程。以下代码演示了如何执行此操作:

# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
    DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
Copy

后续步骤

  • 探索 跨数据分区训练模型,瞭解如何使用 DPF 作为底层基础设施训练多个 ML 模型

  • 有关更高级的使用模式、错误处理策略和性能优化技术,请参阅 Snowflake ML Python 包中的完整 API 文档

语言: 中文