在生产环境中使用 Snowflake 在线特征平台¶
Snowflake ML 特征平台有助于在整个特征工程过程中管理特征。
对于需要低延迟推理的在线应用程序,请使用在线特征平台来提供特征。
以下章节将介绍在 Python 应用程序中检索功能的生产过程。这些章节包含执行以下操作的代码示例:
将鸢尾花数据集加载到 Snowflake 中
定义与 Snowflake 的连接
创建特征平台和特征视图
检索特征和特征值
从模型生成预测
代码示例是用 Python 编写的。要为用其他语言编写的应用程序完成此工作流程,请使用特定于该语言的 Snowflake 驱动程序。有关更多信息,请参阅 驱动程序。
先决条件¶
To run online ML feature retrieval in Snowflake, you need the following:
已加载到 Snowflake 中的数据
Snowflake 特征平台
特征视图
为每个特征视图启用在线特征服务
您可以使用自己的 Snowflake 特征平台中的特征,但如果您还没有特征平台,则可以使用以下代码将鸢尾花数据集加载到 Snowflake 中。
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
from snowflake.snowpark.context import get_active_session
sf_session = get_active_session()
### Download the Iris dataset.
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
'sepal length (cm)': 'SEPAL_LENGTH_CM',
'sepal width (cm)': 'SEPAL_WIDTH_CM',
'petal length (cm)': 'PETAL_LENGTH_CM',
'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target
### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
table_name="SEPAL_DATA",
auto_create_table=True,
overwrite=True
)
petal_df = sf_session.write_pandas(
X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
table_name="PETAL_DATA",
auto_create_table=True,
overwrite=True
)
在环境中拥有数据后,您可以创建特征平台。以下代码为鸢尾花数据集中的不同样本创建特征平台及 id_entity 实体。
### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0
from snowflake.ml.feature_store import (
FeatureStore,
FeatureView,
Entity,
CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig
### Create Snowflake feature store
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse=sf_session.get_current_warehouse(),
creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")
id_entity = Entity(
name='SAMPLE_ID',
join_keys=["ID"],
desc='sample id'
)
feature_store.register_entity(id_entity)
备注
Snowflake ML 特征平台具有实体的概念。实体是在特征视图之间组织特征的密钥。有关实体的更多信息,请参阅 使用实体。
创建特征平台后,定义特征视图。以下代码定义了鸢尾花数据集中的萼片和花瓣特征视图。
### Create feature views with Online Serving.
sepal_fv = FeatureView(
name='SEPAL_FEATURES',
entities=[id_entity],
feature_df=sepal_df,
desc='Sepal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
name='PETAL_FEATURES',
entities=[id_entity],
feature_df=petal_df,
desc='Petal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
petal_fv, version="v1", overwrite=True)
检索特征值¶
注册特征视图并为每个特征视图启用在线特征服务后,您可以将每个特征视图的特征值提供给应用程序。
要检索特征值,请执行以下操作:
设置与 Snowflake 的连接
创建在应用程序启动时初始化的会话和 Snowflake 特征平台对象
从特征视图中检索特征
创建预测端点并从该端点获取预测
重要
您必须将 snowflake-ml-python>=1.18.0 安装到应用程序的环境中,才能使用特征平台 API。
要从应用程序连接到 Snowflake,必须设置 编程访问令牌 (PAT) 或 密钥对身份验证 作为身份验证方法。
配置客户端¶
初始化应用程序时,必须连接到 Snowflake ML 特征平台 API 并创建所需的特征平台 Python 对象。
使用以下部分配置客户端与 Snowflake ML 特征平台 API 的连接。
配置编程访问令牌 (PAT)¶
在以下代码中指定以下连接参数,以从应用程序连接到 Snowflake:
schema– Snowflake 特征平台的名称database– 包含架构或特征平台的数据库role– 从特征平台读取数据所需的角色。有关更多信息,请参阅 提供创建和提供在线特征的访问权限。password– 您的 PAT。
import os
### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"password": pat,
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
在以下代码中指定以下连接参数,以从应用程序连接到 Snowflake:
schema– Snowflake 特征平台的名称database– 包含架构或特征平台的数据库role– 从特征平台读取数据所需的角色。有关更多信息,请参阅 创建和提供在线特征。private_key_file– 私钥文件private_key_file_pwd– 私钥文件的密码
import os
### Define connection parameters for key-pair authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"private_key_file": "<private key file>",
"private_key_file_pwd": "<private key file pwd>",
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
创建会话和特征平台对象
定义连接参数后,创建应用程序用于连接到 Snowflake 的会话和特征平台对象。
以下代码:
创建 Snowflake 会话,即应用程序用于与 Snowflake 通信的客户端。
配置线程池执行器以启用特征检索并行性。
列出我们从特征平台中检索的特征。
初始化特征平台阅读器客户端。此对象包装 Snowflake 会话。这是应用程序与特征平台交互的主要方式。
初始化您定义的特征视图。您可以用自己的特征替换这些特征。
import os
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode
# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()
# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# 3. List individual features we are going to retrieve for inference. In this
# example, we are listing Iris features described above in the
# "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]
# 4. Initialize feature store consumer client
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse="<warehouse>",
creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)
# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
检索服务路径上的在线特征¶
定义应用程序的初始化方式后,您可以创建预测端点。
您可以通过不同的方式来定义应用程序处理请求的方式。以下 Python 代码:
定义应用程序中的预测端点
从 JSON 请求中获取密钥
使用密钥从特征视图中检索特征值
将这些特征值传递给模型
从模型中获取预测
返回响应中的预测
from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask
def _retrieve_features(
feature_view: FeatureView,
keys: List[int],
feature_names: List[str]):
"""Retrieve features from the given feature view"""
return feature_store.read_feature_view(
feature_view,
keys=[keys],
feature_names=feature_names,
store_type=StoreType.ONLINE # Query the ONLINE store
).collect()
@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
if flask.request.content_type == 'application/json':
input_data = flask.request.data.decode("utf-8")
input_data = json.loads(data)
else:
return flask.Response(
response="This predictor only supports JSON data",
status=415,
mimetype="text/plain"
)
# Expect that input data is a single key
keys = [int(input_data["key"])]
# Retrieve features from two feature views in parallel.
sepal_features = executor.submit(
_retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
petal_features = executor.submit(
_retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)
sepal_features = sepal_features.result()
petal_features = petal_features.result()
predictions = []
if len(sepal_features) != 0 and len(petal_features) != 0:
# Compose the feature vector, excluding the join keys.
feature_vector = (
list(sepal_features[0][1:])
+ list(petal_features[0][1:])
)
# Using a hypothetical run_inference function.
predictions = run_inference(feature_vector)
result = json.dumps({"results": list(predictions)})
return flask.Response(response=result, status=200,
mimetype="application/json")
前面的代码调用了一个假设的 run_inference 函数。无论模型是远程托管还是在应用程序内存中,您自己的推理函数都可以从模型中获取预测。
前面代码中的预测端点接受一个密钥并返回该密钥的预测。您的数据可能有多个密钥来描述单个样本的特征。前面的代码只是一个示例,您可以根据自己的用例进行调整。