Common feature and query patterns
Note
The Snowflake Feature Store API is available in the Snowpark ML Python package (snowflake-ml-python) v1.5.0 and later.
The FeatureView class accepts a Snowpark DataFrame object containing the feature transformation logic. You can
therefore describe your features in any way supported by the Snowpark DataFrame API or by Snowflake SQL. You can pass
the DataFrame to the FeatureView constructor directly.
The Snowpark Python API provides analytics functions for easily defining many common feature types, such as windowed aggregations. This topic contains some examples of these.
The open source snowflake-ml-python repository (https://github.com/snowflakedb/snowflake-ml-python/tree/main/snowflake/ml/feature_store/examples) on GitHub also contains some sample feature view and entity definitions using public datasets.
Per-row features
In per-row features, functions are applied to each row of tabular data. For example, the following code fills null values in foo with zero, then computes a ZIP code from lat and long. There is one output row per input row.
Python:
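from snowflake import snowpark
from snowflake.snowpark import functions as F

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Replace nulls in "foo" with zero.
    df = df.fillna({"foo": 0})
    # compute_zipcode is a placeholder for a function you provide (for example, a UDF).
    df = df.with_column(
        "zipcode",
        F.call_function("compute_zipcode", df["lat"], df["long"])
    )
    return df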
Snowflake SQL:
SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
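Per-group features
Per-group features aggregate the values of a column within a group. For example, daily rainfall can be summed per city for use in weather forecasting. There is one row in the output DataFrame for each group.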
Python:
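from snowflake import snowpark
from snowflake.snowpark import functions as F

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Derive the calendar date so rows can be grouped per location and day.
    df = df.with_column("date", F.to_date(F.col("timestamp")))
    df = df.group_by(["location", "date"]).agg(
        F.sum("rain").alias("sum_rain"),
        F.avg("humidity").alias("avg_humidity")
    )
    return df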
Snowflake SQL:
SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
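To make the "one output row per group" behavior concrete, the following is a minimal plain-Python sketch of the same aggregation, with no Snowflake connection involved. The sample readings and the helper name sum_rainfall_by_day are hypothetical and only illustrate the grouping logic; this is not how Snowpark executes the query.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical weather readings: (location, timestamp, rain, humidity).
readings = [
    ("Seattle", datetime(2024, 1, 1, 8), 2.0, 80.0),
    ("Seattle", datetime(2024, 1, 1, 20), 3.0, 90.0),
    ("Seattle", datetime(2024, 1, 2, 8), 1.0, 70.0),
    ("Austin", datetime(2024, 1, 1, 9), 0.5, 40.0),
]


def sum_rainfall_by_day(rows):
    """Return one aggregated entry per (location, date) group."""
    groups = defaultdict(list)
    for location, ts, rain, humidity in rows:
        # The group key mirrors GROUP BY location, TO_DATE(timestamp).
        groups[(location, ts.date())].append((rain, humidity))

    result = {}
    for key, values in groups.items():
        rains = [rain for rain, _ in values]
        humidities = [humidity for _, humidity in values]
        result[key] = {
            "sum_rain": sum(rains),
            "avg_humidity": sum(humidities) / len(humidities),
        }
    return result


daily = sum_rainfall_by_day(readings)
# Four input rows collapse into three groups.
```

The two Seattle readings on the same day collapse into a single entry, which is why the output has fewer rows than the input.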
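Row-based window features
Row-based window features aggregate values over a fixed window of rows; for example, summing the last three transaction amounts. There is one row in the output DataFrame for each window frame.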
Python:
from snowflake import snowpark
from snowflake.snowpark import Window, functions as F

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Frame: the two preceding rows plus the current row, per id, ordered by ts.
    window = Window.partition_by("id").order_by("ts").rows_between(-2, Window.CURRENT_ROW)
    return df.select(
        "id",
        F.sum("amount").over(window).alias("sum_past_3_transactions")
    )
Snowflake SQL:
SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
        AS sum_past_3_transactions
FROM <source_table_name>;
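The frame "two preceding rows through the current row" can be traced by hand with a small plain-Python sketch. The transaction data and the helper name sum_past_3 are hypothetical; the sketch only mirrors the window semantics and does not reflect how Snowflake evaluates the query.

```python
from collections import defaultdict

# Hypothetical transactions: (id, ts, amount).
transactions = [
    ("a", 1, 10.0), ("a", 2, 20.0), ("a", 3, 30.0), ("a", 4, 40.0),
    ("b", 1, 5.0), ("b", 2, 7.0),
]


def sum_past_3(rows):
    """Sum the current row and up to two preceding rows within each id."""
    by_id = defaultdict(list)
    for id_, ts, amount in rows:
        by_id[id_].append((ts, amount))

    out = []
    for id_, items in by_id.items():
        items.sort()  # ORDER BY ts within the partition
        amounts = [amount for _, amount in items]
        for i, (ts, _) in enumerate(items):
            # ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
            frame = amounts[max(0, i - 2): i + 1]
            out.append((id_, ts, sum(frame)))
    return out


windowed = sum_past_3(transactions)
```

Early rows in each partition have fewer than three rows available, so their frames are shorter: the first row for id "a" sums only itself.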
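Moving aggregation features
Moving aggregation features compute moving statistics, such as sum and average, within a specified window size. The function dynamically computes these aggregations over different subsets of the DataFrame, based on the defined window sizes, ordering, and grouping. There is one row in the output DataFrame for each window frame.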
new_df = df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
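As a rough illustration of what several window sizes produce side by side, here is a plain-Python sketch for a single, already ordered group. It assumes a window of size n covers the current row and the n - 1 rows before it; the function name moving_agg_sketch and the output keys such as sum_2 are invented for this example and are not the column names Snowpark generates.

```python
def moving_agg_sketch(values, window_sizes):
    """Compute moving SUM and AVG for each window size over an ordered list."""
    rows = []
    for i, value in enumerate(values):
        row = {"value": value}
        for n in window_sizes:
            # Assumed frame: the current row plus the n - 1 rows before it.
            frame = values[max(0, i - n + 1): i + 1]
            row[f"sum_{n}"] = sum(frame)
            row[f"avg_{n}"] = sum(frame) / len(frame)
        rows.append(row)
    return rows


# Hypothetical sales amounts for one product, ordered by date.
sales = [10.0, 20.0, 60.0]
moving = moving_agg_sketch(sales, window_sizes=[2, 3])
```

Each combination of aggregation and window size adds one value per row, so two aggregations over two window sizes yield four derived values for every input row.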
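Cumulative aggregation features
Cumulative aggregation computes running totals, minimums, maximums, and other cumulative statistics across a data partition, sorted and grouped as specified. Unlike moving aggregations, these aggregations extend from the start of the partition or to the end of the partition, depending on the specified direction, so that they provide running totals that do not reset. There is one row in the output DataFrame for each input row.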
new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
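The two directions described above can be sketched in plain Python with itertools.accumulate. The helper name running and its to_end flag are hypothetical and deliberately neutral: this sketch does not claim which direction a given is_forward value selects, so check the Snowpark reference for that mapping.

```python
from itertools import accumulate
from operator import add


def running(values, func, to_end=False):
    """Cumulative aggregate over an ordered partition.

    to_end=False: each row aggregates from the partition start to itself.
    to_end=True:  each row aggregates from itself to the partition end.
    """
    if to_end:
        return list(accumulate(reversed(values), func))[::-1]
    return list(accumulate(values, func))


# Hypothetical sales amounts for one product, ordered by date.
sales = [10.0, 5.0, 20.0]

running_sum = running(sales, add)               # [10.0, 15.0, 35.0]
running_min = running(sales, min)               # [10.0, 5.0, 5.0]
running_max = running(sales, max)               # [10.0, 10.0, 20.0]
sum_to_end = running(sales, add, to_end=True)   # [35.0, 25.0, 20.0]
```

In both directions the output has exactly one value per input row, and the total never resets inside the partition.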
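Lag features
Lag features introduce new columns that contain values from previous rows within each partition, offset by a specified number of rows. They are important for comparing current values against earlier values in a dataset, which helps detect trends or changes over time. There is one row in the output DataFrame for each input row.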
new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
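Lead features
Lead features are the opposite of lag features: they shift data upward, creating new columns that contain values from subsequent rows. They are important for making predictions or assumptions based on future data points that are already present in the dataset. There is one row in the output DataFrame for each input row.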
new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
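Lag and lead are the same row shift applied in opposite directions. The plain-Python sketch below shows this for one ordered partition; the helper name shift and the sample values are hypothetical, and None stands in for the NULL that appears where no earlier or later row exists.

```python
def shift(values, offset):
    """Shift an ordered partition by a number of rows.

    offset > 0 looks back (lag); offset < 0 looks ahead (lead).
    Positions with no source row are filled with None.
    """
    n = len(values)
    shifted = []
    for i in range(n):
        j = i - offset
        shifted.append(values[j] if 0 <= j < n else None)
    return shifted


# Hypothetical sales amounts for one product, ordered by date.
sales = [100, 120, 90, 150]

lag_1 = shift(sales, 1)     # [None, 100, 120, 90]
lead_2 = shift(sales, -2)   # [90, 150, None, None]
```

The output keeps one entry per input row, so a lag leaves gaps at the start of each partition and a lead leaves gaps at the end.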
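Time-series features
Time-series features compute feature values based on a time window and a fixed position along the time axis. Examples include the number of rideshare trips in the past week, or the total sales over the past three days. There is one row in the output DataFrame for each time window.
Recent releases of the Snowflake Feature Store include an experimental time-series aggregation API. With this API, you can create time-series features using code like the following: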
Python:
def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
You can also construct time-series features with RANGE BETWEEN syntax in SQL. For more details, see Snowflake Window functions.
Snowflake SQL:
SELECT
    ts,
    location,
    SUM(rain) OVER (
        PARTITION BY location
        ORDER BY ts
        RANGE BETWEEN INTERVAL '3 days' PRECEDING AND CURRENT ROW
    ) AS sum_rain_3d,
    SUM(rain) OVER (
        PARTITION BY location
        ORDER BY ts
        RANGE BETWEEN INTERVAL '5 days' PRECEDING AND CURRENT ROW
    ) AS sum_rain_5d
FROM <source_table_name>;
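A time-based RANGE frame differs from a row-based frame in that the number of rows it covers depends on the timestamps, not on a fixed count. The plain-Python sketch below illustrates this for a single location. The rain log with a gap in the dates and the helper name range_sum are hypothetical, and the sketch assumes both ends of the interval are inclusive.

```python
from datetime import datetime, timedelta

# Hypothetical rain log for one location: (ts, rain). Note the gap after Jan 2.
rain_log = [
    (datetime(2024, 1, 1), 1.0),
    (datetime(2024, 1, 2), 2.0),
    (datetime(2024, 1, 5), 4.0),
    (datetime(2024, 1, 6), 8.0),
]


def range_sum(rows, days):
    """For each row, sum rain over rows whose ts is within `days` before it."""
    span = timedelta(days=days)
    return [
        # RANGE BETWEEN INTERVAL '<days> days' PRECEDING AND CURRENT ROW
        (ts, sum(rain for other_ts, rain in rows if ts - span <= other_ts <= ts))
        for ts, _ in rows
    ]


sum_3d = range_sum(rain_log, 3)
sum_5d = range_sum(rain_log, 5)
```

For the January 5 row, the three-day frame reaches back to January 2 and covers two rows, while the five-day frame also picks up January 1. A row-based window of fixed size could not express that difference when readings are missing.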
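Using user-defined functions in feature pipelines
The Snowflake Feature Store supports user-defined functions (UDFs) in feature pipeline definitions. However, only deterministic functions (functions that always return the same result for the same input) can be incrementally maintained. To enable incremental maintenance, mark your UDF as immutable when you register it.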
@F.udf(
    name="MY_UDF",
    immutable=True,
)
def my_udf(...):
If your function is written in SQL, specify the IMMUTABLE keyword. See this guide.