Snowflake ML 作业¶
使用 Snowflake ML 作业在Snowflake ML 容器运行时内运行机器学习 (ML) 工作流程。您可以在任何开发环境中运行它们。您无需在 Snowflake 工作表或笔记本中运行代码。使用作业,利用 Snowflake 的基础架构在开发工作流程中运行资源密集型任务。有关在本地设置 Snowflake ML 的信息,请参阅 在本地使用 Snowflake ML。
重要
Snowflake ML 作业在 Snowpark ML Python 包 (snowflake-ml-python
) 版本 1.8.2 及更高版本中以公开预览版的形式提供。
Snowflake ML 作业使您能够执行以下操作:
在 Snowflake 计算池上运行 ML 工作负载,包括 GPU 和高内存 CPU 实例。
使用您的首选开发环境,例如 VS Code 或 Jupyter 笔记本。
在运行时环境中安装和使用自定义 Python 包。
使用 Snowflake 的分布式 APIs 来优化数据加载、训练和超参数调整。
与 Apache Airflow 等编排工具集成。
通过 Snowflake APIs 监控和管理作业。
您可以使用这些功能来执行以下操作:
对需要 GPU 加速或大量计算资源的大型数据集执行资源密集型训练。
通过将 ML 代码从开发转移到生产环境并通过管道进行编程执行,生产 ML 工作流程。
保留现有开发环境,同时利用 Snowflake 的计算资源。
只需最少的代码更改即可提升和转移 OSS ML 工作流程。
直接使用大型 Snowflake 数据集以减少数据移动并避免昂贵的数据传输。
先决条件¶
重要
Snowflake ML 作业目前仅支持 Python 3.10 客户端。如果您需要其他 Python 版本的支持,请联系您的 Snowflake 账户团队。
在您的 Python 3.10 环境中安装 Snowflake ML Python 包。
pip install snowflake-ml-python>=1.8.2
默认计算池大小使用 CPU_X64_S 实例系列。最小节点数为 1,最大节点数为 25。您可以使用以下 SQL 命令创建自定义计算池:
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Snowflake ML 作业需要 Snowpark 会话。使用以下代码进行创建:
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
有关创建会话的更多信息,请参阅 创建会话。
运行 Snowflake ML 作业¶
您可以通过以下其中一种方式运行 Snowflake ML 作业:
在代码中使用远程装饰器。
使用 Python API 提交整个文件或目录。
将 Python 函数作为 Snowflake ML 作业运行¶
使用 Function Dispatch 在 Snowflake 的计算资源上使用 @remote
装饰器远程运行单个 Python 函数。
使用 @remote
装饰器,您可以执行以下操作:
序列化函数及其依赖项。
将其上传到指定 Snowflake 暂存区。
在 ML 容器运行时中执行。
以下示例 Python 代码使用 @remote
装饰器提交 Snowflake ML 作业。请注意,Snowpark Session
必填,请参阅 先决条件。
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
调用 @remote
装饰函数会返回一个 Snowflake MLJob
对象,该对象可用于管理和监控作业执行。有关更多信息,请参阅 Snowflake ML 作业管理。
将 Python 文件作为 Snowflake ML 作业运行¶
在 Snowflake 计算资源上运行 Python 文件或项目目录。这在以下情况下很有用:
您的 ML 项目很复杂,有多个模块和依赖项。
您希望保持本地开发与生产代码之间的分离。
您需要运行使用命令行实参的脚本。
您正在处理不是专门为在 Snowflake 计算上执行而设计的现有 ML 项目。
Snowflake 作业 API 提供两种主要方法:
submit_file()
:用于运行单个 Python 文件submit_directory()
:用于运行包含多个文件和资源的整个项目目录
两种方法都支持:
命令行实参传递
环境变量配置
自定义依赖项规范
通过 Snowflake 暂存区进行项目资产管理
File Dispatch 对于生产现有 ML 工作流程以及保持开发和执行环境之间的明确分离特别有用。
以下 Python 代码向 Snowflake ML 作业提交一个文件:
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
以下 Python 代码向 Snowflake ML 作业提交一个目录:
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
提交文件或目录会返回一个 Snowflake MLJob
对象,该对象可用于管理和监控任务执行。有关更多信息,请参阅 Snowflake ML 作业管理。
Snowflake ML 作业管理¶
当您提交 Snowflake ML 作业时,API 会创建一个 MLJob
对象。您可以用它来执行以下操作:
通过状态更新跟踪作业进度
使用详细的执行日志调试问题
检索执行结果(如有)
您可以使用 get_job()
API 来按 ID 检索 MLJob
对象。以下 Python 代码显示了如何检索 MLJob
对象:
from snowflake.ml.jobs import MLJob, get_job, list_jobs
# List all jobs
jobs = list_jobs()
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
调用作业的 result
方法以等待执行完成并检索执行结果。如果执行失败,result
会引发异常。
result = job.result()
管理依赖项¶
Snowflake ML 作业 API 在 ML 的容器运行时 环境中运行有效负载。该环境具有用于机器学习和数据科学的最常用 Python 包。大多数用例应该“开箱即用”,无需额外配置。如果您需要自定义依赖项,可以使用 pip_requirements
来安装它们。
要安装自定义依赖项,必须使用外部访问集成启用外部网络访问。您可以使用以下 SQL 示例命令来提供访问权限:
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
有关外部访问集成的更多信息,请参阅 创建和使用外部访问集成。
提供外部网络访问权限后,您可以使用 pip_requirements
和 external_access_integrations
参数来配置自定义依赖项。您可以使用容器运行时环境中不可用的包,也可以使用包的特定版本。
以下 Python 代码显示了如何为 remote
装饰器指定自定义依赖项:
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
以下 Python 代码显示如何为 submit_file()
方法指定自定义依赖项:
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
专用包源¶
Snowflake ML 作业还支持从 JFrog Artifactory 和 Sonatype Nexus 存储库等专用源加载包。这些源通常用于分发内部和专有包、保持对依赖版本的控制以及确保安全性/合规性。
要从专用源安装软件包,必须执行以下操作:
创建网络规则以允许访问专用源 URL。
对于使用基本身份验证的源,您只需创建网络规则即可。
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
要使用专用连接(即专用链接)配置对源的访问权限,请按照 使用专用连接的网络出口 中的步骤操作。
使用网络规则创建外部访问集成。向将要提交作业的角色授予 EAI 使用权限。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
提交作业时指定专有源 URL、外部访问集成和包
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
如果您的专有源 URL 包含身份验证令牌等敏感信息,请通过创建 Snowflake 密钥管理 URL。使用 CREATE SECRET 创建一个密钥。在作业提交期间使用 :spec_overrides
实参配置密钥。
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
有关 container.secrets
的更多信息,请参阅 containers.secrets 字段。
成本注意事项¶
Snowflake ML 作业在 Snowpark Container Service 上运行,并根据使用量计费。有关计算成本的信息,请参阅 Snowpark Container Services 成本。
作业有效负载将上传到使用 stage_name
实参指定的暂存区。为避免额外收费,必须将其清理干净。有关更多信息,请参阅 了解存储成本 和 探索存储成本 了解有关暂存区存储相关成本的更多信息。