创建并部署管道

概述

机器学习 (ML) 工作流程通常涉及几个关键阶段:

数据探索和准备:初始阶段包括了解原始数据、进行清理、处理缺失值以及将其转换为可用格式。

数据工程:在此阶段,原始数据被转换为可以更好地代表预测模型潜在问题的特征,通常涉及扩展、编码和在现有特征基础上创建新特征等技术。

模型开发:在此阶段,选择各种 ML 模型,根据准备好的数据进行训练,然后进行调整以优化其性能。使用适当的指标对已开发的模型进行严格评估,以评估其准确性、公平性和泛化能力。

模型部署:生产就绪模型保存到模型注册表中,随后进行部署,以便对新数据进行批量或实时预测。

ML 模型的初始开发通常受益于敏捷的迭代方法,这使数据科学家能够快速尝试不同的算法和功能。但是,随着模型成熟和展现价值,重点将转移到落地实施上,通过 CI/CD(持续集成/持续交付)来强化管道并实现自动化。这种自动化可确保持续构建、测试和部署对代码、数据管道或模型的更改,从而生成更可靠、更高效和可维护性更高的 ML 系统。

开发

首先在本地 IDE(例如 VS Code)或交互式笔记本(Snowflake Notebook 或 Jupyter)中进行交互式开发。对输入(表、暂存区、超参数)进行参数化,并保持步骤模块化以实现可移植性。例如,将数据准备、特征工程、模型训练等步骤分别放在不同的单元或函数中会更有帮助。

Snowflake 为机器学习生命周期的每个阶段提供以下工具:

暂存区

工具

用途

数据探索

Snowflake 笔记本

在基于浏览器的托管笔记本电脑环境中进行开发。使用 Python 和 SQL 集中分析数据集、直观呈现分布情况并快速迭代。

Snowpark DataFrames

使用熟悉的 DataFrame APIs,将计算下推到 Snowflake。

数据工程

Snowpark DataFrames

使用 SQL/Python/Scala 并结合下推优化,在数据仓库规模上构建可复现的转换流程。

UDFs/UDTFs

将自定义 Python 逻辑封装为函数或表函数,以便在团队和管道中重用复杂的转换。

特征商店

定义、注册并提供具有时间点准确性、可跨模型复用的特征。支持一致的离线训练集和低延迟在线检索,减少数据泄漏与重复。

模型训练

Snowflake 笔记本

在 Snowflake 笔记本中使用熟悉的开源库训练 ML 模型,例如 scikit-learn、XGBoost 和 PyTorch。利用弹性扩展,避免数据移动,并在同一位置持久存储模型和进行预处理。

ML 作业

在任何环境(包括本地 IDEs、笔记本和外部托管的编排工具)中,将资源密集型步骤分流到专门的计算选项,例如高内存实例、GPU 加速和分布式处理。

模型部署

模型注册表

使用沿袭和治理控制来注册模型,并进行版本控制。集中发现并推进安全提升工作流程、审计和回滚。

批量推理

提供来自 Python 或 SQL 的注册模型,或者使推理接近受治理的数据,并通过注册表支持的一致执行来简化运营。

实时推理

通过自动缩放将注册模型部署到托管的 HTTPS 端点。取消了服务基础架构,提供与 Snowflake 身份验证和治理集成在一起的简单、安全、低延迟推理。

模型监控

为每个模型版本创建一个监控器,以物化推理日志并自动刷新每日指标,在 Snowsight 中显示漂移、性能和统计信号。配置警报和自定义仪表板,以比较版本并快速诊断数据或管道问题

工作流程编排

Scheduled Notebooks

参数化并配置 Snowflake 笔记本,使其按计划以非交互方式执行。

任务图

将您的 ML 管道部署到有向无环图 (DAG),并将其配置为按计划运行或通过基于事件的触发器触发运行。

安全与治理

RBAC、标签、掩码、策略

将基于角色的访问权限、数据分类和掩码/行策略应用于训练数据、特征和模型。确保在整个 ML 生命周期中实现最低权限访问和合规性。

为投入使用做好准备

准备代码

在管道投入使用之前,请准备好生产代码。如果您是从笔记本入手,请先将代码重构为模块化、可复用的函数,将每个主要步骤(数据准备、特征工程、模型训练、评估)都设计为单独的函数,并明确其输入和输出。如果您已经有了模块化脚本,请确保每个函数都有明确定义的接口和职责。参数化所有配置值,例如表名和超参数,以实现跨环境部署。我们还建议编写一个入口点脚本,在本地执行端到端管道,用于调试和未来开发。

示例目录结构:

ml_pipeline_project/
├── README.md
├── requirements.txt
├── config/
├── src/ml_pipeline/
│   ├── utils/                     # Common utilities
│   ├── data/                      # Data preparation
│   ├── features/                  # Feature engineering
│   ├── models/                    # Model training
│   └── inference/                 # Model inference
├── scripts/
│   ├── run_pipeline.py            # Main entry point
│   └── dag.py
├── tests/
└── notebooks/
Copy

run_pipeline.py 脚本示例:

import argparse
from ml_pipeline.utils.config_loader import load_config
from ml_pipeline.data.ingestion import load_raw_data
from ml_pipeline.data.validation import validate_data_quality
from ml_pipeline.features.transformers import create_features
from ml_pipeline.models.training import train_model
from ml_pipeline.models.evaluation import evaluate_model
from ml_pipeline.models.registry import register_model

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=True, help="Config file path")
    parser.add_argument("--env", default="dev", help="Environment (dev/prod)")
    args = parser.parse_args()

    # Load configuration
    config = load_config(args.config, args.env)

    # Execute pipeline stages
    raw_data = load_raw_data(config.data.source_table)
    validate_data_quality(raw_data, config.data.quality_checks)
    features = create_features(raw_data, config.features.transformations)
    model = train_model(features, config.model.hyperparameters)
    metrics = evaluate_model(model, features, config.model.eval_metrics)
    register_model(model, metrics, config.model.registry_name)

if __name__ == "__main__":
    main()
Copy

从笔记本迁移到 ML 作业

使用 Snowflake 笔记本编写的大多数代码都可以在 ML 作业中运行,无需更改代码。需要注意的几个方面包括:

Runtime APIs

某些分布式 ML APIs 只能在 Container Runtime 内使用,尝试将它们导入 Container Runtime 之外的环境将失败。这些 APIs 可在 ML 作业中使用,但需要导入到 ML 作业负载中。

# Attempting to import distributed runtime APIs in local/external
# environments will fail!
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator

from snowflake.ml.jobs import remote

@remote(...)
def my_remote_function(...):
  # Move imports *inside* your ML Job payloads
  from snowflake.ml.modeling.distributors.xgboost import XGBEstimator  # This works!
  ...

job = my_remote_function()  # Start ML Job
job.wait()  # Wait for job to complete
Copy

集群扩展

scale_cluster() API 只能在笔记本中使用,无法在 ML 作业中使用。改为在提交作业时指定所需的集群大小。有关更多信息,请参阅 Snowflake 多节点 ML 作业

from snowflake.ml.jobs import remote

@remote(..., target_instances=4)
def my_remote_function(...):
  # 4-node cluster will be provisioned for distributed processing
  # inside this job. The cluster will be automatically cleaned up on
  # job termination.
Copy

管道编排

准备好端到端管道后,使用 Snowflake Task Graphs、Scheduled Notebooks 等编排工具或外部编排工具(如 Airflow)来将您的管道投入使用。使用编排框架有几个关键优势:

  • 通过自动重试和故障隔离实现容错和可靠性

  • 具有运行历史记录、实时状态和警报的可观察性

  • 复杂依赖关系图和各种触发器的调度和协调

  • 通过版本控制集成和配置管理实现的运营健全度

Snowflake ML 与大多数编排框架兼容,包括 Airflow、Dagster 和 Prefect。如果您已经有工作流程/DAG 设置,我们建议您只将现有工作流程与 Snowflake ML 功能集成,并将计算或数据密集型步骤分流到 ML 作业或 UDFs。如果您没有现有的 DAG 设置,则可以将 Snowflake Task Graphs 用于 Snowflake 原生解决方案。

要使用 DAG 在 Snowflake 上设置编排,请按照以下高级步骤操作:

  1. 根据 准备代码 准备本地管道代码

  2. 创建一个新 dag.py 文件(或任何其他名称)来保存您的 DAG 定义

  3. 根据本指南,将您的管道实现为 DAG 形式

  4. 运行 dag.py 脚本,将任务图部署到您的 Snowflake 账户

小技巧

运行任务图脚本不一定会执行该图表;基本的任务图脚本只是定义和部署任务图。任务图必须单独触发才能执行,可以手动触发,也可以按计划执行。

将开发和生产分开

我们建议对 DAG 脚本进行参数化,以支持隔离开发 (DEV) 和生产 (PROD) 环境。您可以使用 Snowflake 连接管理、应用程序特定的配置或两者的任意组合来实现此目的。所需的隔离级别取决于您的治理要求,但通常我们建议为 DEV 和 PROD 使用单独的数据库,其中 PROD 数据库受 RBAC 策略保护,这些策略会限制管理员和专用服务账户。

CI/CD

您可以使用 CI/CD 管道(如 Azure Pipelines 和 GitHub Actions)自动验证和部署管道。通常,我们建议在部署到 PROD 之前,在 DEV 或 STAGING 环境中进行测试。最佳做法是使用合并门来配置源代码控制存储库,在合并到生产分支之前,在 DEV 中验证代码更改。对生产分支所做的更改可以持续部署到 PROD(即针对每一次更改),也可以定期部署(每天/每周)。最佳实践是在 DEV 或 STAGING 环境中对生产分支的状态进行最终验证,然后再将更改部署到 PROD 中。使用平台功能(GitHub Actions 的部署与环境)来定义和配置与各个部署环境的连接。配置您的 CI/CD 管道以将您的更改推送到部署环境中,包括:

  • (可选)以 Python 包的形式构建库和模块,并将它们推送到专有包源中

  • (可选)将文件上传到 Snowflake 暂存区

    当您在管道中使用 snowflake.ml.jobs.submit_from_stage() 时,这是最常需要的

    或者,您可以使用 Snowflake 的 GitHub 集成直接以 Snowflake 暂存区的形式跟踪 GitHub 存储库

  • 运行 dag.py 以在配置的环境中部署任务图

  • (可选)触发和监控新部署的任务图的执行以验证有效性

其他资源

  • E2E Task Graph Quickstart (https://quickstarts.snowflake.com/guide/e2e-task-graph/)

语言: 中文