Process data with custom logic across partitions

Use the Distributed Partition Function (DPF) to process data in parallel across one or more nodes in a compute pool. It handles distributed orchestration, errors, observability, and artifact persistence automatically.

DPF partitions your Snowpark DataFrame by a specified column and executes your Python function on each partition in parallel. Focus on your processing logic while DPF handles infrastructure complexity and scales automatically.

You can use DPF to process large datasets efficiently across different data segments. It is ideal for scenarios such as analyzing sales data by region, processing customer data by geographic segment, or performing transformations where each data partition requires the same processing logic, and it eliminates the complexity of managing distributed computing infrastructure yourself.

DPF lets you write custom Python code using open source libraries on containerized infrastructure with GPU access.

Important

Before you use DPF, make sure you have the following:

  • Container Runtime Environment: DPF requires a Snowflake ML container runtime environment.

  • Stage Access Permissions: DPF automatically stores results and artifacts in Snowflake stages. Ensure you have appropriate permissions to access the specified named stage.

The following section walks you through using DPF on a hypothetical sales dataset.

Process sales data by region

The following example demonstrates how to use DPF to process sales data by region in parallel:

  1. Define the processing function - Create a Python function that specifies the transformation logic to apply to each data partition (region).

  2. Use DPF to process data - Initialize DPF with your processing function and execute it across all partitions simultaneously.

  3. Monitor progress and wait for completion - Track the processing status and wait for all partitions to finish executing.

  4. Retrieve results from each partition - Collect and combine the processed results from all regions after successful completion.

  5. Optional: Restore results later - Access previously completed results without re-running the entire process.

If you have a sales dataset with the region stored as a column in the DataFrame, you might want to apply the same processing logic to each region's data. You can use the Distributed Partition Function (DPF) to process the data for the different regions in parallel.

You could have a Snowpark DataFrame called sales_data that contains the following columns (a small example of constructing such a DataFrame follows the list):

  • region: 'North', 'South', 'East', 'West'

  • customer_id: unique customer identifiers

  • amount: transaction amounts

  • order_date: transaction dates
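
For illustration, the following is a minimal sketch of building a small sales_data DataFrame with this shape. It assumes you already have an active Snowpark session named session; in practice, sales_data typically comes from a table.

from datetime import date

# Hypothetical sample rows; real sales data would normally be read from a table
sales_data = session.create_dataframe(
    [
        ("North", 1001, 250.00, date(2024, 1, 15)),
        ("South", 1002, 480.50, date(2024, 1, 16)),
        ("East", 1003, 125.00, date(2024, 1, 17)),
        ("West", 1004, 310.75, date(2024, 1, 18)),
    ],
    schema=["region", "customer_id", "amount", "order_date"],
)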

Define the processing function

The following code defines the processing function for the sales data. It specifies the transformation logic applied to each data partition (region) and saves the results to a stage.

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.distributed import DPF, RunStatus

# Define function to process each region's data
def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    # Cast numpy scalars to plain Python types so the summary is JSON serializable
    summary = {
        'region': context.partition_id,
        'total_sales': float(df['amount'].sum()),
        'avg_order_value': float(df['amount'].mean()),
        'customer_count': int(df['customer_id'].nunique()),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access
    import json
    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))

The process_sales_data function receives two arguments that enable distributed processing:

  • data_connector: Provides access to the partition's slice of the sales_data DataFrame, for example as a pandas DataFrame through to_pandas()

  • context: Exposes partition metadata, such as partition_id, and utilities for writing results to a stage

For each region, the function computes the following summary metrics:

  • total_sales

  • avg_order_value

  • customer_count

  • record_count

It then writes the results to a stage as a JSON file named sales_summary.json.

Use DPF to process data with the function

After you define the processing function, use the following code to process the data in parallel across partitions with DPF. The DPF constructor takes the processing function and the name of the stage where results and artifacts are stored.

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",  # Creates separate partitions for North, South, East, West
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)

To process the partitions across multiple nodes, scale the compute cluster before you call dpf.run:

from snowflake.ml.runtime_cluster import scale_cluster

# Scale the cluster before starting the DPF run
scale_cluster(2)  # Scale to 2 nodes so partitions are processed in parallel across them
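
When the run finishes, you can scale the cluster back down so idle nodes aren't kept running. The following is a minimal sketch, assuming scale_cluster also accepts a smaller target size:

# Scale back to a single node after the DPF run completes
scale_cluster(1)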

Monitor progress and wait for completion

You can use the following code to monitor progress and block until the DPF run completes.

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")

The following is the code’s output:

Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS

Retrieve results from each partition

If the run completes successfully, you can retrieve the results from each partition. The following code gets the sales results by region.

if final_status == RunStatus.SUCCESS:
    # Get results from each region
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
else:
    # Handle failures - check logs for failed partitions
    for partition_id, details in run.partition_details.items():
        if details.status != "DONE":
            error_logs = details.logs
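
After collecting all_results, you can arrange the per-region summaries into a single pandas DataFrame for reporting. The following is a minimal sketch that assumes the run succeeded and all_results was populated as shown above:

import pandas as pd

# Combine the per-region summaries into one table
summary_df = pd.DataFrame(all_results)
print(summary_df[['region', 'total_sales', 'avg_order_value', 'customer_count']])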

Optional: Restore results from completed run

You can restore the completed run from the stage and access the same results without re-running the process. The following code demonstrates how to do this:

from snowflake.ml.distributed import DPFRun  # import path assumed to mirror the DPF import above

# Restore the completed run from the stage and access the same results as above without re-running
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
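
You can then read the saved artifacts from the restored run. The following sketch re-reads each region's summary, assuming the restored run exposes the same partition_details interface as the original run object:

import json

# Read each region's saved summary from the restored run
for partition_id, details in restored_run.partition_details.items():
    summary = details.stage_artifacts_manager.get("sales_summary.json",
        read_function=lambda path: json.load(open(path, 'r')))
    print(f"{partition_id}: total sales = {summary['total_sales']}")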

Next steps

  • Explore Train models across data partitions to learn about training multiple ML models using DPF as the underlying infrastructure

  • For more advanced usage patterns, error handling strategies, and performance optimization techniques, refer to the complete API documentation in the Snowflake ML Python package
