Snowflake ML 作业

使用 Snowflake ML 作业在Snowflake ML 容器运行时内运行机器学习 (ML) 工作流程。您可以在任何开发环境中运行它们。您无需在 Snowflake 工作表或笔记本中运行代码。使用作业,利用 Snowflake 的基础架构在开发工作流程中运行资源密集型任务。有关在本地设置 Snowflake ML 的信息,请参阅 在本地使用 Snowflake ML

重要

Snowflake ML 作业在 Snowpark ML Python 包 (snowflake-ml-python) 版本 1.8.2 及更高版本中以公开预览版的形式提供。

Snowflake ML 作业使您能够执行以下操作:

  • 在 Snowflake 计算池上运行 ML 工作负载,包括 GPU 和高内存 CPU 实例。

  • 使用您的首选开发环境,例如 VS Code 或 Jupyter 笔记本。

  • 在运行时环境中安装和使用自定义 Python 包。

  • 使用 Snowflake 的分布式 APIs 来优化数据加载、训练和超参数调整。

  • 与 Apache Airflow 等编排工具集成。

  • 通过 Snowflake APIs 监控和管理作业。

您可以使用这些功能来执行以下操作:

  • 对需要 GPU 加速或大量计算资源的大型数据集执行资源密集型训练。

  • 通过将 ML 代码从开发转移到生产环境并通过管道进行编程执行,生产 ML 工作流程。

  • 保留现有开发环境,同时利用 Snowflake 的计算资源。

  • 只需最少的代码更改即可提升和转移 OSS ML 工作流程。

  • 直接使用大型 Snowflake 数据集以减少数据移动并避免昂贵的数据传输。

先决条件

重要

Snowflake ML 作业目前仅支持 Python 3.10 客户端。如果您需要其他 Python 版本的支持,请联系您的 Snowflake 账户团队。

  1. 在您的 Python 3.10 环境中安装 Snowflake ML Python 包。

    pip install snowflake-ml-python>=1.8.2
    
    Copy
  2. 默认计算池大小使用 CPU_X64_S 实例系列。最小节点数为 1,最大节点数为 25。您可以使用以下 SQL 命令创建自定义计算池:

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy
  3. Snowflake ML 作业需要 Snowpark 会话。使用以下代码进行创建:

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context
    
    Copy

    有关创建会话的更多信息,请参阅 创建会话

运行 Snowflake ML 作业

您可以通过以下其中一种方式运行 Snowflake ML 作业:

  • 在代码中使用远程装饰器。

  • 使用 Python API 提交整个文件或目录。

将 Python 函数作为 Snowflake ML 作业运行

使用 Function Dispatch 在 Snowflake 的计算资源上使用 @remote 装饰器远程运行单个 Python 函数。

使用 @remote 装饰器,您可以执行以下操作:

  • 序列化函数及其依赖项。

  • 将其上传到指定 Snowflake 暂存区。

  • 在 ML 容器运行时中执行。

以下示例 Python 代码使用 @remote 装饰器提交 Snowflake ML 作业。请注意,Snowpark Session 必填,请参阅 先决条件

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
Copy

调用 @remote 装饰函数会返回一个 Snowflake MLJob 对象,该对象可用于管理和监控作业执行。有关更多信息,请参阅 Snowflake ML 作业管理

将 Python 文件作为 Snowflake ML 作业运行

在 Snowflake 计算资源上运行 Python 文件或项目目录。这在以下情况下很有用:

  • 您的 ML 项目很复杂,有多个模块和依赖项。

  • 您希望保持本地开发与生产代码之间的分离。

  • 您需要运行使用命令行实参的脚本。

  • 您正在处理不是专门为在 Snowflake 计算上执行而设计的现有 ML 项目。

Snowflake 作业 API 提供两种主要方法:

  • submit_file():用于运行单个 Python 文件

  • submit_directory():用于运行包含多个文件和资源的整个项目目录

两种方法都支持:

  • 命令行实参传递

  • 环境变量配置

  • 自定义依赖项规范

  • 通过 Snowflake 暂存区进行项目资产管理

File Dispatch 对于生产现有 ML 工作流程以及保持开发和执行环境之间的明确分离特别有用。

以下 Python 代码向 Snowflake ML 作业提交一个文件:

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)
Copy

以下 Python 代码向 Snowflake ML 作业提交一个目录:

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)
Copy

提交文件或目录会返回一个 Snowflake MLJob 对象,该对象可用于管理和监控任务执行。有关更多信息,请参阅 Snowflake ML 作业管理

Snowflake ML 作业管理

当您提交 Snowflake ML 作业时,API 会创建一个 MLJob 对象。您可以用它来执行以下操作:

  • 通过状态更新跟踪作业进度

  • 使用详细的执行日志调试问题

  • 检索执行结果(如有)

您可以使用 get_job() API 来按 ID 检索 MLJob 对象。以下 Python 代码显示了如何检索 MLJob 对象:

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
Copy

调用作业的 result 方法以等待执行完成并检索执行结果。如果执行失败,result 会引发异常。

result = job.result()
Copy

管理依赖项

Snowflake ML 作业 API 在 ML 的容器运行时 环境中运行有效负载。该环境具有用于机器学习和数据科学的最常用 Python 包。大多数用例应该“开箱即用”,无需额外配置。如果您需要自定义依赖项,可以使用 pip_requirements 来安装它们。

要安装自定义依赖项,必须使用外部访问集成启用外部网络访问。您可以使用以下 SQL 示例命令来提供访问权限:

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;
Copy

有关外部访问集成的更多信息,请参阅 创建和使用外部访问集成

提供外部网络访问权限后,您可以使用 pip_requirementsexternal_access_integrations 参数来配置自定义依赖项。您可以使用容器运行时环境中不可用的包,也可以使用包的特定版本。

以下 Python 代码显示了如何为 remote 装饰器指定自定义依赖项:

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here
Copy

以下 Python 代码显示如何为 submit_file() 方法指定自定义依赖项:

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)
Copy

专用包源

Snowflake ML 作业还支持从 JFrog Artifactory 和 Sonatype Nexus 存储库等专用源加载包。这些源通常用于分发内部和专有包、保持对依赖版本的控制以及确保安全性/合规性。

要从专用源安装软件包,必须执行以下操作:

  1. 创建网络规则以允许访问专用源 URL。

    1. 对于使用基本身份验证的源,您只需创建网络规则即可。

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
      
      Copy
    2. 要使用专用连接(即专用链接)配置对源的访问权限,请按照 使用专用连接的网络出口 中的步骤操作。

  2. 使用网络规则创建外部访问集成。向将要提交作业的角色授予 EAI 使用权限。

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
    
    Copy
  3. 提交作业时指定专有源 URL、外部访问集成和包

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
        "--index-url=https://your.private.feed.url",
        "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    
    Copy
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )
    
    Copy

如果您的专有源 URL 包含身份验证令牌等敏感信息,请通过创建 Snowflake 密钥管理 URL。使用 CREATE SECRET 创建一个密钥。在作业提交期间使用 :spec_overrides 实参配置密钥。

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)
Copy

有关 container.secrets 的更多信息,请参阅 containers.secrets 字段

成本注意事项

Snowflake ML 作业在 Snowpark Container Service 上运行,并根据使用量计费。有关计算成本的信息,请参阅 Snowpark Container Services 成本

作业有效负载将上传到使用 stage_name 实参指定的暂存区。为避免额外收费,必须将其清理干净。有关更多信息,请参阅 了解存储成本探索存储成本 了解有关暂存区存储相关成本的更多信息。

语言: 中文