Snowflake ML 作业

Use Snowflake ML Jobs to run machine learning (ML) workflows inside Snowflake ML container runtimes. You can run them from any development environment. You don’t need to run the code in a Snowflake worksheet or notebook. Use jobs to leverage Snowflake’s infrastructure to run resource-intensive tasks within your development workflow. For information about setting up Snowflake ML locally, see Using Snowflake ML Locally.

Important

Snowflake ML Jobs are available in snowflake-ml-python version 1.26.0 and later.

Snowflake ML 作业使您能够执行以下操作:

  • 在 Snowflake 计算池上运行 ML 工作负载,包括 GPU 和高内存 CPU 实例。
  • 使用您的首选开发环境,例如 VS Code 或 Jupyter 笔记本。
  • 在运行时环境中安装和使用自定义 Python 包。
  • 使用 Snowflake 的分布式 APIs 来优化数据加载、训练和超参数调整。
  • 与 Apache Airflow 等编排工具集成。
  • 通过 Snowflake APIs 监控和管理作业。

您可以使用这些功能来执行以下操作:

  • 对需要 GPU 加速或大量计算资源的大型数据集执行资源密集型训练。
  • 通过将 ML 代码从开发转移到生产环境并通过管道进行编程执行,生产 ML 工作流程。
  • 保留现有开发环境,同时利用 Snowflake 的计算资源。
  • 只需最少的代码更改即可提升和转移 OSS ML 工作流程。
  • 直接使用大型 Snowflake 数据集以减少数据移动并避免昂贵的数据传输。

先决条件

  1. 安装 Snowflake ML Python 包。

    pip install snowflake-ml-python>=1.26.0
  2. 默认计算池大小使用 CPU_X64_S 实例系列。最小节点数为 1,最大节点数为 25。您可以使用以下 SQL 命令创建自定义计算池:

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
  3. Snowflake ML 作业需要 Snowpark 会话。使用以下代码进行创建:

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context

    For information about creating a session, see Creating a Session.

运行 Snowflake ML 作业

您可以通过以下其中一种方式运行 Snowflake ML 作业:

  • 在代码中使用函数装饰器。
  • 使用 Python API 提交整个文件或目录。

将 Python 函数作为 Snowflake ML 作业运行

Use Function Dispatch to run individual Python functions remotely on Snowflake’s compute resources with the @remote decorator.

Using @remote, you can:

  • 序列化函数及其依赖项。
  • 将其上传到指定 Snowflake 暂存区。
  • 在特定容器运行时中执行。

The following example Python code uses the @remote decorator to submit a function call as a Snowflake ML Job:

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")

Note

Submitting a job requires an existing Snowpark Session; See 先决条件 for details.

Invoking a @remote decorated function returns a Snowflake MLJob object that can be used to manage and monitor the job execution. For more information, see ML Jobs 中的 Ray Dashboard.

将 Python 文件作为 Snowflake ML 作业运行

在 Snowflake 计算资源上运行 Python 文件或项目目录。这在以下情况下很有用:

  • 您的 ML 项目很复杂,有多个模块和依赖项。
  • 您希望保持本地开发与生产代码之间的分离。
  • 您需要运行使用命令行实参的脚本。
  • 您正在处理不是专门为在 Snowflake 计算上执行而设计的现有 ML 项目。

Snowflake 作业 API 为提交基于文件的有效负载提供了三种主要方法:

  • submit_file: For running single Python files
  • submit_directory: For running Python projects spanning multiple files and resources
  • submit_from_stage: For running Python projects saved on a Snowflake stage

两种方法都支持:

  • 命令行实参传递
  • 环境变量配置
  • 自定义依赖项规范
  • 通过 Snowflake 暂存区进行项目资产管理

File Dispatch 对于生产现有 ML 工作流程以及保持开发和执行环境之间的明确分离特别有用。

以下 Python 代码将一个文件作为 Snowflake ML 作业提交:

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)

以下 Python 代码将一个目录作为 Snowflake ML 作业提交:

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)

以下 Python 代码将来自 Snowflake 暂存区的目录作为 Snowflake ML 作业提交:

from snowflake.ml.jobs import submit_from_stage

# Run from a directory
job3 = submit_from_stage(
  "@source_stage/ml_project/"
  "MY_COMPUTE_POOL",
  entrypoint="@source_stage/ml_project/train.py",
  stage_name="payload_stage",
  session=session,
)

# Entrypoint may also be a relative path
job4 = submit_from_stage(
  "@source_stage/ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",  # Resolves to @source_stage/ml_project/train.py
  stage_name="payload_stage",
  session=session,
)

Submitting a file or directory returns a Snowflake MLJob object that can be used to manage and monitor the job execution. For more information, see ML Jobs 中的 Ray Dashboard.

在指定的容器运行时上运行 Snowflake ML 作业

The @remote decorator, as well as the functions submit_directory, submit_from_stage, and submit_file all support the runtime_environment keyword. When you don’t provide this keyword in your decorator or function call, Snowflake automatically uses the latest avialable version of the Snowflake Container Runtime on your compute pool.

To specify a container runtime for your ML Job, use the runtime_environment keyword with a string value of the Container Runtime version to use. See Container Runtime releases for the full list of available versions and what’s contained in these environments by default.

The following example shows how to pin a function with the @remote decorator to Snowflake Container Runtime version 2.3:

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session, runtime_environment="2.3")
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

在提交中支持额外的有效负载

提交文件、目录或从某个暂存区提交时,支持在作业执行期间使用额外的有效负载。可以明确指定导入路径;否则,系统将从额外有效负载的位置推断此路径。

Important

您只能从暂存区加载单个 Python 文件。

# Run from a file
 job1 = submit_file(
   "train.py",
   "MY_COMPUTE_POOL",
   stage_name="payload_stage",
   session=session,
   imports=[
     ("src/utils/", "utils"), # the import path is utils
   ],
 )

 # Run from a directory
 job2 = submit_directory(
   "./ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="train.py",
   stage_name="payload_stage",
   session=session,
   imports=[
     ("src/utils/"), # the import path is utils
   ],
 )

 # Run from a stage
 job3 = submit_from_stage(
   "@source_stage/ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="@source_stage/ml_project/train.py",
   stage_name="payload_stage",
   session=session,
   imports=[
     ("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
   ],
 )

在 ML 作业中访问 Snowpark 会话

在 Snowflake 上运行 ML 作业时,Snowpark 会话自动在执行上下文中可用。您可以使用以下方法,从 ML 有效负载中访问会话对象:

from snowflake.ml.jobs import remote
from snowflake.snowpark import Session

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
  # This approach works for all payload types, including file and directory payloads
  session = Session.builder.getOrCreate()
  print(session.sql("SELECT CURRENT_VERSION()").collect())

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
  # This approach works only for function dispatch payloads
  # The session is injected automatically by the Snowflake ML Job API
  print(session.sql("SELECT CURRENT_VERSION()").collect())

Snowpark 会话可用于在 ML 作业内访问 Snowflake 表、暂存区和其他数据库对象。

从 ML 作业返回结果

Snowflake ML 作业支持将执行结果返回到客户端环境。这使您能够检索作业有效负载产生的计算值、经过训练的模型或任何其他工件。

For function dispatch, simply return a value from your decorated function. The returned value will be serialized and made available through the result() method.

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
  # Your ML code here
  model = XGBClassifier()
  model.fit(data_table)
  return model

job1 = train_model("my_training_data")

For file-based jobs, use the special __return__ variable to specify the return value.

# Example: /path/to/repo/my_script.py
def main():
    # Your ML code here
    model = XGBClassifier()
    model.fit(data_table)
    return model

if __name__ == "__main__":
    __return__ = main()
from snowflake.ml.jobs import submit_file

job2 = submit_file(
    "/path/to/repo/my_script.py",
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
)

You can retrieve the job execution result using the MLJob.result() API. The API blocks the calling thread until the job reaches a terminal state, then returns the payload’s return value or, if execution failed, raises an exception. If the payload does not define a return value, the result will be None on success.

# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()

ML 作业定义

一个 ML 作业定义封装了 ML 作业的可复用组件,包括有效负载位置、计算池及相关配置。这使您能够从同一有效负载提交多个作业,并使用不同的实参,而无需重新上传有效负载。

Note

ML Job Definitions are available in snowflake-ml-python version 1.26 and later.

要创建 ML 作业定义,请使用 :class:MLJobDefinition 类。API 密切反映了作业创建 APIs。创建作业定义时,也支持创建作业时支持的所有可选参数。

Use Function Dispatch to register individual Python functions with the @remote decorator.

from snowflake.ml.jobs import remote

compute_pool = "MY_COMPUTE_POOL"
@remote(compute_pool, stage_name="payload_stage")
def hello_world(name: str = "world"):
    from datetime import datetime

    print(f"{datetime.now()} Hello {name}!")

# this is a definition handle
definition = hello_world

job1 = hello_world()

Use register() to create job definitions from a local file, a local directory, or a stage directory.

from snowflake.ml.jobs import MLJobDefinition

# create a job definition from a stage directory
job_definition1 = MLJobDefinition.register(
    entrypoint ='@tmp_stage/my_project/xgb.py',
    source = '@tmp_stage/my_project',
    stage_name = "payload_stage",
    compute_pool = compute_pool
)

# create a job definition from local file
job_definition2 = MLJobDefinition.register(
    source ='/path/to/script.py',
    stage_name = "payload_stage",
    compute_pool = compute_pool
)

# create a job definition from the directory
job_definition3 = MLJobDefinition.register(
    entrypoint ='/path/to/directory/script.py',
    source = '/path/to/directory',
    stage_name = "payload_stage",
    compute_pool = compute_pool
)

根据作业定义创建作业,支持传递不同的参数以生成不同的作业。

from snowflake.ml.jobs import remote

# create a job definition using the remote decorator
compute_pool = "MY_COMPUTE_POOL"
@remote(compute_pool, stage_name="payload_stage")
def hello_world(name: str = "world"):
    from datetime import datetime

    print(f"{datetime.now()} Hello {name}!")

definition = hello_world

job1 = definition()

job2 = definition(name="ML Job Definition") # pass in the different parameter

The register function takes runtime_environment as an optional keyword argument to select the container image that runs on your selected compute pool. By default, your job definition uses the latest available version of the Snowflake Container Runtime.

To specify a container runtime for your ML Job, use the runtime_environment keyword with a string value of the Container Runtime version to use. See Container Runtime releases for the full list of available versions and what’s contained in these environments by default.

Support integration with Tasks. Jobs executed from a Task do not run within a stored procedure. Refer to ML Jobs Task Integration samples (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs/e2e_task_graph) for examples of using Snowflake ML Job Definitions in Tasks.

from snowflake.ml.jobs import remote

compute_pool = "MY_COMPUTE_POOL"
@remote(COMPUTE_POOL, stage_name="payload_stage")
def train_model(input_data: DataSource) -> Optional[str]:
    ...

train_model_task = DAGTask("TRAIN_MODEL", definition=train_model) # train_model is a job definition created by the @remote decorator

ML Jobs 中的 Ray Dashboard

ML Job now supports the ray dashboard for the running jobs in snowflake-ml-python version 1.30 and later.

Note

The Ray Dashboard is not supported on the CPU_X64_XS compute pool instance family. The dashboard is only available while the job is running.

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
ray_dashboard_url = job.get_ray_dashboard_url() # copy and paste this url in browser to log in then to see the ray dashboard

管理 ML 作业

当您提交 Snowflake ML 作业时,API 会创建一个 :class:MLJob 实例。您可以用它来执行以下操作:

  • 通过状态更新跟踪作业进度
  • 使用详细的执行日志调试问题
  • 检索执行结果(如有)

You can use the get_job API to retrieve an :class:MLJob object by its ID. The following Python code shows how to retrieve an :class:MLJob object:

from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job

# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df)  # Display list in table format

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())

# Clean up the job
delete_job(job)

管理依赖项

The Snowflake ML Job API runs payloads inside the Snowflake Container Runtime environment. The environment has the most commonly used Python packages for machine learning and data science. Most use cases should work “out of the box” without additional configuration. If you need custom dependencies, you can use pip_requirements to install them.

要安装自定义依赖项,必须使用外部访问集成启用外部网络访问。您可以使用以下 SQL 示例命令来提供访问权限:

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;

For more information about external access integrations, see Creating and using an external access integration.

After you’ve provided external network access, you can use the pip_requirements and external_access_integrations parameters to configure custom dependencies. You can use packages that aren’t available in the container runtime environment or if you specific versions of the packages.

The following Python code shows how to specify custom dependencies to the remote decorator:

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here

The following Python code shows how to specify custom dependencies for the submit_file() method:

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)

专用包源

Snowflake ML 作业还支持从 JFrog Artifactory 和 Sonatype Nexus 存储库等专用源加载包。这些源通常用于分发内部和专有包、保持对依赖版本的控制以及确保安全性/合规性。

要从专用源安装软件包,必须执行以下操作:

  1. 创建网络规则以允许访问专用源 URL。

    1. 对于使用基本身份验证的源,您只需创建网络规则即可。

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
    2. To configure access to a source using private connectivity (i.e. Private Link), follow the steps in Network egress using private connectivity.

  2. 使用网络规则创建外部访问集成。向将要提交作业的角色授予 EAI 使用权限。

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
  3. 提交作业时指定专有源 URL、外部访问集成和包

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
     "--index-url=https://your.private.feed.url",
     "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )

If your private feed URL contains sensitive information like authentication tokens, manage the URL by creating a Snowflake Secret. Use the CREATE SECRET to create a secret. Configure secrets during job submission with the spec_overrides argument.

Note

When using spec_overrides, Snowflake only supports and validates secrets in the secrets field within container definitions. Snowflake does not support or validate other fields, such as args, volumes, and endpoints.

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)

For more information about the container.secrets, see `containers.secrets` field.

示例

See ML Jobs Code Samples (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs) for examples of how to use Snowflake ML Jobs.

成本注意事项

Snowflake ML Jobs run on Snowpark Container Services and are billed based on usage. For information about compute costs, see Snowpark Container Services costs.

Job payloads are uploaded to the stage specified with the stage_name argument. To avoid additional charges, you must clean them up. For more information, see Understanding storage cost and Exploring storage cost to learn more about costs associated with stage storage.