Snowflake ML 作业¶
使用 Snowflake ML 作业在Snowflake ML 容器运行时内运行机器学习 (ML) 工作流程。您可以在任何开发环境中运行它们。您无需在 Snowflake 工作表或笔记本中运行代码。使用作业,利用 Snowflake 的基础架构在开发工作流程中运行资源密集型任务。有关在本地设置 Snowflake ML 的信息,请参阅 在本地使用 Snowflake ML。
重要
Snowflake ML 作业在 snowflake-ml-python 1.9.2 及更高版本中可用。
Snowflake ML 作业使您能够执行以下操作:
在 Snowflake 计算池上运行 ML 工作负载,包括 GPU 和高内存 CPU 实例。
使用您的首选开发环境,例如 VS Code 或 Jupyter 笔记本。
在运行时环境中安装和使用自定义 Python 包。
使用 Snowflake 的分布式 APIs 来优化数据加载、训练和超参数调整。
与 Apache Airflow 等编排工具集成。
通过 Snowflake APIs 监控和管理作业。
您可以使用这些功能来执行以下操作:
对需要 GPU 加速或大量计算资源的大型数据集执行资源密集型训练。
通过将 ML 代码从开发转移到生产环境并通过管道进行编程执行,生产 ML 工作流程。
保留现有开发环境,同时利用 Snowflake 的计算资源。
只需最少的代码更改即可提升和转移 OSS ML 工作流程。
直接使用大型 Snowflake 数据集以减少数据移动并避免昂贵的数据传输。
先决条件¶
在您的 Python 3.10 环境中安装 Snowflake ML Python 包。
pip install snowflake-ml-python>=1.9.2
默认计算池大小使用 CPU_X64_S 实例系列。最小节点数为 1,最大节点数为 25。您可以使用以下 SQL 命令创建自定义计算池:
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Snowflake ML 作业需要 Snowpark 会话。使用以下代码进行创建:
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
有关创建会话的更多信息,请参阅 创建会话。
运行 Snowflake ML 作业¶
您可以通过以下其中一种方式运行 Snowflake ML 作业:
在代码中使用函数装饰器。
使用 Python API 提交整个文件或目录。
将 Python 函数作为 Snowflake ML 作业运行¶
使用 Function Dispatch 在 Snowflake 的计算资源上使用 @remote 装饰器远程运行单个 Python 函数。
使用 @remote,您可以:
序列化函数及其依赖项。
将其上传到指定 Snowflake 暂存区。
在特定容器运行时中执行。
以下示例 Python 代码使用 @remote 装饰器,将一个函数调用作为 Snowflake ML 作业提交:
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
备注
提交作业需要已存在一个 Snowpark Session;请参阅 先决条件 了解详细信息。
调用 @remote 装饰函数会返回一个 Snowflake MLJob 对象,该对象可用于管理和监控作业执行。有关更多信息,请参阅 管理 ML 作业。
将 Python 文件作为 Snowflake ML 作业运行¶
在 Snowflake 计算资源上运行 Python 文件或项目目录。这在以下情况下很有用:
您的 ML 项目很复杂,有多个模块和依赖项。
您希望保持本地开发与生产代码之间的分离。
您需要运行使用命令行实参的脚本。
您正在处理不是专门为在 Snowflake 计算上执行而设计的现有 ML 项目。
Snowflake 作业 API 为提交基于文件的有效负载提供了三种主要方法:
submit_file():用于运行单个 Python 文件submit_directory():用于运行跨多个文件和资源的 Python 项目submit_from_stage():用于运行保存在 Snowflake 暂存区上的 Python 项目
两种方法都支持:
命令行实参传递
环境变量配置
自定义依赖项规范
通过 Snowflake 暂存区进行项目资产管理
File Dispatch 对于生产现有 ML 工作流程以及保持开发和执行环境之间的明确分离特别有用。
以下 Python 代码将一个文件作为 Snowflake ML 作业提交:
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
以下 Python 代码将一个目录作为 Snowflake ML 作业提交:
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
以下 Python 代码将来自 Snowflake 暂存区的目录作为 Snowflake ML 作业提交:
from snowflake.ml.jobs import submit_from_stage
# Run from a directory
job3 = submit_from_stage(
"@source_stage/ml_project/"
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
)
# Entrypoint may also be a relative path
job4 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py", # Resolves to @source_stage/ml_project/train.py
stage_name="payload_stage",
session=session,
)
提交文件或目录会返回一个 Snowflake MLJob 对象,该对象可用于管理和监控任务执行。有关更多信息,请参阅 管理 ML 作业。
在指定的容器运行时上运行 Snowflake ML 作业¶
@remote 装饰器以及 submit_directory()、submit_from_stage() 和 submit_file() 都支持 runtime_environment 关键字。当您在装饰器或函数调用中未提供此关键字时,Snowflake 会自动使用计算池上可用的最新版本 Snowflake 容器运行时。
要为您的 ML 作业指定容器运行时,请使用 runtime_environment 关键字,并为其设置一个字符串值,该值表示要使用的容器运行时版本。有关可用版本的完整列表以及这些环境默认包含的内容,请参阅 容器运行时版本发布说明。
以下示例展示了如何通过 @remote 装饰器将函数固定运行在 Snowflake 容器运行时 2.3 版本上:
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session, runtime_environment="2.3")
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
在提交中支持额外的有效负载¶
提交文件、目录或从某个暂存区提交时,支持在作业执行期间使用额外的有效负载。可以明确指定导入路径;否则,系统将从额外有效负载的位置推断此路径。
重要
您只能从暂存区加载单个 Python 文件。
# Run from a file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
imports=[
("src/utils/", "utils"), # the import path is utils
],
)
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
imports=[
("src/utils/"), # the import path is utils
],
)
# Run from a stage
job3 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
imports=[
("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
],
)
在 ML 作业中访问 Snowpark 会话¶
在 Snowflake 上运行 ML 作业时,Snowpark 会话自动在执行上下文中可用。您可以使用以下方法,从 ML 有效负载中访问会话对象:
from snowflake.ml.jobs import remote
from snowflake.snowpark import Session
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
# This approach works for all payload types, including file and directory payloads
session = Session.builder.getOrCreate()
print(session.sql("SELECT CURRENT_VERSION()").collect())
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
# This approach works only for function dispatch payloads
# The session is injected automatically by the Snowflake ML Job API
print(session.sql("SELECT CURRENT_VERSION()").collect())
Snowpark 会话可用于在 ML 作业内访问 Snowflake 表、暂存区和其他数据库对象。
从 ML 作业返回结果¶
Snowflake ML 作业支持将执行结果返回到客户端环境。这使您能够检索作业有效负载产生的计算值、经过训练的模型或任何其他工件。
对于函数调度,只需从装饰的函数中返回一个值即可。返回值将被序列化,并通过 result() 方法提供。
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
job1 = train_model("my_training_data")
对于基于文件的作业,使用特殊 __return__ 变量来指定返回值。
# Example: /path/to/repo/my_script.py
def main():
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
if __name__ == "__main__":
__return__ = main()
from snowflake.ml.jobs import submit_file
job2 = submit_file(
"/path/to/repo/my_script.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
)
您可以使用 MLJob.result() API 检索作业执行结果。API 会阻塞调用线程,直至任务达到终止状态,然后返回有效负载的返回值,如果执行失败,则引发异常。如果有效负载未定义返回值,则在成功时,结果是 None。
# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
ML 作业定义¶
一个 ML 作业定义封装了 ML 作业的可复用组件,包括有效负载位置、计算池及相关配置。这使您能够从同一有效负载提交多个作业,并使用不同的实参,而无需重新上传有效负载。
备注
ML 作业定义适用于 snowflake-ml-python 1.26 及更高版本。
要创建 ML 作业定义,请使用 MLJobDefinition 类。
from snowflake.ml.jobs import remote
from snowflake.ml.jobs import MLJobDefinition
compute_pool = "MY_COMPUTE_POOL"
@remote(compute_pool, stage_name="payload_stage")
def hello_world(name: str = "world"):
from datetime import datetime
print(f"{datetime.now()} Hello {name}!")
# this is a definition handle
definition = hello_world
job1 = definition()
job2 = definition(name="ML Job Definition")
# /path/to/repo/my_script.py
def main(*args):
print("Hello world", *args)
if __name__ == '__main__':
import sys
main(*sys.argv[1:])
job_definition = MLJobDefinition.register(
"/path/to/repo/my_script.py",
# If you register a source directory, provide the entrypoint file:
# entrypoint="/path/to/repo/my_script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
)
# Arguments follow the same format used in file dispatch
job3 = job_definition()
job4 = job_definition(arg1="ML Job")
register() 函数接受 runtime_environment 作为可选关键字实参,用于选择在所选计算池上运行的容器镜像。默认情况下,作业定义使用 Snowflake 容器运行时的最新可用版本。
要为您的 ML 作业指定容器运行时,请使用 runtime_environment 关键字,并为其设置一个字符串值,该值表示要使用的容器运行时版本。有关可用版本的完整列表以及这些环境默认包含的内容,请参阅 容器运行时版本发布说明。
管理 ML 作业¶
当您提交 Snowflake ML 作业时,API 会创建一个 MLJob 实例。您可以用它来执行以下操作:
通过状态更新跟踪作业进度
使用详细的执行日志调试问题
检索执行结果(如有)
您可以使用 get_job() API 方法,通过其 ID 来检索一个 MLJob 对象。以下 Python 代码显示了如何检索 MLJob 对象:
from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job
# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df) # Display list in table format
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
# Clean up the job
delete_job(job)
管理依赖项¶
Snowflake ML 作业 API 在 Snowflake Container Runtime 环境中运行有效负载。该环境具有用于机器学习和数据科学的最常用 Python 包。大多数用例应该“开箱即用”,无需额外配置。如果您需要自定义依赖项,可以使用 pip_requirements 来安装它们。
要安装自定义依赖项,必须使用外部访问集成启用外部网络访问。您可以使用以下 SQL 示例命令来提供访问权限:
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
有关外部访问集成的更多信息,请参阅 创建和使用外部访问集成。
提供外部网络访问权限后,您可以使用 pip_requirements 和 external_access_integrations 参数来配置自定义依赖项。您可以使用容器运行时环境中不可用的包,也可以使用包的特定版本。
以下 Python 代码显示了如何为 remote 装饰器指定自定义依赖项:
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
以下 Python 代码显示如何为 submit_file() 方法指定自定义依赖项:
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
专用包源¶
Snowflake ML 作业还支持从 JFrog Artifactory 和 Sonatype Nexus 存储库等专用源加载包。这些源通常用于分发内部和专有包、保持对依赖版本的控制以及确保安全性/合规性。
要从专用源安装软件包,必须执行以下操作:
创建网络规则以允许访问专用源 URL。
对于使用基本身份验证的源,您只需创建网络规则即可。
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
要使用专用连接(即专用链接)配置对源的访问权限,请按照 使用专用连接的网络出口 中的步骤操作。
使用网络规则创建外部访问集成。向将要提交作业的角色授予 EAI 使用权限。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
提交作业时指定专有源 URL、外部访问集成和包
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
如果您的专有源 URL 包含身份验证令牌等敏感信息,请通过创建 Snowflake 密钥管理 URL。使用 CREATE SECRET 创建一个密钥。在作业提交期间使用 spec_overrides 实参配置密钥。
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
有关 container.secrets 的更多信息,请参阅 containers.secrets 字段。
示例¶
有关如何使用 Snowflake ML 作业的示例,请参阅 ML 作业代码示例 (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs)。
成本注意事项¶
Snowflake ML 作业在 Snowpark Container Service 上运行,并根据使用量计费。有关计算成本的信息,请参阅 Snowpark Container Services 成本。
作业有效负载将上传到使用 stage_name 实参指定的暂存区。为避免额外收费,必须将其清理干净。有关更多信息,请参阅 了解存储成本 和 探索存储成本 了解有关暂存区存储相关成本的更多信息。