Scale applications with Ray¶
The Snowflake Container Runtime integrates Ray (https://docs.ray.io/), an open-source unified framework for scaling AI and Python applications. This integration lets you use Ray's distributed computing capabilities for machine learning workloads on Snowflake.
Ray comes preinstalled and runs as a background process in the Snowflake ML Container Runtime. You can access Ray in the Container Runtime for ML in the following ways:
Snowflake Notebooks: An interactive environment where you connect to Ray, define tasks, and dynamically scale the cluster for development and experimentation.
Snowflake ML Jobs: Submit Ray applications as structured, repeatable jobs. You can specify the cluster size in the job configuration for production workloads.
When you run the Container Runtime in a Snowflake Notebook or ML Job, the Ray process starts automatically as part of the container.
Use the following Python code to connect to the cluster:
import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Important
Always use the "auto" address when connecting to the Ray cluster. Initializing with "auto" points to the head node of the Ray cluster that Snowflake provisions for your session.
Scale the Ray cluster¶
After you connect to the Ray cluster, you can adjust its size to match your workload's compute requirements.
Use the following approaches to scale the Ray cluster:
In notebooks, use the scale_cluster function to scale the cluster up or down dynamically, which suits interactive workflows whose resource needs change over time.
For example, specifying expected_cluster_size=5 gives you a cluster with 1 head node and 4 worker nodes.
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")
# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
In ML Jobs, declare the cluster size in the job definition. Specifying the cluster size there ensures that the required nodes are provisioned when the job starts.
A job decorator might look like the following:
from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=5  # Specify the number of nodes
)
def distributed_ray():
    import ray
    ray.init(address="auto", ignore_reinit_error=True)
    print(f"Ray cluster resources: {ray.cluster_resources()}")

job = distributed_ray()
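After submission, you can track execution through the returned job handle. A brief sketch, assuming the handle exposes id, status, and get_logs() as in the Snowflake ML Jobs API:

# Inspect the submitted job (assumes the MLJob handle exposes these members)
print(f"Job ID: {job.id}")
print(f"Job status: {job.status}")  # e.g. PENDING, RUNNING, DONE, FAILED
print(job.get_logs())               # Retrieve execution logs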
When you are finished, scale down the cluster to conserve resources. For more information, see Cleanup.
Monitor with the Ray Dashboard¶
When you run workloads in a Snowflake Notebook, you can monitor the cluster with the Ray Dashboard, a web interface that shows cluster resources, jobs, tasks, and performance. Use the following code to get the dashboard URL:
from snowflake.ml.runtime_cluster import get_ray_dashboard_url
# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Open the URL in a new browser tab and sign in with your Snowflake credentials.
Advanced use cases¶
This section covers advanced Ray capabilities for complex workloads and for migrating existing applications.
Create and operate distributed workloads with Ray¶
Ray provides components for creating and operating distributed workloads, including the foundational Ray Core primitives for building and scaling workloads.
It also includes the following libraries for building workflows for data preprocessing, ML training, hyperparameter tuning, and model inference:
Ray Data: Scalable data processing and transformation
Ray Train: Distributed training and fine-tuning of ML models
Ray Tune: Hyperparameter optimization with advanced search algorithms
Ray Serve: Model serving and inference
The following sections show how to use these libraries directly. Snowflake's native interfaces built on Ray also provide additional tooling for building, deploying, and operating Ray applications.
Ray Core: Tasks and Actors¶
Ray provides the following distributed computing primitives:
Tasks: Stateless functions that execute remotely and return results
Actors: Stateful classes that are instantiated remotely and called multiple times
Objects: Immutable values stored in Ray's distributed object store
Resources: CPU, GPU, and custom resource requirements for tasks and actors
The following example shows how to use a basic Ray Task and Actor for linear regression:
import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Ray Train: Distributed training¶
Ray Train is a library for distributed training and fine-tuning of models. You can run training code on a single machine or across an entire cluster. When you use Ray in the Snowflake environment, Ray Train supports single-node execution only; multi-node execution is not supported.
For multi-node distributed training, use the Container Runtime's optimized training functions. These functions provide integrated distributed training for XGBoost, LightGBM, and PyTorch with automatic storage handling, and they use the same Ray cluster internally.
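For single-node use, a standard Ray Train loop runs unchanged. A minimal sketch, assuming PyTorch is available in the runtime image (the model and data here are illustrative only):

import torch
import torch.nn as nn
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker(config):
    # Tiny regression model trained on random data for illustration
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])
    loss_fn = nn.MSELoss()
    X = torch.randn(256, 10)
    y = torch.randn(256, 1)
    for epoch in range(5):
        optimizer.zero_grad()
        loss = loss_fn(model(X), y)
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch}: loss={loss.item():.4f}")

# num_workers=1 keeps execution on a single node, which is what the
# Snowflake environment supports for Ray Train
trainer = TorchTrainer(
    train_loop_per_worker,
    train_loop_config={"lr": 0.01},
    scaling_config=ScalingConfig(num_workers=1),
)
result = trainer.fit()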
Ray Data: Scalable data processing¶
Ray Data provides scalable, distributed data processing for ML workloads. It handles datasets larger than cluster memory through streaming execution and lazy computation.
Note
Snowflake provides a native integration for converting any Snowflake data source into Ray Data. For more information, see the Data Connector and Ray Data Ingestion pages.
Use Ray Data for the following scenarios:
Processing large datasets that don't fit in single-node memory
Distributed data preprocessing and feature engineering
Building data pipelines that integrate with other Ray libraries
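For the Snowflake-native ingestion path mentioned in the note above, a hypothetical sketch follows; it assumes DataConnector exposes a to_ray_dataset() method, so verify the exact API on the Data Connector and Ray Data Ingestion pages:

from snowflake.ml.data import DataConnector

# Hypothetical table name; session is an active Snowpark session
snowpark_df = session.table("MY_TRAINING_TABLE")

# Assumption: DataConnector can wrap a Snowpark DataFrame and convert it
# to a Ray Dataset for use with the Ray Data APIs shown below
data_connector = DataConnector.from_dataframe(snowpark_df)
ray_ds = data_connector.to_ray_dataset()
print(ray_ds.schema())

The rest of this section uses synthetic pandas data so the example is self-contained: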
import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],   # interaction
    np.sin(base_features[:, 2]),                 # non-linear
    base_features[:, 3] ** 2,                    # polynomial
    np.random.randn(n_samples, n_features - 8)   # additional random features
])
X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)
sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y
print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]
    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()
    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']
    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break
print(f"Total batches processed: {batch_count}")
Ray Tune: Distributed hyperparameter tuning¶
Ray Tune provides distributed hyperparameter optimization with advanced search algorithms and early stopping. When reading from Snowflake data sources, use the native hyperparameter optimization (HPO) API for a more integrated, optimized experience. For more information about optimizing with HPO, see Optimize a model's hyperparameters.
If you need a more customizable distributed HPO implementation, use Ray Tune.
Use Ray Tune for the following scenarios:
Hyperparameter optimization with many parallel trials
Advanced search algorithms (Bayesian optimization, population-based training)
Large-scale hyperparameter searches that require distributed execution
import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10
X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"  {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
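Each trial above reports a single final metric. To use the early stopping mentioned at the start of this section, attach a scheduler such as ASHA. The following sketch reuses train_function and search_space from the example; note that early stopping only takes effect when trials report intermediate metrics:

from ray import tune
from ray.tune.schedulers import ASHAScheduler

# ASHA terminates unpromising trials early based on reported metrics
tuner = tune.Tuner(
    train_function,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,
        scheduler=ASHAScheduler(max_t=10, grace_period=1),
    ),
)
results = tuner.fit()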
Model serving¶
For model deployment, use Snowflake's native capabilities. For more information, see Model Serving in Snowpark Container Services.
Submit and manage distributed applications on the Ray cluster¶
Use Ray Jobs to submit and manage distributed applications on the Ray cluster with better resource isolation and lifecycle management. For all task-based execution that needs access to the Ray cluster, Snowflake recommends ML Jobs, in which you define your Ray application logic. Where you need direct access to the Ray Job interface, such as when migrating an existing implementation, you can use the Ray Job primitives as described in the Ray documentation (https://docs.ray.io/en/latest/cluster/running-applications/job-submission/sdk.html).
Use Ray Jobs for the following scenarios:
ML pipelines and scheduled tasks in production
Long-running workloads that require fault tolerance
Batch processing and large-scale data processing
import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"
client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)
print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Scale the Ray cluster with options¶
In Snowflake Notebooks, you can size the Ray cluster precisely to your compute needs. A cluster consists of one head node (the coordinator) and multiple worker nodes (which execute tasks).
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,      # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2   # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Resource monitoring¶
import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu, get_available_gpu, get_num_cpus_per_node
)
# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()
print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")
# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")
# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Cleanup¶
After you finish using the cluster, scale it down to avoid incurring additional costs. Use the following code to scale down the cluster:
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")