常见特征和查询模式

备注

Snowflake 特征平台 API 在 Snowpark ML Python 包 (snowflake-ml-python) v1.5.0 及更高版本中提供。

FeatureView 类接受包含特征转换逻辑的 Snowpark DataFrame 对象。因此,您可以使用 Snowpark DataFrame API 或 Snowflake SQL 支持的任何方式描述特征。您可以直接将 DataFrame 传递给 FeatureView 构造函数。

Snowpark Python API 提供了 分析函数,可轻松定义许多常见特征类型,如窗口聚合。本主题包含一些此类示例。

GitHub 上的开源 snowflake-ml-python (https://github.com/snowflakedb/snowflake-ml-python/tree/main/snowflake/ml/feature_store/examples) 还包含一些使用公共数据集的特征视图和实体定义的示例。

每行特征

在每行特征中,函数会应用于表格数据的每一行。例如,以下代码使用 0 为 foo 填充缺失值,然后根据 latlong 计算 ZIP 代码。每个输入行都有一个输出行。

Python:

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Copy

Snowflake SQL:

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Copy

每组特征

每组特征对一个组内某列中的值进行汇总。例如,可以按城市分组日降雨量总和,用于天气预报。对于每个组,输出 DataFrame 中都包含一行。

Python:

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date(timestamp)]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Copy

Snowflake SQL:

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Copy

基于行的窗口特征

基于行的窗口特征会对固定行窗口的值进行汇总;例如,对最后三笔交易金额求和。对于每个窗口框架,输出 DataFrame 中都包含一行。

Python:

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)

    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Copy

Snowflake SQL:

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
        AS sum_past_3_transactions
FROM <source_table_name>;
Copy

移动聚合特征

移动聚合特征会计算指定窗口大小内的移动统计信息,如总和及平均值。该函数会根据定义的窗口大小、顺序和分组,动态计算 DataFrame 不同子集的这些聚合。对于每个窗口框架,输出 DataFrame 中都包含一行。

new_df =  df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

累计聚合特征

累计聚合会计算整个数据分区中的持续总计、最小值、最大值和其他累积统计信息,并按照指定的条件对其进行排序和分组。不同于移动聚合,这些聚合会从分区的起点开始延伸,或者延伸到分区的末尾,具体取决于为其指定的方向,目的是提供不重置的累计总数。对于每个输入行,输出 DataFrame 中都包含一行。

 new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
Copy

滞后特征

滞后特征按指定行数偏移,引入了包含各分区内先前行值的新列。此功能对于将当前值与数据集的先前值进行比较至关重要,有助于检测随时间推移的趋势或变化。对于每个输入行,输出 DataFrame 中都包含一行。

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

前导特征

前导特征与滞后特征恰好相反,它将数据上移,创建包含后续行值的新列。此特征对于根据数据集中已有的未来数据点作出预测或假设至关重要。对于每个输入行,输出 DataFrame 中都包含一行。

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

时序特征

时序特征根据时间窗口以及时间轴上的固定位置计算特征值。例如,在过去一周内的拼车出行次数,或者过去三天内的总销售额。对于每个时间窗口,输出 DataFrame 中都包含一行。

Snowflake 特征平台的最新版本包括一个实验性的时序汇总 API。使用此 API,可以使用如下代码创建时序特征:

Python:

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
Copy

您还可以在 SQL 中使用 RANGE BETWEEN 语法构造时序特征。有关详细信息,请参阅 Snowflake 窗口函数

Snowflake SQL:

select
    TS,
    LOCATION,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '3 days' preceding and current row
    ) SUM_RAIN_3D,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '5 days' preceding and current row
    ) SUM_RAIN_5D
from <source_table_name>
Copy

在特征管道中使用用户定义的函数

Snowflake 特征平台支持特征管道定义中的用户定义的函数 (UDFs)。但只有确定性函数(在输入相同的前提下,此类函数始终返回相同的结果)才能进行增量式维护。要启用增量式维护,请在注册 UDF 时将其标记为不可变。

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
Copy

如果您的函数是使用 SQL 编写的,请指定 IMMUTABLE 关键字。请参阅 此指南

语言: 中文