在生产环境中使用 Snowflake 在线特征平台

Snowflake ML 特征平台有助于在整个特征工程过程中管理特征。

对于需要低延迟推理的在线应用程序,请使用在线特征平台来提供特征。

以下章节将介绍在 Python 应用程序中检索功能的生产过程。这些章节包含执行以下操作的代码示例:

  1. 将鸢尾花数据集加载到 Snowflake 中

  2. 定义与 Snowflake 的连接

  3. 创建特征平台和特征视图

  4. 检索特征和特征值

  5. 从模型生成预测

代码示例是用 Python 编写的。要为用其他语言编写的应用程序完成此工作流程,请使用特定于该语言的 Snowflake 驱动程序。有关更多信息,请参阅 驱动程序

先决条件

To run online ML feature retrieval in Snowflake, you need the following:

  • 已加载到 Snowflake 中的数据

  • Snowflake 特征平台

  • 特征视图

  • 为每个特征视图启用在线特征服务

您可以使用自己的 Snowflake 特征平台中的特征,但如果您还没有特征平台,则可以使用以下代码将鸢尾花数据集加载到 Snowflake 中。

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

from snowflake.snowpark.context import get_active_session

sf_session = get_active_session()

### Download the Iris dataset.

iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
    'sepal length (cm)': 'SEPAL_LENGTH_CM',
    'sepal width (cm)': 'SEPAL_WIDTH_CM',
    'petal length (cm)': 'PETAL_LENGTH_CM',
    'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target

### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
    X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
    table_name="SEPAL_DATA",
    auto_create_table=True,
    overwrite=True
)
petal_df = sf_session.write_pandas(
    X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
    table_name="PETAL_DATA",
    auto_create_table=True,
    overwrite=True
)
Copy

在环境中拥有数据后,您可以创建特征平台。以下代码为鸢尾花数据集中的不同样本创建特征平台及 id_entity 实体。

### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig

### Create Snowflake feature store

feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse=sf_session.get_current_warehouse(),
    creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")

id_entity = Entity(
    name='SAMPLE_ID',
    join_keys=["ID"],
    desc='sample id'
)
feature_store.register_entity(id_entity)
Copy

备注

Snowflake ML 特征平台具有实体的概念。实体是在特征视图之间组织特征的密钥。有关实体的更多信息,请参阅 使用实体

创建特征平台后,定义特征视图。以下代码定义了鸢尾花数据集中的萼片和花瓣特征视图。

### Create feature views with Online Serving.
sepal_fv = FeatureView(
    name='SEPAL_FEATURES',
    entities=[id_entity],
    feature_df=sepal_df,
    desc='Sepal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
    name='PETAL_FEATURES',
    entities=[id_entity],
    feature_df=petal_df,
    desc='Petal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
    sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
    petal_fv, version="v1", overwrite=True)
Copy

检索特征值

注册特征视图并为每个特征视图启用在线特征服务后,您可以将每个特征视图的特征值提供给应用程序。

要检索特征值,请执行以下操作:

  1. 设置与 Snowflake 的连接

  2. 创建在应用程序启动时初始化的会话和 Snowflake 特征平台对象

  3. 从特征视图中检索特征

  4. 创建预测端点并从该端点获取预测

重要

您必须将 snowflake-ml-python>=1.18.0 安装到应用程序的环境中,才能使用特征平台 API。

要从应用程序连接到 Snowflake,必须设置 编程访问令牌 (PAT)密钥对身份验证 作为身份验证方法。

配置客户端

初始化应用程序时,必须连接到 Snowflake ML 特征平台 API 并创建所需的特征平台 Python 对象。

使用以下部分配置客户端与 Snowflake ML 特征平台 API 的连接。

配置编程访问令牌 (PAT)

在以下代码中指定以下连接参数,以从应用程序连接到 Snowflake:

  • schema – Snowflake 特征平台的名称

  • database – 包含架构或特征平台的数据库

  • role – 从特征平台读取数据所需的角色。有关更多信息,请参阅 提供创建和提供在线特征的访问权限

  • password – 您的 PAT。

import os

### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": pat,
    "role": "<FS_CONSUMER_ROLE>",
    "host": "<host>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "MY_FEATURE_STORE",
}
Copy

创建会话和特征平台对象

定义连接参数后,创建应用程序用于连接到 Snowflake 的会话和特征平台对象。

以下代码:

  1. 创建 Snowflake 会话,即应用程序用于与 Snowflake 通信的客户端。

  2. 配置线程池执行器以启用特征检索并行性。

  3. 列出我们从特征平台中检索的特征。

  4. 初始化特征平台阅读器客户端。此对象包装 Snowflake 会话。这是应用程序与特征平台交互的主要方式。

  5. 初始化您定义的特征视图。您可以用自己的特征替换这些特征。

import os
from concurrent.futures import ThreadPoolExecutor

from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode

# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()

# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# 3. List individual features we are going to retrieve for inference. In this
#    example, we are listing Iris features described above in the
#    "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]

# 4. Initialize feature store consumer client
feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse="<warehouse>",
    creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)

# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
Copy

检索服务路径上的在线特征

定义应用程序的初始化方式后,您可以创建预测端点。

您可以通过不同的方式来定义应用程序处理请求的方式。以下 Python 代码:

  • 定义应用程序中的预测端点

  • 从 JSON 请求中获取密钥

  • 使用密钥从特征视图中检索特征值

  • 将这些特征值传递给模型

  • 从模型中获取预测

  • 返回响应中的预测

from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask

def _retrieve_features(
    feature_view: FeatureView,
    keys: List[int],
    feature_names: List[str]):
    """Retrieve features from the given feature view"""

    return feature_store.read_feature_view(
        feature_view,
        keys=[keys],
        feature_names=feature_names,
        store_type=StoreType.ONLINE  # Query the ONLINE store
    ).collect()

@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
    if flask.request.content_type == 'application/json':
        input_data = flask.request.data.decode("utf-8")
        input_data = json.loads(data)
    else:
        return flask.Response(
            response="This predictor only supports JSON data",
            status=415,
            mimetype="text/plain"
        )

    # Expect that input data is a single key
    keys = [int(input_data["key"])]

    # Retrieve features from two feature views in parallel.
    sepal_features = executor.submit(
        _retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
    petal_features = executor.submit(
        _retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)

    sepal_features = sepal_features.result()
    petal_features = petal_features.result()

    predictions = []
    if len(sepal_features) != 0 and len(petal_features) != 0:
        # Compose the feature vector, excluding the join keys.
        feature_vector = (
            list(sepal_features[0][1:])
            + list(petal_features[0][1:])
        )

        # Using a hypothetical run_inference function.
        predictions = run_inference(feature_vector)

    result = json.dumps({"results": list(predictions)})
    return flask.Response(response=result, status=200,
        mimetype="application/json")
Copy

前面的代码调用了一个假设的 run_inference 函数。无论模型是远程托管还是在应用程序内存中,您自己的推理函数都可以从模型中获取预测。

前面代码中的预测端点接受一个密钥并返回该密钥的预测。您的数据可能有多个密钥来描述单个样本的特征。前面的代码只是一个示例,您可以根据自己的用例进行调整。

语言: 中文