Snowflake ML 作业

使用 Snowflake ML 作业在Snowflake ML 容器运行时内运行机器学习 (ML) 工作流程。您可以在任何开发环境中运行它们。您无需在 Snowflake 工作表或笔记本中运行代码。使用作业,利用 Snowflake 的基础架构在开发工作流程中运行资源密集型任务。有关在本地设置 Snowflake ML 的信息,请参阅 在本地使用 Snowflake ML

重要

Snowflake ML Jobs are available in snowflake-ml-python version 1.9.2 and later.

Snowflake ML 作业使您能够执行以下操作:

  • 在 Snowflake 计算池上运行 ML 工作负载,包括 GPU 和高内存 CPU 实例。

  • 使用您的首选开发环境,例如 VS Code 或 Jupyter 笔记本。

  • 在运行时环境中安装和使用自定义 Python 包。

  • 使用 Snowflake 的分布式 APIs 来优化数据加载、训练和超参数调整。

  • 与 Apache Airflow 等编排工具集成。

  • 通过 Snowflake APIs 监控和管理作业。

您可以使用这些功能来执行以下操作:

  • 对需要 GPU 加速或大量计算资源的大型数据集执行资源密集型训练。

  • 通过将 ML 代码从开发转移到生产环境并通过管道进行编程执行,生产 ML 工作流程。

  • 保留现有开发环境,同时利用 Snowflake 的计算资源。

  • 只需最少的代码更改即可提升和转移 OSS ML 工作流程。

  • 直接使用大型 Snowflake 数据集以减少数据移动并避免昂贵的数据传输。

先决条件

重要

Snowflake ML 作业目前仅支持 Python 3.10 客户端。如果您需要其他 Python 版本的支持,请联系您的 Snowflake 账户团队。

  1. 在您的 Python 3.10 环境中安装 Snowflake ML Python 包。

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. 默认计算池大小使用 CPU_X64_S 实例系列。最小节点数为 1,最大节点数为 25。您可以使用以下 SQL 命令创建自定义计算池:

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy
  3. Snowflake ML 作业需要 Snowpark 会话。使用以下代码进行创建:

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context
    
    Copy

    有关创建会话的更多信息,请参阅 创建会话

运行 Snowflake ML 作业

您可以通过以下其中一种方式运行 Snowflake ML 作业:

  • 在代码中使用函数装饰器。

  • 使用 Python API 提交整个文件或目录。

将 Python 函数作为 Snowflake ML 作业运行

使用 Function Dispatch 在 Snowflake 的计算资源上使用 @remote 装饰器远程运行单个 Python 函数。

使用 @remote 装饰器,您可以执行以下操作:

  • 序列化函数及其依赖项。

  • 将其上传到指定 Snowflake 暂存区。

  • 在 ML 容器运行时中执行。

以下示例 Python 代码使用 @remote 装饰器提交 Snowflake ML 作业。请注意,Snowpark Session 必填,请参阅 先决条件

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
Copy

调用 @remote 装饰函数会返回一个 Snowflake MLJob 对象,该对象可用于管理和监控作业执行。有关更多信息,请参阅 管理 ML 作业

将 Python 文件作为 Snowflake ML 作业运行

在 Snowflake 计算资源上运行 Python 文件或项目目录。这在以下情况下很有用:

  • 您的 ML 项目很复杂,有多个模块和依赖项。

  • 您希望保持本地开发与生产代码之间的分离。

  • 您需要运行使用命令行实参的脚本。

  • 您正在处理不是专门为在 Snowflake 计算上执行而设计的现有 ML 项目。

Snowflake 作业 API 为提交基于文件的有效负载提供了三种主要方法:

  • submit_file():用于运行单个 Python 文件

  • submit_directory():用于运行跨多个文件和资源的 Python 项目

  • submit_from_stage():用于运行保存在 Snowflake 暂存区的 Python 项目

两种方法都支持:

  • 命令行实参传递

  • 环境变量配置

  • 自定义依赖项规范

  • 通过 Snowflake 暂存区进行项目资产管理

File Dispatch 对于生产现有 ML 工作流程以及保持开发和执行环境之间的明确分离特别有用。

以下 Python 代码将一个文件作为 Snowflake ML 作业提交:

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)
Copy

以下 Python 代码将一个目录作为 Snowflake ML 作业提交:

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)
Copy

以下 Python 代码将来自 Snowflake 暂存区的目录作为 Snowflake ML 作业提交:

from snowflake.ml.jobs import submit_from_stage

# Run from a directory
job3 = submit_from_stage(
  "@source_stage/ml_project/"
  "MY_COMPUTE_POOL",
  entrypoint="@source_stage/ml_project/train.py",
  stage_name="payload_stage",
  session=session,
)

# Entrypoint may also be a relative path
job4 = submit_from_stage(
  "@source_stage/ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",  # Resolves to @source_stage/ml_project/train.py
  stage_name="payload_stage",
  session=session,
)
Copy

提交文件或目录会返回一个 Snowflake MLJob 对象,该对象可用于管理和监控任务执行。有关更多信息,请参阅 管理 ML 作业

在提交中支持额外的有效负载

提交文件、目录或从某个暂存区提交时,支持在作业执行期间使用额外的有效负载。可以明确指定导入路径;否则,系统将从额外有效负载的位置推断此路径。

重要

仅可将目录指定为导入源。不支持导入单个文件。

# Run from a file
 job1 = submit_file(
   "train.py",
   "MY_COMPUTE_POOL",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("src/utils/", "utils"), # the import path is utils
   ],
 )

 # Run from a directory
 job2 = submit_directory(
   "./ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="train.py",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("src/utils/"), # the import path is utils
   ],
 )

 # Run from a stage
 job3 = submit_from_stage(
   "@source_stage/ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="@source_stage/ml_project/train.py",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
   ],
 )
Copy

在 ML 作业中访问 Snowpark 会话

在 Snowflake 上运行 ML 作业时,Snowpark 会话自动在执行上下文中可用。您可以使用以下方法,从 ML 有效负载中访问会话对象:

from snowflake.ml.jobs import remote
from snowflake.snowpark import Session

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
  # This approach works for all payload types, including file and directory payloads
  session = Session.builder.getOrCreate()
  print(session.sql("SELECT CURRENT_VERSION()").collect())

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
  # This approach works only for function dispatch payloads
  # The session is injected automatically by the Snowflake ML Job API
  print(session.sql("SELECT CURRENT_VERSION()").collect())
Copy

Snowpark 会话可用于在 ML 作业内访问 Snowflake 表、暂存区和其他数据库对象。

从 ML 作业返回结果

Snowflake ML 作业支持将执行结果返回到客户端环境。这使您能够检索作业有效负载产生的计算值、经过训练的模型或任何其他工件。

对于函数调度,只需从装饰的函数中返回一个值即可。返回值将被序列化,并通过 result() 方法提供。

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
  # Your ML code here
  model = XGBClassifier()
  model.fit(data_table)
  return model

job1 = train_model("my_training_data")
Copy

对于基于文件的作业,使用特殊 __return__ 变量来指定返回值。

# Example: /path/to/repo/my_script.py
def main():
    # Your ML code here
    model = XGBClassifier()
    model.fit(data_table)
    return model

if __name__ == "__main__":
    __return__ = main()
Copy
from snowflake.ml.jobs import submit_file

job2 = submit_file(
    "/path/to/repo/my_script.py",
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
)
Copy

您可以使用 MLJob.result() API 检索作业执行结果。API 会阻塞调用线程,直至任务达到终止状态,然后返回有效负载的返回值,如果执行失败,则引发异常。如果有效负载未定义返回值,则在成功时,结果是 None

# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
Copy

管理 ML 作业

当您提交 Snowflake ML 作业时,API 会创建一个 MLJob 对象。您可以用它来执行以下操作:

  • 通过状态更新跟踪作业进度

  • 使用详细的执行日志调试问题

  • 检索执行结果(如有)

您可以使用 get_job() API 来按 ID 检索 MLJob 对象。以下 Python 代码显示了如何检索 MLJob 对象:

from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job

# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df)  # Display list in table format

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())

# Clean up the job
delete_job(job)
Copy

管理依赖项

Snowflake ML 作业 API 在 ML 的容器运行时 环境中运行有效负载。该环境具有用于机器学习和数据科学的最常用 Python 包。大多数用例应该“开箱即用”,无需额外配置。如果您需要自定义依赖项,可以使用 pip_requirements 来安装它们。

要安装自定义依赖项,必须使用外部访问集成启用外部网络访问。您可以使用以下 SQL 示例命令来提供访问权限:

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;
Copy

有关外部访问集成的更多信息,请参阅 创建和使用外部访问集成

提供外部网络访问权限后,您可以使用 pip_requirementsexternal_access_integrations 参数来配置自定义依赖项。您可以使用容器运行时环境中不可用的包,也可以使用包的特定版本。

以下 Python 代码显示了如何为 remote 装饰器指定自定义依赖项:

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here
Copy

以下 Python 代码显示如何为 submit_file() 方法指定自定义依赖项:

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)
Copy

专用包源

Snowflake ML 作业还支持从 JFrog Artifactory 和 Sonatype Nexus 存储库等专用源加载包。这些源通常用于分发内部和专有包、保持对依赖版本的控制以及确保安全性/合规性。

要从专用源安装软件包,必须执行以下操作:

  1. 创建网络规则以允许访问专用源 URL。

    1. 对于使用基本身份验证的源,您只需创建网络规则即可。

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
      
      Copy
    2. 要使用专用连接(即专用链接)配置对源的访问权限,请按照 使用专用连接的网络出口 中的步骤操作。

  2. 使用网络规则创建外部访问集成。向将要提交作业的角色授予 EAI 使用权限。

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
    
    Copy
  3. 提交作业时指定专有源 URL、外部访问集成和包

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
        "--index-url=https://your.private.feed.url",
        "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    
    Copy
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )
    
    Copy

如果您的专有源 URL 包含身份验证令牌等敏感信息,请通过创建 Snowflake 密钥管理 URL。使用 CREATE SECRET 创建一个密钥。在作业提交期间使用 spec_overrides 实参配置密钥。

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)
Copy

有关 container.secrets 的更多信息,请参阅 containers.secrets 字段

示例

有关如何使用 Snowflake ML 作业的示例,请参阅 ML 作业代码示例 (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs)。

成本注意事项

Snowflake ML 作业在 Snowpark Container Service 上运行,并根据使用量计费。有关计算成本的信息,请参阅 Snowpark Container Services 成本

作业有效负载将上传到使用 stage_name 实参指定的暂存区。为避免额外收费,必须将其清理干净。有关更多信息,请参阅 了解存储成本探索存储成本 了解有关暂存区存储相关成本的更多信息。

语言: 中文