Process data with custom logic across partitions¶
Use the Distributed Partition Function (DPF) to process data in parallel across one or more nodes in a compute pool. DPF handles distributed orchestration, error handling, observability, and artifact persistence automatically.
DPF partitions your Snowpark DataFrame by a specified column and executes your Python function on each partition in parallel, so you can focus on your processing logic while DPF manages the distributed infrastructure and scales as needed.
DPF is well suited for processing large datasets across distinct data segments that share the same processing logic, such as analyzing sales data by region or processing customer data by geographic segment.
DPF lets you write custom Python code that uses open source libraries and runs on containerized infrastructure with optional GPU access.
Important
Before you use DPF, make sure you have the following:
Container Runtime Environment: DPF requires a Snowflake ML container runtime environment.
Stage Access Permissions: DPF automatically stores results and artifacts in Snowflake stages. Ensure you have appropriate permissions to access the specified named stage.
The following section walks you through using DPF on a hypothetical sales dataset.
Process sales data by region¶
The following example demonstrates how to use DPF to process sales data by region in parallel:
Define the processing function - Create a Python function that specifies the transformation logic to apply to each data partition (region).
Use DPF to process data - Initialize DPF with your processing function and execute it across all partitions simultaneously.
Monitor progress and wait for completion - Track the processing status and wait for all partitions to finish executing.
Retrieve results from each partition - Collect and combine the processed results from all regions after successful completion.
Optional: Restore results later - Access previously completed results without re-running the entire process.
Suppose your sales dataset includes a region column and you want to apply the same processing logic to each region's data. You can use the Distributed Partition Function (DPF) to process the data for all regions in parallel.
You could have a DataFrame called sales_data that contains the following columns:
region: ‘North’, ‘South’, ‘East’, ‘West’
customer_id: unique customer identifiers
amount: transaction amounts
order_date: transaction dates
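If you need a concrete starting point, the following is a minimal sketch that builds a small sales_data DataFrame with Snowpark. The connection setup and sample rows are illustrative assumptions, not part of DPF; in practice you would typically read an existing table with session.table.
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

# Create a Snowpark session (assumes connection settings are available to SnowflakeLoginOptions).
session = Session.builder.configs(SnowflakeLoginOptions()).create()

# Hypothetical sample rows for illustration only.
sales_data = session.create_dataframe(
    [
        ("North", "C001", 250.00, "2024-01-15"),
        ("South", "C002", 125.50, "2024-01-16"),
        ("East", "C003", 310.25, "2024-01-17"),
        ("West", "C004", 89.99, "2024-01-18"),
    ],
    schema=["region", "customer_id", "amount", "order_date"],
)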
Define the processing function¶
The following code defines a processing function that processes the sales data by region. It specifies the transformation logic that’s applied to each data partition and saves the results to a stage.
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.distributed import DPF, RunStatus

# Define function to process each region's data
def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access
    import json
    context.upload_to_stage(summary, "sales_summary.json",
                            write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
The process_sales_data function receives two arguments that enable distributed processing:
data_connector: Provides access to the partition of the sales_data DataFrame assigned to the function
context: Provides partition metadata, such as partition_id, and methods for writing results to a stage
For each region, the function computes the following summary values:
total_sales
avg_order_value
customer_count
record_count
It then writes the results to the stage as a JSON file named sales_summary.json.
Use DPF to process data with the function¶
After you create the processing function, you can use the following code to process the data in parallel across partitions with DPF.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",          # Creates separate partitions for North, South, East, West
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
To run DPF across multiple nodes, scale the compute pool before starting the run:
from snowflake.ml.runtime_cluster import scale_cluster

# Scale the cluster before calling dpf.run() so partitions can execute on multiple nodes
scale_cluster(2)  # Scale to 2 nodes for parallel partition processing
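When the run completes, you can scale back down with the same helper. This is a sketch; a single node is just an example target size.
# After run.wait() reports completion, release the extra capacity (example target size).
scale_cluster(1)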
Monitor progress and wait for completion¶
You can use the following code to monitor the progress of the DPF run.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
The following is the code’s output:
Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS
Retrieve results from each partition¶
If the run completes successfully, you can retrieve the results from each partition. The following code gets the sales results by region.
if final_status == RunStatus.SUCCESS:
    # Get results from each region
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        summary = details.stage_artifacts_manager.get("sales_summary.json",
                                                       read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
else:
    # Handle failures - check logs for failed partitions
    for partition_id, details in run.partition_details.items():
        if details.status != "DONE":
            error_logs = details.logs
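As an optional follow-up, you could assemble the per-region summaries into a single pandas DataFrame for reporting. This sketch only uses the all_results list built above and is not part of the DPF API.
import pandas as pd

# Build a region-level report from the collected summaries.
report = pd.DataFrame(all_results).set_index('region')
print(report[['total_sales', 'avg_order_value', 'customer_count', 'record_count']])
print(f"Total sales across all regions: {total_sales}")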
Optional: Restore results from completed run¶
You can restore the completed run from the stage and access the same results without re-running the process. The following code demonstrates how to do this:
from snowflake.ml.distributed import DPFRun  # assumes DPFRun is importable from the same module as DPF

# Restore the completed run from the stage and access the same results as above without re-running.
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
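From the restored run you can then read the same per-partition artifacts, using the same accessors shown earlier. For example:
import json

# Read each region's summary from the restored run, just as with the original run object.
for partition_id, details in restored_run.partition_details.items():
    summary = details.stage_artifacts_manager.get("sales_summary.json",
                                                   read_function=lambda path: json.load(open(path, 'r')))
    print(f"{partition_id}: total_sales={summary['total_sales']}")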
Next steps¶
Explore Train models across data partitions to learn about training multiple ML models using DPF as the underlying infrastructure.
For more advanced usage patterns, error handling strategies, and performance optimization techniques, refer to the complete API documentation in the Snowflake ML Python package.