Scale applications with Ray

The Snowflake Container Runtime integrates Ray (https://docs.ray.io/), an open-source unified framework for scaling AI and Python applications. This integration lets you use Ray's distributed computing capabilities for machine learning workloads on Snowflake.

Ray comes preinstalled and runs as a background process in the Snowflake ML Container Runtime. You can access Ray in the Container Runtime for ML in the following ways:

Snowflake Notebooks: An interactive environment where you can connect to Ray, define tasks, and scale the cluster on the fly for development and experimentation.

Snowflake ML Jobs: Submit Ray applications as structured, repeatable jobs. You can specify the cluster size in the job configuration to meet production workload requirements.

When you run the Container Runtime in a Snowflake Notebook or an ML Job, the Ray processes start automatically as part of the container.

Use the following Python code to connect to the cluster:

import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")

Important

Always use the "auto" address when connecting to the Ray cluster. Initializing with the "auto" address points to the head node of the Ray cluster that Snowflake provisions for your session.

Scaling the Ray cluster

After you connect to the Ray cluster, you can adjust its size to match your workload's compute requirements.

Scale the Ray cluster as follows:

In notebooks, use the scale_cluster function to scale the cluster up or down dynamically. This suits interactive workflows where resource needs can change.

When you specify expected_cluster_size=5, the cluster contains 1 head node and 4 worker nodes.

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")

# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")

When you're done, scale the cluster back down to conserve resources. For more information, see Cleanup.

Monitoring with the Ray dashboard

When you run jobs in a Snowflake notebook, you can monitor the cluster with the Ray dashboard, a web interface for viewing cluster resources, jobs, tasks, and performance. Use the following code to get the dashboard URL:

from snowflake.ml.runtime_cluster import get_ray_dashboard_url

# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")

Open the URL in a new browser tab and sign in with your Snowflake credentials.

Advanced use cases

This section covers advanced Ray features for complex workloads and for migrating existing applications.

Creating and operating distributed workloads with Ray

Ray provides components for creating and operating distributed workloads, including the Ray Core primitives for building and scaling workloads.

It also includes the following libraries, which you can use to build workflows for data preprocessing, ML training, hyperparameter tuning, and model inference:

  • Ray Data: Scalable data processing and transformation

  • Ray Train: Distributed training and fine-tuning of ML models

  • Ray Tune: Hyperparameter optimization with advanced search algorithms

  • Ray Serve: Model serving and inference

The following sections show how to use these libraries directly. Snowflake also provides native interfaces built on Ray with additional tooling for building, deploying, and operating Ray applications.

Ray Core: Tasks and Actors

Ray provides the following distributed computing primitives:

  • Tasks: Stateless functions that execute remotely and return results

  • Actors: Stateful classes that can be instantiated remotely and called repeatedly

  • Objects: Immutable values stored in Ray's distributed object store

  • Resources: CPU, GPU, and custom resource requirements for tasks and actors

The following example shows how to use a basic Ray Task and Actor for linear regression:

import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")

Ray Train: Distributed training

Ray Train is a library for distributed training and fine-tuning of models. You can run training code on a single machine or across an entire cluster. When you use Ray in the Snowflake environment, Ray Train supports single-node execution only; multi-node execution is not supported.

For multi-node distributed training, use the Container Runtime's optimized training functions instead. These functions provide integrated distributed training for XGBoost, LightGBM, and PyTorch along with automatic storage handling, and they use the same Ray cluster internally.

Ray Data: Scalable data processing

Ray Data provides scalable, distributed data processing for ML workloads. Streaming execution and lazy evaluation let it process datasets larger than cluster memory.

Note

Snowflake provides a native integration for converting any Snowflake data source into Ray Data. For more information, see the Data Connector and Ray Data Ingestion pages.

Use Ray Data for the following scenarios:

  • Processing large datasets that don't fit in single-node memory

  • Distributed data preprocessing and feature engineering

  • Building data pipelines that integrate with other Ray libraries

import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],  # interaction
    np.sin(base_features[:, 2]),  # non-linear
    base_features[:, 3] ** 2,  # polynomial
    np.random.randn(n_samples, n_features - 8)  # additional random features
])

X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)

sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y

print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]

    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()

    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']

    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break

print(f"Total batches processed: {batch_count}")

Ray Tune: Distributed hyperparameter tuning

Ray Tune provides distributed hyperparameter optimization with support for advanced search algorithms and early stopping. When reading from Snowflake data sources, the native hyperparameter optimization (HPO) API offers a more integrated, optimized experience. For more information about the HPO API, see Optimize a model's hyperparameters.

If you need a more customizable distributed HPO implementation, use Ray Tune.

Use Ray Tune for the following scenarios:

  • Hyperparameter optimization with many parallel trials

  • Advanced search algorithms (Bayesian optimization, population-based training)

  • Large-scale hyperparameter searches that require distributed execution

import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10

X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )

    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"  {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")

Model serving

For model deployment, use Snowflake's native capabilities. For more information, see Model Serving in Snowpark Container Services.

Submitting and managing distributed applications on the Ray cluster

Use Ray Jobs to submit and manage distributed applications on the Ray cluster, which gives you better resource isolation and lifecycle management. For all task-based execution that needs access to the Ray cluster, Snowflake recommends ML Jobs, where you define your Ray application logic. When you need direct access to the Ray Job interface (for example, when migrating an existing implementation), you can use the Ray Job primitives as described in the Ray documentation (https://docs.ray.io/en/latest/cluster/running-applications/job-submission/sdk.html).

Use Ray Jobs for scenarios such as:

  • ML pipelines and scheduled tasks in production

  • Long-running workloads that need fault tolerance

  • Batch processing and large-scale data processing

import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"

client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)

print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")

Scaling the Ray cluster with options

In Snowflake notebooks, you can size the Ray cluster precisely to your compute needs. A cluster consists of one head node (the coordinator) and multiple worker nodes (which execute tasks).

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,  # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2  # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)

Resource monitoring

Use the following code to inspect the resources available in the cluster and how heavily they are being used:

import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu, get_available_gpu, get_num_cpus_per_node
)

# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()

print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")

# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")

# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")

Cleanup

When you've finished with the cluster, scale it down to avoid incurring additional costs. Use the following code to scale the cluster down:

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")