Common feature and query patterns

Note

The Snowflake Feature Store API is available in the Snowpark ML Python package (snowflake-ml-python) v1.5.0 and later.

The FeatureView class accepts a Snowpark DataFrame object containing the feature transformation logic. You can therefore describe your features in any way supported by the Snowpark DataFrame API or by Snowflake SQL. You can pass the DataFrame to the FeatureView constructor directly.

The Snowpark Python API provides analytics functions for easily defining many common feature types, such as windowed aggregations. This topic contains some examples of these.

The open source snowflake-ml-python repository (https://github.com/snowflakedb/snowflake-ml-python/tree/main/snowflake/ml/feature_store/examples) on GitHub also contains sample feature view and entity definitions that use public datasets.

Per-row features

In per-row features, a function is applied to each row of tabular data. For example, the following code fills nulls in foo with zero, then computes a ZIP code from lat and long. There is one output row per input row.

Python:

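from snowflake import snowpark
import snowflake.snowpark.functions as F

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Replace nulls in the "foo" column with 0
    df = df.fillna({"foo": 0})
    # compute_zipcode is assumed to be a UDF already registered in Snowflake
    df = df.with_column(
        "zipcode",
        F.call_udf("compute_zipcode", df["lat"], df["long"])
    )
    return df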

Snowflake SQL:

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
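To make the per-row semantics concrete outside Snowflake, here is a minimal plain-Python sketch that applies the same two steps to a list of dictionaries. The compute_zipcode function below is a hypothetical stand-in for the UDF: it only buckets coordinates into a label and does not resolve real ZIP codes. The sample rows are made up.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def compute_zipcode(lat: float, long: float) -> str:
    # Hypothetical stand-in for the compute_zipcode UDF: it only buckets
    # coordinates into a label and does not look up real ZIP codes.
    return f"Z{round(lat)}_{round(long)}"


def get_zipcode(rows: List[Row]) -> List[Row]:
    out = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are not modified
        # Equivalent of fillna({"foo": 0}) / COALESCE(foo, 0)
        if new_row.get("foo") is None:
            new_row["foo"] = 0
        new_row["zipcode"] = compute_zipcode(new_row["lat"], new_row["long"])
        out.append(new_row)  # exactly one output row per input row
    return out


rows = [
    {"foo": None, "lat": 37.4, "long": -122.1},
    {"foo": 5, "lat": 40.7, "long": -74.0},
]
result = get_zipcode(rows)
```

Because each output row depends only on its own input row, per-row features need no ordering or grouping.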

Per-group features

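Per-group features aggregate the values in a column within a group. For example, you might sum daily rainfall by city for weather forecasting. The output DataFrame contains one row per group.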

Python:

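from snowflake import snowpark
import snowflake.snowpark.functions as F

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Derive a calendar date from the timestamp so rows can be grouped per day
    df = df.with_column("date", F.to_date(F.col("timestamp")))
    # One output row per (location, date) group
    df = df.group_by(["location", "date"]).agg(
        F.sum("rain").alias("sum_rain"),
        F.avg("humidity").alias("avg_humidity")
    )
    return df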

Snowflake SQL:

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
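The grouping can be sketched in plain Python as follows. This is an illustration, not Snowpark code: it assumes rows are dictionaries with datetime timestamps and no null values (SQL's SUM and AVG skip nulls; this sketch does not handle them), and the sample data is made up.

```python
from collections import defaultdict
from datetime import date, datetime
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def sum_rainfall(rows: List[Row]) -> List[Row]:
    # Running [sum of rain, sum of humidity, row count] per (location, date) key
    groups: Dict[Tuple[str, date], list] = defaultdict(lambda: [0.0, 0.0, 0])
    for row in rows:
        key = (row["location"], row["timestamp"].date())  # like TO_DATE(timestamp)
        acc = groups[key]
        acc[0] += row["rain"]
        acc[1] += row["humidity"]
        acc[2] += 1
    # One output row per group
    return [
        {"location": loc, "date": day, "sum_rain": s_rain, "avg_humidity": s_hum / n}
        for (loc, day), (s_rain, s_hum, n) in groups.items()
    ]


rows = [
    {"location": "Seattle", "timestamp": datetime(2024, 1, 1, 8), "rain": 2.0, "humidity": 80.0},
    {"location": "Seattle", "timestamp": datetime(2024, 1, 1, 18), "rain": 3.0, "humidity": 90.0},
    {"location": "Seattle", "timestamp": datetime(2024, 1, 2, 9), "rain": 1.0, "humidity": 70.0},
    {"location": "Denver", "timestamp": datetime(2024, 1, 1, 12), "rain": 0.5, "humidity": 30.0},
]
result = sum_rainfall(rows)
```

Four input rows collapse into three output rows, one per (location, date) pair.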

Row-based window features

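Row-based window features aggregate values over a fixed window of rows; for example, summing the amounts of the last three transactions. The output DataFrame contains one row per window frame.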

Python:

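from snowflake import snowpark
from snowflake.snowpark import Window
import snowflake.snowpark.functions as F

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Frame: the two preceding rows plus the current row, per id, ordered by ts
    window = Window.partition_by("id").order_by("ts").rows_between(-2, Window.CURRENT_ROW)

    return df.select(
        "id",
        F.sum("amount").over(window).alias("sum_past_3_transactions")
    )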

Snowflake SQL:

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
        AS sum_past_3_transactions
FROM <source_table_name>;
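The frame ROWS BETWEEN 2 PRECEDING AND CURRENT ROW can be sketched in plain Python as below. This is an illustration with made-up data, not Snowpark code; ts is carried along only to make the output easy to read, whereas the SQL query returns just id and the sum.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def sum_past_3_transactions(rows: List[Row]) -> List[Row]:
    # PARTITION BY id
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row["id"]].append(row)

    out = []
    for pid, part in partitions.items():
        part.sort(key=lambda r: r["ts"])  # ORDER BY ts
        for i, row in enumerate(part):
            # ROWS BETWEEN 2 PRECEDING AND CURRENT ROW; the frame is shorter
            # for the first two rows of each partition
            frame = part[max(0, i - 2): i + 1]
            out.append({
                "id": pid,
                "ts": row["ts"],
                "sum_past_3_transactions": sum(r["amount"] for r in frame),
            })
    return out


rows = [
    {"id": "a", "ts": 3, "amount": 30},
    {"id": "a", "ts": 1, "amount": 10},
    {"id": "b", "ts": 1, "amount": 5},
    {"id": "a", "ts": 4, "amount": 40},
    {"id": "a", "ts": 2, "amount": 20},
]
result = sum_past_3_transactions(rows)
```

For id "a", the sums are 10, 30, 60, and 90: the frame grows to three rows and then slides forward one row at a time.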

Moving aggregation features

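Moving aggregation features compute rolling statistics, such as sums and averages, over a specified window size. The function computes these aggregations over different subsets of the DataFrame according to the defined window sizes, ordering, and grouping. The output DataFrame contains one row per window frame.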

new_df = df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
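As a hedged illustration of what the moving_agg call above computes, the plain-Python sketch below handles a single value column. It assumes that each row's frame is the current row plus up to window_size - 1 preceding rows within its group, which is our reading of the description rather than a copy of Snowpark's implementation. The function name moving_agg_sketch, the output column names, and the sample data are all hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def moving_agg_sketch(rows: List[Row], value_col: str, window_sizes: List[int],
                      order_by: str, group_by: str) -> List[Row]:
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[group_by]].append(row)

    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order_by])
        for i, row in enumerate(part):
            new_row = dict(row)  # every input row is kept; columns are added
            for size in window_sizes:
                # Frame: the current row plus up to size - 1 preceding rows
                frame = [r[value_col] for r in part[max(0, i - size + 1): i + 1]]
                new_row[f"{value_col}_SUM_{size}"] = sum(frame)
                new_row[f"{value_col}_AVG_{size}"] = sum(frame) / len(frame)
            out.append(new_row)
    return out


rows = [
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-01", "SALESAMOUNT": 10},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-02", "SALESAMOUNT": 20},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-03", "SALESAMOUNT": 40},
]
result = moving_agg_sketch(rows, "SALESAMOUNT", [2, 3], "ORDERDATE", "PRODUCTKEY")
```

Each window size produces its own pair of columns, so window_sizes=[2, 3] with SUM and AVG adds four columns per row.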

Cumulative aggregation features

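Cumulative aggregations compute running totals, minimums, maximums, and other cumulative statistics across a partition of data, sorted and grouped by the specified criteria. Unlike moving aggregations, these aggregations extend from the start of the partition or to the end of the partition, depending on the specified direction, to provide running totals that never reset. The output DataFrame contains one row per input row.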

new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
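The two directions can be sketched in plain Python as below. The sketch assumes, as the paragraph above describes, that is_forward=True extends each row's frame from the current row to the end of its partition and is_forward=False from the start of the partition to the current row. The function name cumulative_agg_sketch, the output column names, and the sample data are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def cumulative_agg_sketch(rows: List[Row], value_col: str, order_by: str,
                          group_by: str, is_forward: bool) -> List[Row]:
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[group_by]].append(row)

    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order_by])
        for i, row in enumerate(part):
            # is_forward=True: current row through the end of the partition
            # is_forward=False: start of the partition through the current row
            window = part[i:] if is_forward else part[: i + 1]
            frame = [r[value_col] for r in window]
            new_row = dict(row)
            new_row[f"{value_col}_SUM"] = sum(frame)
            new_row[f"{value_col}_MIN"] = min(frame)
            new_row[f"{value_col}_MAX"] = max(frame)
            out.append(new_row)
    return out


rows = [
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-01", "SALESAMOUNT": 10},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-02", "SALESAMOUNT": 20},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-03", "SALESAMOUNT": 40},
]
forward = cumulative_agg_sketch(rows, "SALESAMOUNT", "ORDERDATE", "PRODUCTKEY", is_forward=True)
backward = cumulative_agg_sketch(rows, "SALESAMOUNT", "ORDERDATE", "PRODUCTKEY", is_forward=False)
```

The forward sums shrink toward the end of the partition (70, 60, 40), while the backward sums grow from its start (10, 30, 70).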

Lag features

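Lag features add new columns that contain values from previous rows within each partition, offset by a specified number of rows. They are essential for comparing current values with earlier values in the dataset, which helps detect trends or changes over time. The output DataFrame contains one row per input row.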

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
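A plain-Python sketch of the lag semantics follows; it is an illustration, not Snowpark's implementation. A lag of n reads the value n rows earlier in the same partition and yields None (NULL in SQL) when no such row exists. The explicit j >= 0 check matters because a negative Python index would silently wrap around to the end of the list. The function name compute_lag_sketch, the column names, and the sample data are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def compute_lag_sketch(rows: List[Row], col: str, lags: List[int],
                       order_by: str, group_by: str) -> List[Row]:
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[group_by]].append(row)

    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order_by])
        for i, row in enumerate(part):
            new_row = dict(row)
            for lag in lags:
                # Value from `lag` rows earlier in the same partition, or None
                # when the partition has no such row
                j = i - lag
                new_row[f"{col}_LAG_{lag}"] = part[j][col] if j >= 0 else None
            out.append(new_row)
    return out


rows = [
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-01", "SALESAMOUNT": 10},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-02", "SALESAMOUNT": 20},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-03", "SALESAMOUNT": 40},
    {"PRODUCTKEY": 2, "ORDERDATE": "2024-01-04", "SALESAMOUNT": 99},
]
result = compute_lag_sketch(rows, "SALESAMOUNT", [1, 2], "ORDERDATE", "PRODUCTKEY")
```

The single row for product 2 gets None for both lags, because lagged values never cross partition boundaries.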

Lead features

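Lead features are the opposite of lag features: they shift data upward, creating new columns that contain values from subsequent rows. They are essential for making predictions or assumptions based on future data points that already exist in the dataset. The output DataFrame contains one row per input row.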

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
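The mirror-image lead semantics can be sketched in plain Python as below (an illustration, not Snowpark's implementation): a lead of n reads the value n rows later in the same partition and yields None when the partition runs out of rows. The function name compute_lead_sketch, the column names, and the sample data are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def compute_lead_sketch(rows: List[Row], col: str, leads: List[int],
                        order_by: str, group_by: str) -> List[Row]:
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[group_by]].append(row)

    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order_by])
        for i, row in enumerate(part):
            new_row = dict(row)
            for lead in leads:
                # Value from `lead` rows later in the same partition, or None
                # when the partition has no such row
                j = i + lead
                new_row[f"{col}_LEAD_{lead}"] = part[j][col] if j < len(part) else None
            out.append(new_row)
    return out


rows = [
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-01", "SALESAMOUNT": 10},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-02", "SALESAMOUNT": 20},
    {"PRODUCTKEY": 1, "ORDERDATE": "2024-01-03", "SALESAMOUNT": 40},
]
result = compute_lead_sketch(rows, "SALESAMOUNT", [1, 2], "ORDERDATE", "PRODUCTKEY")
```

Because lead columns expose later values, they are typically used to build labels or targets rather than inputs that must be available at prediction time.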

Time-series features

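Time-series features compute feature values based on a time window and a fixed position on the time axis; for example, the number of ride-share trips in the past week, or total sales over the past three days. The output DataFrame contains one row per time window.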

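The latest version of Snowflake Feature Store includes an experimental time-series aggregation API. With this API, you can create time-series features using code like the following: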

Python:

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)

You can also construct time-series features with the RANGE BETWEEN syntax in SQL. For more details, see Window functions in the Snowflake documentation.

Snowflake SQL:

SELECT
    TS,
    LOCATION,
    SUM(RAIN) OVER (
        PARTITION BY LOCATION
        ORDER BY TS
        RANGE BETWEEN INTERVAL '3 days' PRECEDING AND CURRENT ROW
    ) AS SUM_RAIN_3D,
    SUM(RAIN) OVER (
        PARTITION BY LOCATION
        ORDER BY TS
        RANGE BETWEEN INTERVAL '5 days' PRECEDING AND CURRENT ROW
    ) AS SUM_RAIN_5D
FROM <source_table_name>;
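The plain-Python sketch below mirrors the SQL RANGE query above (not the time_series_agg call, whose output also depends on sliding_interval). It shows that a RANGE frame is bounded by timestamps rather than by a row count: each row sums every row in its partition whose TS falls within the interval, inclusive at both ends, so gaps in the data shrink the frame. It rescans the partition for every row, which is quadratic and meant only for illustration; the function name rain_range_sums and the sample data are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List

Row = Dict[str, Any]


def rain_range_sums(rows: List[Row]) -> List[Row]:
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row["LOCATION"]].append(row)  # PARTITION BY LOCATION

    out = []
    for loc, part in partitions.items():
        part.sort(key=lambda r: r["TS"])  # ORDER BY TS
        for row in part:
            new_row = {"TS": row["TS"], "LOCATION": loc}
            for name, days in (("SUM_RAIN_3D", 3), ("SUM_RAIN_5D", 5)):
                start = row["TS"] - timedelta(days=days)
                # RANGE frame: every row whose TS lies in [TS - interval, TS],
                # including rows that share the current TS
                new_row[name] = sum(r["RAIN"] for r in part if start <= r["TS"] <= row["TS"])
            out.append(new_row)
    return out


rows = [
    {"TS": datetime(2024, 1, d), "LOCATION": "Seattle", "RAIN": rain}
    for d, rain in [(1, 1), (2, 2), (4, 4), (6, 8), (7, 16)]
]
result = rain_range_sums(rows)
```

On January 6, the 3-day frame starts on January 3, so it contains only the rows from January 4 and 6 (4 + 8 = 12), even though a 3-row window would have reached back to January 2.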

Using user-defined functions in feature pipelines

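Snowflake Feature Store supports user-defined functions (UDFs) in feature pipeline definitions. However, only deterministic functions (functions that always return the same result for the same input) can be maintained incrementally. To enable incremental maintenance, mark your UDF as immutable when you register it.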

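# In Python
import snowflake.snowpark.functions as F
from snowflake.snowpark.types import FloatType

@F.udf(
    name="MY_UDF",
    immutable=True,  # deterministic: same input, same output; enables incremental maintenance
    input_types=[FloatType()],
    return_type=FloatType(),
    # ... other registration options, such as is_permanent and stage_location
)
def my_udf(x: float) -> float:
    # Hypothetical body for illustration; any deterministic logic works here
    return x * 2.0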

If your function is written in SQL, specify the IMMUTABLE keyword in its CREATE FUNCTION statement.