Batch inference jobs

Note

Preview feature – Open

Supported in public preview starting with snowflake-ml-python version 1.26.0.

Use Snowflake batch inference for efficient, large-scale model inference over static or periodically refreshed datasets. The batch inference API uses Snowpark Container Services (SPCS) to provide a distributed compute layer optimized for massive throughput and cost efficiency.

When to use batch inference

Use the run_batch method for workloads that do any of the following:

  • Process image, audio, or video files, or use multimodal models with unstructured data.

  • Run inference over millions or billions of rows.

  • Run inference as a discrete, asynchronous stage in a pipeline.

  • Integrate inference as a step in an Airflow DAG or a Snowflake task.

Limitations

  • For multimodal use cases, only server-side encryption is supported.

  • Partitioned models are not supported.

Getting started

Connect to the Model Registry

Connect to the Snowflake Model Registry and retrieve a reference to your model as follows:

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion

Run a batch job

This API launches the inference workload as a Snowpark Container Services (SPCS) job. The compute shuts down automatically once inference completes, so you don't incur extra charges. At a high level, the API looks like this:

from snowflake.ml.model.batch import OutputSpec

# how to run a batch job
job = mv.run_batch(
    compute_pool = "my_compute_pool",
    X = session.table("my_table"),
    output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)

job.wait() # Optional: block until the job finishes

Job management

You can list jobs, cancel a job, get a handle to a job, or delete a job with the following methods:

from snowflake.ml.jobs import list_jobs, delete_job, get_job

# view logs to troubleshoot
job.get_logs()

# cancel a job
job.cancel()

# list to see all jobs
list_jobs().show()

# get the handle of a job
job = get_job("my_db.my_schema.job_name")

# delete a job that you no longer wish to run
delete_job(job)

Note

Batch inference jobs do not support the result function from the ML Jobs APIs.

Specifying inference data

You can run batch inference over either structured or unstructured data. To use structured data in your workflow, provide a SQL query or a DataFrame to the run_batch method.

For unstructured data, you can reference files on a Snowflake stage. To reference the files, create a DataFrame containing the file paths.

Provide that DataFrame to the run_batch method, and run_batch supplies the file contents to the model.

Structured input

The following examples illustrate the range of input options:

from snowflake.snowpark.functions import col

# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100")

# reading from parquet files
X = (
    session.read.option("pattern", ".*file.*\\.parquet")
    .parquet("@DB.SCHEMA.STAGE/some/path")
    .select(col("id1").alias("id"), col("feature_1"), col("feature_2"))
    .filter(col("feature_1") > 100)
)

Unstructured input (multimodal)

For unstructured data, the run_batch method reads files from the fully qualified stage paths provided in the input DataFrame. The following example shows how to specify unstructured input data:

# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
    ["@DB.SCHEMA.STAGE/dataset/files/file1"],
    ["@DB.SCHEMA.STAGE/dataset/files/file2"],
    ["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)

To automatically list all files on a stage as a DataFrame, use code like the following:

from snowflake.ml.utils.stage_file import list_stage_files

# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")

# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")

# get all files under a path ending with ".jpg" and return the dataframe with a column name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
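The pattern argument is a regular expression. As a plain-Python sketch of the filtering semantics (assuming, as with Snowflake's PATTERN clause, that the regex must match the entire file path; filter_paths is a hypothetical helper, not part of the API):

```python
import re

def filter_paths(paths, pattern):
    """Keep only paths whose full text matches the regex,
    mirroring a PATTERN-style stage file filter."""
    compiled = re.compile(pattern)
    return [p for p in paths if compiled.fullmatch(p)]

# hypothetical stage paths for illustration
paths = [
    "@db.schema.my_stage/path/cat.jpg",
    "@db.schema.my_stage/path/notes.txt",
    "@db.schema.my_stage/path/dog.jpg",
]
jpgs = filter_paths(paths, ".*\\.jpg")  # keeps only the .jpg paths
```

Note the double backslash: the pattern is a regex, so a literal dot before the extension must be escaped.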

Representing data types

run_batch automatically converts files to a model-compatible format.

A model can accept data in one of the following formats:

  • RAW_BYTES

  • BASE64

For example, if you store images as PNG files on a stage and the model accepts RAW_BYTES, you can use the input_spec argument to specify how Snowflake converts the data.

The following example code converts files on the stage to RAW_BYTES:

from snowflake.ml.model.batch import InputSpec, InputFormat, FileEncoding

mv.run_batch(
    X,
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # RAW_BYTES: download and convert the file from the stage path to bytes
        column_handling={
            "path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    ),
    ...
)

The column_handling argument tells the framework that the path column of X contains full stage paths, and to call the model with the raw bytes of each file.
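Conceptually, the two encodings describe what run_batch hands to the model after it downloads each file. A plain-Python illustration (the stage download itself is handled by the framework; convert_file_bytes is a hypothetical helper, not part of the API):

```python
import base64

def convert_file_bytes(file_bytes: bytes, convert_to: str):
    """Illustrative only: RAW_BYTES passes the downloaded bytes through
    unchanged, while BASE64 encodes them as an ASCII string."""
    if convert_to == "RAW_BYTES":
        return file_bytes
    if convert_to == "BASE64":
        return base64.b64encode(file_bytes).decode("ascii")
    raise ValueError(f"unsupported encoding: {convert_to}")

png_header = b"\x89PNG\r\n\x1a\n"  # the first bytes of any PNG file
raw = convert_file_bytes(png_header, "RAW_BYTES")   # bytes, unchanged
b64 = convert_file_bytes(png_header, "BASE64")      # ASCII-safe string
```

A model that declares a STRING input column (as in the custom transcriber example below) would pair with BASE64, while a BYTES column pairs with RAW_BYTES.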

Output (output_spec)

Specify a stage directory for storing the file output, as follows:

mv.run_batch(
    ...
    output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)

Snowflake currently supports models that output text, which is stored as parquet files. You can convert the parquet files to a Snowpark DataFrame as follows:

session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")

Passing parameters

If the model's signature defines parameters using ParamSpec, you can pass parameter values at inference time through the params argument of InputSpec. Any parameter not included in the dictionary uses its default value from the signature.

from snowflake.ml.model.batch import InputSpec, OutputSpec

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    input_spec=InputSpec(
        params={"temperature": 0.9, "max_tokens": 512}
    ),
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)

Job specification

To configure job-level settings for a batch inference workload, such as processor counts, resource allocation, and execution parameters, pass a JobSpec instance as the job_spec argument of the run_batch method. For example:

from snowflake.ml.model.batch import JobSpec, OutputSpec

job_spec = JobSpec(
    job_name="my_inference_job",
    cpu_requests="2",
    memory_requests="8GiB",
    max_batch_rows=2048,
    replicas=2,
)

job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    job_spec=job_spec,
)

Best practices

Use the sentinel file

Jobs can fail partway through for a variety of reasons, which can leave the output directory holding only partial data. To mark job completion, run_batch writes a completion file, _SUCCESS, to the output directory.

To avoid incomplete or incorrect output:

  • Read the output data only after you find the sentinel file.

  • Start with an empty output directory.

  • Run run_batch with mode = SaveMode.ERROR.
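Following the first recommendation, a downstream consumer can gate on the sentinel before reading any output. A minimal sketch (completed_output_files is a hypothetical helper; the file list could come from list_stage_files over the output path):

```python
def completed_output_files(file_paths):
    """Return only the data files, failing fast if the _SUCCESS
    sentinel is missing, which indicates incomplete job output."""
    if not any(p.endswith("/_SUCCESS") for p in file_paths):
        raise RuntimeError("no _SUCCESS sentinel found; job output is incomplete")
    return [p for p in file_paths if not p.endswith("/_SUCCESS")]

# hypothetical listing of the output stage directory
files = [
    "@db.schema.stage/output_path/_SUCCESS",
    "@db.schema.stage/output_path/part-0.parquet",
]
data_files = completed_output_files(files)  # only the parquet file remains
```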

Examples

Using a custom model

import pandas as pd

from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core

# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
    inputs=[
        core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
    ],
    outputs=[
        core.FeatureGroupSpec(
            name="outputs",
            specs=[
                core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                core.FeatureGroupSpec(
                    name="chunks",
                    specs=[
                        core.FeatureSpec(
                            name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
                        ),
                        core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                    ],
                    shape=(-1,),
                ),
            ],
        ),
    ],
)

# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        self.model = self.context.model_ref("my_model")

    @custom_model.inference_api
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        import base64
        audio_b64_list = df["audio"].tolist()
        audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
        temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
        return pd.DataFrame({"outputs": temp_res})

# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
    custom_model.ModelContext(
        models={
            "my_model": pipeline(
                task="automatic-speech-recognition", model="openai/whisper-small"
            )
        }
    )
)

# log the model
mv = reg.log_model(
    transcriber,
    model_name="custom_transcriber",
    version_name="v1",
    signatures={"predict": signature},
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)


job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # BASE64: download and convert the file from the stage path to a base64 string
        column_handling={
            "audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
        }
    )
)

Using a Hugging Face model

from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat

# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")

# log the model
mv = reg.log_model(
    classifier,
    model_name="image_classifier",
    version_name="v1",
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
    pip_requirements=[
        "pillow" # dependency for image classification
    ],
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/image/image1.jpg"],
    ["@DB.SCHEMA.STAGE/dataset/image/image2.jpg"],
    ["@DB.SCHEMA.STAGE/dataset/image/image3.jpg"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
        column_handling={
            "IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    )
)

Using Hugging Face models with vLLM

Task: text generation

import json

from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat

# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")

mv = reg.log_model(
    model,
    model_name="qwenw_5",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAI chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "How many breeds of cats are there?"},
        ]
    }
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
        # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)

Task: image-text-to-text

import json

from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec

# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")

mv = reg.log_model(
    model,
    model_name="qwen2_vl_2b",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAI chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What breed of cat is this?"},
            {
                "type": "image_url",
                "image_url": {
                    # run_batch will download and convert the file to a format that vLLM can handle
                    "url": "@db.schema.stage/path/cat.jpeg",
                }
            }
            # you can also pass video and audio like below
            # {
            #     "type": "video_url",
            #     "video_url": {
            #         "url": "@db.schema.stage/path/video.avi",
            #     }
            # }
            # {
            #     "type": "input_audio",
            #     "input_audio": {
            #         "data": "@db.schema.stage/path/audio.mp3",
            #         "format": "mp3",
            #     }
            # }
        ]
    }
]]

schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
        # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)

Example notebook

For an end-to-end runnable example, see the batch inference example notebook (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference) on GitHub.