Batch inference jobs¶
Note
Preview Feature – Open
Supported in public preview starting with snowflake-ml-python version 1.26.0.
Use Snowflake batch inference to run efficient, large-scale model inference over static or periodically updated datasets. The batch inference API uses Snowpark Container Services (SPCS) to provide a distributed compute layer optimized for massive throughput and cost efficiency.
When to use batch inference¶
Use the run_batch method for workloads that do the following:
Process image, audio, or video files, or use multimodal models with unstructured data.
Run inference over millions or billions of rows.
Run inference as a discrete, asynchronous stage in a pipeline.
Integrate inference as a step in an Airflow DAG or a Snowflake task.
Limitations¶
For multimodal use cases, only server-side encryption is supported.
Partitioned models are not supported.
Get started¶
Connect to the Model Registry¶
Connect to the Snowflake Model Registry and retrieve a model reference as follows:
from snowflake.ml.registry import Registry
registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version') # returns ModelVersion
Run a batch job¶
This API launches the inference workload as a Snowpark Container Services (SPCS) job. After inference runs, the compute shuts down automatically to prevent additional charges. At a high level, the API looks like this:
from snowflake.ml.model.batch import OutputSpec
# how to run a batch job
job = mv.run_batch(
    compute_pool="my_compute_pool",
    X=session.table("my_table"),
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)

job.wait()  # Optional: block until the job finishes
Job management¶
You can list jobs, cancel a job, get a job handle, or delete a job using the following methods:
from snowflake.ml.jobs import list_jobs, delete_job, get_job
# view logs to troubleshoot
job.get_logs()
# cancel a job
job.cancel()
# list to see all jobs
list_jobs().show()
# get the handle of a job
job = get_job("my_db.my_schema.job_name")
# delete a job that you no longer wish to run
delete_job(job)
Note
Batch inference jobs do not support the result function in the ML Jobs APIs.
Specifying inference data¶
You can run batch inference on structured or unstructured data. To use structured data in your workflow, provide a SQL query or a DataFrame to the run_batch method.
For unstructured data, you can reference files in a Snowflake stage. To reference files, create a DataFrame containing the file paths and provide it to the run_batch method. run_batch supplies the file contents to the model.
Structured input¶
The following examples illustrate the range of input options:
from snowflake.snowpark.functions import col

# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100")

# reading from parquet files
X = (
    session.read.option("pattern", ".*file.*\\.parquet")
    .parquet("@DB.SCHEMA.STAGE/some/path")
    .select(col("id1").alias("id"), col("feature_1"), col("feature_2"))
    .filter(col("feature_1") > 100)
)
Unstructured input (multimodal)¶
For unstructured data, the run_batch method can read files from fully qualified stage paths provided in the input DataFrame. The following example shows how to specify unstructured input data:
# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
    ["@DB.SCHEMA.STAGE/dataset/files/file1"],
    ["@DB.SCHEMA.STAGE/dataset/files/file2"],
    ["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
To automatically list all files in a stage as a DataFrame, use code like the following:
from snowflake.ml.utils.stage_file import list_stage_files
# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")
# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")
# get all files under a path ending with ".jpg" and return the dataframe with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
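The pattern argument is a regular expression matched against the file path, not a shell glob. The following plain-Python illustration (with hypothetical file names) shows which paths a pattern like the one above selects:

```python
import re

# Illustration only: the pattern is a regular expression applied to the
# whole file path, so ".jpeg" and ".txt" files are excluded.
pattern = re.compile(r".*\.jpg")
files = [
    "@db.schema.my_stage/path/cat.jpg",
    "@db.schema.my_stage/path/dog.jpeg",
    "@db.schema.my_stage/path/notes.txt",
]
matched = [f for f in files if pattern.fullmatch(f)]
assert matched == ["@db.schema.my_stage/path/cat.jpg"]
```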
Representing data types¶
run_batch automatically converts files into a format the model can accept.
A model can accept data in one of the following formats:
RAW_BYTES
BASE64
For example, if you store images as PNG files in a stage and your model accepts RAW_BYTES, you can use the input_spec argument to specify how Snowflake converts the data.
The following example code converts the files in a stage to RAW_BYTES:
from snowflake.ml.model.batch import InputSpec, InputFormat, FileEncoding

mv.run_batch(
    X,
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # RAW_BYTES: download and convert the file from the stage path to bytes
        column_handling={
            "path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    ),
    ...
)
The column_handling argument tells the framework that the path column of X contains full stage paths, and to call the model with the raw bytes of each file.
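As a plain-Python illustration of what the two encodings amount to (the byte string below is a stand-in for file content downloaded from a stage path):

```python
import base64

# Stand-in for the content run_batch downloads from the stage path.
file_bytes = b"\x89PNG\r\n\x1a\n"  # e.g. the first bytes of a PNG file

# RAW_BYTES: the model receives the file content as-is.
raw_input = file_bytes

# BASE64: the model receives an ASCII-safe string encoding of the bytes,
# which the model decodes itself before use.
b64_input = base64.b64encode(file_bytes).decode("ascii")

# The two representations carry the same content:
assert base64.b64decode(b64_input) == raw_input
```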
Output (output_spec)¶
Specify a stage directory where the file output is stored, as follows:
mv.run_batch(
    ...,
    output_spec=OutputSpec(stage_location="@db.schema.stage/path/"),
)
Snowflake currently supports models that output text, which is stored as parquet files. You can convert the parquet files into a Snowpark DataFrame as follows:
session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
Passing parameters¶
If the model's signature defines parameters using ParamSpec, you can pass parameter values at inference time through the params argument of InputSpec. Any parameter not included in the dictionary uses its default value from the signature.
from snowflake.ml.model.batch import InputSpec, OutputSpec
mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    input_spec=InputSpec(
        params={"temperature": 0.9, "max_tokens": 512}
    ),
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
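The fallback behavior can be illustrated in plain Python; the default values here are hypothetical stand-ins for defaults declared via ParamSpec in the model signature:

```python
# Hypothetical defaults declared via ParamSpec in the model signature.
signature_defaults = {"temperature": 1.0, "max_tokens": 256, "top_p": 1.0}

# Values supplied through InputSpec(params=...) at inference time.
params = {"temperature": 0.9, "max_tokens": 512}

# Supplied values override the defaults; omitted ones fall back to them.
effective = {**signature_defaults, **params}
assert effective == {"temperature": 0.9, "max_tokens": 512, "top_p": 1.0}
```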
Job specification¶
To configure job-level settings for a batch inference workload, such as processor counts, resource allocation, and execution parameters, pass a JobSpec instance as the job_spec argument of the run_batch method. For example:
from snowflake.ml.model.batch import JobSpec, OutputSpec
job_spec = JobSpec(
    job_name="my_inference_job",
    cpu_requests="2",
    memory_requests="8GiB",
    max_batch_rows=2048,
    replicas=2,
)

job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    job_spec=job_spec,
)
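As a rough, plain-Python illustration of what max_batch_rows controls: input rows are processed in chunks of at most that many rows. The exact batching strategy is the framework's concern; this sketch only shows the resulting chunk sizes:

```python
def batch_row_counts(total_rows: int, max_batch_rows: int) -> list[int]:
    """Split a row count into chunks of at most max_batch_rows rows."""
    full, rest = divmod(total_rows, max_batch_rows)
    return [max_batch_rows] * full + ([rest] if rest else [])

# With max_batch_rows=2048 as in the JobSpec above, 5000 input rows
# would be processed as three batches.
assert batch_row_counts(5000, 2048) == [2048, 2048, 904]
```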
Best practices¶
Use the sentinel file¶
A job can fail partway through for a variety of reasons, which can leave only partial data in the output directory. To mark a job as complete, run_batch writes a completion file, _SUCCESS, to the output directory.
To avoid reading incomplete or incorrect output:
Read the output data only after you find the sentinel file.
Start with an empty output directory.
Run run_batch with mode = SaveMode.ERROR.
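A minimal sketch of the sentinel check, written as a plain function over a list of output file names; in practice you would obtain the names by listing the output stage path (for example, with an LS command):

```python
def output_is_complete(file_names: list[str]) -> bool:
    """Return True only if the _SUCCESS sentinel is among the output files."""
    return any(name.rsplit("/", 1)[-1] == "_SUCCESS" for name in file_names)

# Only read the parquet output once the sentinel is present.
finished = ["path/part-0000.parquet", "path/_SUCCESS"]
still_running = ["path/part-0000.parquet"]
assert output_is_complete(finished)
assert not output_is_complete(still_running)
```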
Examples¶
Using a custom model¶
import pandas as pd
from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core

# first we must define the schema; we expect audio file input as a base64 string
signature = core.ModelSignature(
    inputs=[
        core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
    ],
    outputs=[
        core.FeatureGroupSpec(
            name="outputs",
            specs=[
                core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                core.FeatureGroupSpec(
                    name="chunks",
                    specs=[
                        core.FeatureSpec(
                            name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
                        ),
                        core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                    ],
                    shape=(-1,),
                ),
            ],
        ),
    ],
)
# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        self.model = self.context.model_ref("my_model")

    @custom_model.inference_api
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        import base64

        audio_b64_list = df["audio"].tolist()
        audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
        temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
        return pd.DataFrame({"outputs": temp_res})
# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
    custom_model.ModelContext(
        models={
            "my_model": pipeline(
                task="automatic-speech-recognition", model="openai/whisper-small"
            )
        }
    )
)
# log the model
mv = reg.log_model(
    transcriber,
    model_name="custom_transcriber",
    version_name="v1",
    signatures={"predict": signature},
)
# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)
job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # BASE64: download and convert the file from the stage path to a base64 string
        column_handling={
            "audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
        }
    ),
)
Using a Hugging Face model¶
from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")
# log the model
mv = reg.log_model(
    classifier,
    model_name="image_classifier",
    version_name="v1",
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
    pip_requirements=[
        "pillow"  # dependency for image classification
    ],
)
# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/image/image1.jpg"],
    ["@DB.SCHEMA.STAGE/dataset/image/image2.jpg"],
    ["@DB.SCHEMA.STAGE/dataset/image/image3.jpg"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)
mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
        column_handling={
            "IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    ),
)
Using a Hugging Face model with vLLM¶
Task: text-generation¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")
mv = reg.log_model(
model,
model_name="qwenw_5",
version_name="v1",
options={"cuda_version": "12.4"},
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAI chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "How many breeds of cats are there?"},
        ],
    },
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
        # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)
Task: image-text-to-text¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")
mv = reg.log_model(
    model,
    model_name="qwen2_vl_2b",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAI chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What breed of cat is this?"},
            {
                "type": "image_url",
                "image_url": {
                    # run_batch will download and convert the file to a format that vLLM can handle
                    "url": "@db.schema.stage/path/cat.jpeg",
                },
            },
            # you can also pass video and audio like below
            # {
            #     "type": "video_url",
            #     "video_url": {
            #         "url": "@db.schema.stage/path/video.avi",
            #     }
            # },
            # {
            #     "type": "input_audio",
            #     "input_audio": {
            #         "data": "@db.schema.stage/path/audio.mp3",
            #         "format": "mp3",
            #     }
            # },
        ],
    },
]]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
        # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)
Example notebook¶
For an end-to-end runnable example, see the batch inference example notebook (https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference) on GitHub.