教程:使用 Cortex Analyst 回答有关时间序列收入数据的问题¶
简介¶
Cortex Analyst 通过生成和执行 SQL 查询,将关于数据的自然语言问题转化为结果。本教程介绍如何设置 Cortex Analyst 以响应有关时间序列收入数据集的问题。
您将学习以下内容¶
为数据集建立语义模型。
创建一个查询 Cortex Analyst 的 Streamlit 应用程序。
先决条件¶
要完成本教程,需要满足以下先决条件:
您拥有一个 Snowflake 账户和用户,该用户具有这样的角色:可授予创建数据库、架构、表、暂存区和虚拟仓库对象所需的权限。
您在本地系统上设置了 Streamlit (https://pypi.org/project/streamlit/)。
请参阅 20 分钟学会使用 Snowflake 以获得满足这些要求的说明。
第 1 步:设置¶
获取样本数据¶
您将使用`从 Google 云端硬盘 <https://drive.google.com/drive/folders/1SON4rmnmB7pROZOoHCX2BVV5LvGSDizx?usp=sharing (link removed)>`_ 下载的一个示例数据集和一个相应的语义模型。将以下数据文件下载到您的系统:
daily_revenue_combined.csv
daily_revenue_by_region_combined.csv
daily_revenue_by_product_combined.csv
同时下载语义模型:
revenue_timeseries.yaml
您可能想花一点时间查看包含数据的语义模型的 YAML 文件。语义模型通过提供额外信息,补充每个表的 SQL 架构,帮助 Cortex Analyst 理解有关数据的问题。有关更多信息,请参阅 Cortex Analyst 语义模型规范。
备注
在非教程设置中,您将自带数据(可能已经存在于 Snowflake 表中),并开发自己的语义模型。
创建 Snowflake 对象¶
使用 |sf-web-interface|(Snowflake UI),来创建本教程所需的 Snowflake 对象。完成本教程后,您可以删除这些对象。
备注
使用可以创建数据库、架构、仓库、暂存区和表的角色。
要创建对象,请执行以下操作:
在 Snowsight 用户界面的左侧导航栏中,选择 Worksheets,然后选择 + 按钮。将出现一个新的 SQL 工作表。
将下面的 SQL 代码粘贴到工作表中,然后从工作表右上角的下拉列表中选择 Run All。
CREATE OR REPLACE DATABASE cortex_analyst_demo;
CREATE OR REPLACE SCHEMA revenue_timeseries;
CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
WAREHOUSE_SIZE = 'large'
WAREHOUSE_TYPE = 'standard'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';
USE WAREHOUSE cortex_analyst_wh;
CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
DATE DATE,
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
DATE DATE,
PRODUCT_LINE VARCHAR(16777216),
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
DATE DATE,
SALES_REGION VARCHAR(16777216),
REVENUE FLOAT,
COGS FLOAT,
FORECASTED_REVENUE FLOAT
);
上面的 SQL 将创建以下对象:
一个名为
cortex_analyst_demo
的数据库该数据库中名为
revenue_timeseries
的架构该架构中的三个表:DAILY_REVENUE、DAILY_REVENUE_BY_PRODUCT 和 DAILY_REVENUE_BY_REGION
一个名为
raw_data
的暂存区,保存我们将加载到这些表中的原始数据一个名为
cortex_analyst_wh
的虚拟仓库
请注意,虚拟仓库最初处于暂停状态,并将在需要时自动启动。
第 2 步:将数据加载到 Snowflake¶
要将数据从 CSV 文件导入 Snowflake,需要先将其上传到暂存区,然后再将数据从暂存区加载到表中。同时,您将上传语义模型 YAML 文件,以便在后续步骤中使用。
您将上传的文件是:
daily_revenue_combined.csv
daily_revenue_by_region_combined.csv
daily_revenue_by_product_combined.csv
revenue_timeseries.yaml
要将文件上传到 Snowsight,请执行以下操作:
在 Snowsight UI 中,选择左侧导航栏中的 Data icon,然后选择 Add Data。在 Add Data 页面中,选择 Load files into a stage。
将您在上一步中下载的四个文件拖入 Snowsight 窗口。
选择数据库
cortex_analyst_demo
和暂存区raw_data
,然后选择 Upload 按钮以上传文件。
现在您已经上传了文件,请在 Snowsight 工作表中执行下面的 SQL 命令,从 CSV 文件中加载数据。
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
TYPE=CSV,
SKIP_HEADER=1,
FIELD_DELIMITER=',',
TRIM_SPACE=FALSE,
FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
EMPTY_FIELD_AS_NULL = FALSE
error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
第 3 步:创建一个通过 Cortex Analyst 与数据对话的 Streamlit 应用程序¶
要创建一个使用 Cortex Analyst 的 Streamlit 应用程序,请执行以下操作:
在本地创建一个名为
analyst_demo.py
的 Python 文件。将下面的代码复制到文件中。
用您的账户详细信息替换占位符值。
使用
streamlit run analyst_demo.py
运行 Streamlit 应用程序。
from typing import Any, Dict, List, Optional
import pandas as pd
import requests
import snowflake.connector
import streamlit as st
DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"
# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"
if 'CONN' not in st.session_state or st.session_state.CONN is None:
st.session_state.CONN = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
host=HOST,
port=443,
warehouse=WAREHOUSE,
role=ROLE,
)
def send_message(prompt: str) -> Dict[str, Any]:
"""Calls the REST API and returns the response."""
request_body = {
"messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
"semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
}
resp = requests.post(
url=f"https://{HOST}/api/v2/cortex/analyst/message",
json=request_body,
headers={
"Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
"Content-Type": "application/json",
},
)
request_id = resp.headers.get("X-Snowflake-Request-Id")
if resp.status_code < 400:
return {**resp.json(), "request_id": request_id} # type: ignore[arg-type]
else:
raise Exception(
f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
)
def process_message(prompt: str) -> None:
"""Processes a message and adds the response to the chat."""
st.session_state.messages.append(
{"role": "user", "content": [{"type": "text", "text": prompt}]}
)
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
with st.spinner("Generating response..."):
response = send_message(prompt=prompt)
request_id = response["request_id"]
content = response["message"]["content"]
display_content(content=content, request_id=request_id) # type: ignore[arg-type]
st.session_state.messages.append(
{"role": "assistant", "content": content, "request_id": request_id}
)
def display_content(
content: List[Dict[str, str]],
request_id: Optional[str] = None,
message_index: Optional[int] = None,
) -> None:
"""Displays a content item for a message."""
message_index = message_index or len(st.session_state.messages)
if request_id:
with st.expander("Request ID", expanded=False):
st.markdown(request_id)
for item in content:
if item["type"] == "text":
st.markdown(item["text"])
elif item["type"] == "suggestions":
with st.expander("Suggestions", expanded=True):
for suggestion_index, suggestion in enumerate(item["suggestions"]):
if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
st.session_state.active_suggestion = suggestion
elif item["type"] == "sql":
with st.expander("SQL Query", expanded=False):
st.code(item["statement"], language="sql")
with st.expander("Results", expanded=True):
with st.spinner("Running SQL..."):
df = pd.read_sql(item["statement"], st.session_state.CONN)
if len(df.index) > 1:
data_tab, line_tab, bar_tab = st.tabs(
["Data", "Line Chart", "Bar Chart"]
)
data_tab.dataframe(df)
if len(df.columns) > 1:
df = df.set_index(df.columns[0])
with line_tab:
st.line_chart(df)
with bar_tab:
st.bar_chart(df)
else:
st.dataframe(df)
st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")
if "messages" not in st.session_state:
st.session_state.messages = []
st.session_state.suggestions = []
st.session_state.active_suggestion = None
for message_index, message in enumerate(st.session_state.messages):
with st.chat_message(message["role"]):
display_content(
content=message["content"],
request_id=message.get("request_id"),
message_index=message_index,
)
if user_input := st.chat_input("What is your question?"):
process_message(prompt=user_input)
if st.session_state.active_suggestion:
process_message(prompt=st.session_state.active_suggestion)
st.session_state.active_suggestion = None
运行这个应用程序时,它会提示您输入一个问题。从“我能问什么问题?”开始,尝试其中的一些建议。
第 7 步:清理¶
清理(可选)¶
执行以下 DROP * <object>* 命令,将系统恢复到教程开始前的状态:
DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
删除数据库会自动移除所有子数据库对象,例如表。
后续步骤¶
恭喜!您已成功构建了一个在 Snowflake 中“与数据对话”的简单 Cortex Analyst 应用程序。
其他资源¶
使用以下资源继续学习: