创建并部署管道¶
概述¶
机器学习 (ML) 工作流程通常涉及几个关键阶段:
数据探索和准备:初始阶段包括了解原始数据、进行清理、处理缺失值以及将其转换为可用格式。
数据工程:在此阶段,原始数据被转换为可以更好地代表预测模型潜在问题的特征,通常涉及扩展、编码和在现有特征基础上创建新特征等技术。
模型开发:在此阶段,选择各种 ML 模型,根据准备好的数据进行训练,然后进行调整以优化其性能。使用适当的指标对已开发的模型进行严格评估,以评估其准确性、公平性和泛化能力。
模型部署:生产就绪模型保存到模型注册表中,随后进行部署,以便对新数据进行批量或实时预测。
ML 模型的初始开发通常受益于敏捷的迭代方法,这使数据科学家能够快速尝试不同的算法和功能。但是,随着模型成熟和展现价值,重点将转移到落地实施上,通过 CI/CD(持续集成/持续交付)来强化管道并实现自动化。这种自动化可确保持续构建、测试和部署对代码、数据管道或模型的更改,从而生成更可靠、更高效和可维护性更高的 ML 系统。
开发¶
首先在本地 IDE(例如 VS Code)或交互式笔记本(Snowflake Notebook 或 Jupyter)中进行交互式开发。对输入(表、暂存区、超参数)进行参数化,并保持步骤模块化以实现可移植性。例如,将数据准备、特征工程、模型训练等步骤分别放在不同的单元或函数中会更有帮助。
Snowflake 为机器学习生命周期的每个阶段提供以下工具:
暂存区 |
工具 |
用途 |
|---|---|---|
数据探索 |
Snowflake 笔记本 |
在基于浏览器的托管笔记本电脑环境中进行开发。使用 Python 和 SQL 集中分析数据集、直观呈现分布情况并快速迭代。 |
Snowpark DataFrames |
使用熟悉的 DataFrame APIs,将计算下推到 Snowflake。 |
|
数据工程 |
Snowpark DataFrames |
使用 SQL/Python/Scala 并结合下推优化,在数据仓库规模上构建可复现的转换流程。 |
UDFs/UDTFs |
将自定义 Python 逻辑封装为函数或表函数,以便在团队和管道中重用复杂的转换。 |
|
特征商店 |
定义、注册并提供具有时间点准确性、可跨模型复用的特征。支持一致的离线训练集和低延迟在线检索,减少数据泄漏与重复。 |
|
模型训练 |
Snowflake 笔记本 |
在 Snowflake 笔记本中使用熟悉的开源库训练 ML 模型,例如 scikit-learn、XGBoost 和 PyTorch。利用弹性扩展,避免数据移动,并在同一位置持久存储模型和进行预处理。 |
ML 作业 |
在任何环境(包括本地 IDEs、笔记本和外部托管的编排工具)中,将资源密集型步骤分流到专门的计算选项,例如高内存实例、GPU 加速和分布式处理。 |
|
模型部署 |
模型注册表 |
使用沿袭和治理控制来注册模型,并进行版本控制。集中发现并推进安全提升工作流程、审计和回滚。 |
批量推理 |
提供来自 Python 或 SQL 的注册模型,或者使推理接近受治理的数据,并通过注册表支持的一致执行来简化运营。 |
|
实时推理 |
通过自动缩放将注册模型部署到托管的 HTTPS 端点。取消了服务基础架构,提供与 Snowflake 身份验证和治理集成在一起的简单、安全、低延迟推理。 |
|
模型监控 |
为每个模型版本创建一个监控器,以物化推理日志并自动刷新每日指标,在 Snowsight 中显示漂移、性能和统计信号。配置警报和自定义仪表板,以比较版本并快速诊断数据或管道问题 |
|
工作流程编排 |
Scheduled Notebooks |
参数化并配置 Snowflake 笔记本,使其按计划以非交互方式执行。 |
任务图 |
将您的 ML 管道部署到有向无环图 (DAG),并将其配置为按计划运行或通过基于事件的触发器触发运行。 |
|
安全与治理 |
RBAC、标签、掩码、策略 |
将基于角色的访问权限、数据分类和掩码/行策略应用于训练数据、特征和模型。确保在整个 ML 生命周期中实现最低权限访问和合规性。 |
为投入使用做好准备¶
准备代码¶
在管道投入使用之前,请准备好生产代码。如果您是从笔记本入手,请先将代码重构为模块化、可复用的函数,将每个主要步骤(数据准备、特征工程、模型训练、评估)都设计为单独的函数,并明确其输入和输出。如果您已经有了模块化脚本,请确保每个函数都有明确定义的接口和职责。参数化所有配置值,例如表名和超参数,以实现跨环境部署。我们还建议编写一个入口点脚本,在本地执行端到端管道,用于调试和未来开发。
示例目录结构:
ml_pipeline_project/
├── README.md
├── requirements.txt
├── config/
├── src/ml_pipeline/
│ ├── utils/ # Common utilities
│ ├── data/ # Data preparation
│ ├── features/ # Feature engineering
│ ├── models/ # Model training
│ └── inference/ # Model inference
├── scripts/
│ ├── run_pipeline.py # Main entry point
│ └── dag.py
├── tests/
└── notebooks/
run_pipeline.py 脚本示例:
import argparse
from ml_pipeline.utils.config_loader import load_config
from ml_pipeline.data.ingestion import load_raw_data
from ml_pipeline.data.validation import validate_data_quality
from ml_pipeline.features.transformers import create_features
from ml_pipeline.models.training import train_model
from ml_pipeline.models.evaluation import evaluate_model
from ml_pipeline.models.registry import register_model
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", required=True, help="Config file path")
parser.add_argument("--env", default="dev", help="Environment (dev/prod)")
args = parser.parse_args()
# Load configuration
config = load_config(args.config, args.env)
# Execute pipeline stages
raw_data = load_raw_data(config.data.source_table)
validate_data_quality(raw_data, config.data.quality_checks)
features = create_features(raw_data, config.features.transformations)
model = train_model(features, config.model.hyperparameters)
metrics = evaluate_model(model, features, config.model.eval_metrics)
register_model(model, metrics, config.model.registry_name)
if __name__ == "__main__":
main()
从笔记本迁移到 ML 作业¶
使用 Snowflake 笔记本编写的大多数代码都可以在 ML 作业中运行,无需更改代码。需要注意的几个方面包括:
Runtime APIs
某些分布式 ML APIs 只能在 Container Runtime 内使用,尝试将它们导入 Container Runtime 之外的环境将失败。这些 APIs 可在 ML 作业中使用,但需要导入到 ML 作业负载中。
# Attempting to import distributed runtime APIs in local/external
# environments will fail!
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator
from snowflake.ml.jobs import remote
@remote(...)
def my_remote_function(...):
# Move imports *inside* your ML Job payloads
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator # This works!
...
job = my_remote_function() # Start ML Job
job.wait() # Wait for job to complete
集群扩展
scale_cluster() API 只能在笔记本中使用,无法在 ML 作业中使用。改为在提交作业时指定所需的集群大小。有关更多信息,请参阅 Snowflake 多节点 ML 作业。
from snowflake.ml.jobs import remote
@remote(..., target_instances=4)
def my_remote_function(...):
# 4-node cluster will be provisioned for distributed processing
# inside this job. The cluster will be automatically cleaned up on
# job termination.
管道编排¶
准备好端到端管道后,使用 Snowflake Task Graphs、Scheduled Notebooks 等编排工具或外部编排工具(如 Airflow)来将您的管道投入使用。使用编排框架有几个关键优势:
通过自动重试和故障隔离实现容错和可靠性
具有运行历史记录、实时状态和警报的可观察性
复杂依赖关系图和各种触发器的调度和协调
通过版本控制集成和配置管理实现的运营健全度
Snowflake ML 与大多数编排框架兼容,包括 Airflow、Dagster 和 Prefect。如果您已经有工作流程/DAG 设置,我们建议您只将现有工作流程与 Snowflake ML 功能集成,并将计算或数据密集型步骤分流到 ML 作业或 UDFs。如果您没有现有的 DAG 设置,则可以将 Snowflake Task Graphs 用于 Snowflake 原生解决方案。
要使用 DAG 在 Snowflake 上设置编排,请按照以下高级步骤操作:
根据 准备代码 准备本地管道代码
创建一个新
dag.py文件(或任何其他名称)来保存您的 DAG 定义根据本指南,将您的管道实现为 DAG 形式
运行
dag.py脚本,将任务图部署到您的 Snowflake 账户
小技巧
运行任务图脚本不一定会执行该图表;基本的任务图脚本只是定义和部署任务图。任务图必须单独触发才能执行,可以手动触发,也可以按计划执行。
将开发和生产分开¶
我们建议对 DAG 脚本进行参数化,以支持隔离开发 (DEV) 和生产 (PROD) 环境。您可以使用 Snowflake 连接管理、应用程序特定的配置或两者的任意组合来实现此目的。所需的隔离级别取决于您的治理要求,但通常我们建议为 DEV 和 PROD 使用单独的数据库,其中 PROD 数据库受 RBAC 策略保护,这些策略会限制管理员和专用服务账户。
CI/CD¶
您可以使用 CI/CD 管道(如 Azure Pipelines 和 GitHub Actions)自动验证和部署管道。通常,我们建议在部署到 PROD 之前,在 DEV 或 STAGING 环境中进行测试。最佳做法是使用合并门来配置源代码控制存储库,在合并到生产分支之前,在 DEV 中验证代码更改。对生产分支所做的更改可以持续部署到 PROD(即针对每一次更改),也可以定期部署(每天/每周)。最佳实践是在 DEV 或 STAGING 环境中对生产分支的状态进行最终验证,然后再将更改部署到 PROD 中。使用平台功能(GitHub Actions 的部署与环境)来定义和配置与各个部署环境的连接。配置您的 CI/CD 管道以将您的更改推送到部署环境中,包括:
(可选)以 Python 包的形式构建库和模块,并将它们推送到专有包源中
(可选)将文件上传到 Snowflake 暂存区
当您在管道中使用
snowflake.ml.jobs.submit_from_stage()时,这是最常需要的或者,您可以使用 Snowflake 的 GitHub 集成直接以 Snowflake 暂存区的形式跟踪 GitHub 存储库
运行
dag.py以在配置的环境中部署任务图(可选)触发和监控新部署的任务图的执行以验证有效性
其他资源¶
E2E Task Graph Quickstart (https://quickstarts.snowflake.com/guide/e2e-task-graph/)