教程:使用 Cortex Analyst 回答有关时间序列收入数据的问题

简介

Cortex Analyst 通过生成和执行 SQL 查询,将关于数据的自然语言问题转化为结果。本教程介绍如何设置 Cortex Analyst 以响应有关时间序列收入数据集的问题。

您将学习以下内容

  • 为数据集建立语义模型。

  • 创建一个查询 Cortex Analyst 的 Streamlit 应用程序。

先决条件

要完成本教程,需要满足以下先决条件:

  • 您拥有一个 Snowflake 账户和用户,该用户具有这样的角色:可授予创建数据库、架构、表、暂存区和虚拟仓库对象所需的权限。

  • 您在本地系统上设置了 Streamlit (https://pypi.org/project/streamlit/)。

请参阅 20 分钟学会使用 Snowflake 以获得满足这些要求的说明。

第 1 步:设置

获取样本数据

您将使用`从 Google 云端硬盘 <https://drive.google.com/drive/folders/1SON4rmnmB7pROZOoHCX2BVV5LvGSDizx?usp=sharing (link removed)>`_ 下载的一个示例数据集和一个相应的语义模型。将以下数据文件下载到您的系统:

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

同时下载语义模型:

  • revenue_timeseries.yaml

您可能想花一点时间查看包含数据的语义模型的 YAML 文件。语义模型通过提供额外信息,补充每个表的 SQL 架构,帮助 Cortex Analyst 理解有关数据的问题。有关更多信息,请参阅 Cortex Analyst 语义模型规范

备注

在非教程设置中,您将自带数据(可能已经存在于 Snowflake 表中),并开发自己的语义模型。

创建 Snowflake 对象

使用 |sf-web-interface|(Snowflake UI),来创建本教程所需的 Snowflake 对象。完成本教程后,您可以删除这些对象。

备注

使用可以创建数据库、架构、仓库、暂存区和表的角色。

要创建对象,请执行以下操作:

  1. 在 Snowsight 用户界面的左侧导航栏中,选择 Worksheets,然后选择 + 按钮。将出现一个新的 SQL 工作表。

  2. 将下面的 SQL 代码粘贴到工作表中,然后从工作表右上角的下拉列表中选择 Run All

CREATE OR REPLACE DATABASE cortex_analyst_demo;

CREATE OR REPLACE SCHEMA revenue_timeseries;

CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
    WAREHOUSE_SIZE = 'large'
    WAREHOUSE_TYPE = 'standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
COMMENT = 'warehouse for cortex analyst demo';

USE WAREHOUSE cortex_analyst_wh;

CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE (
    DATE DATE,
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT (
    DATE DATE,
    PRODUCT_LINE VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);

CREATE OR REPLACE TABLE CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION (
    DATE DATE,
    SALES_REGION VARCHAR(16777216),
    REVENUE FLOAT,
    COGS FLOAT,
    FORECASTED_REVENUE FLOAT
);
Copy

上面的 SQL 将创建以下对象:

  • 一个名为 cortex_analyst_demo 的数据库

  • 该数据库中名为 revenue_timeseries 的架构

  • 该架构中的三个表:DAILY_REVENUE、DAILY_REVENUE_BY_PRODUCT 和 DAILY_REVENUE_BY_REGION

  • 一个名为 raw_data 的暂存区,保存我们将加载到这些表中的原始数据

  • 一个名为 cortex_analyst_wh 的虚拟仓库

请注意,虚拟仓库最初处于暂停状态,并将在需要时自动启动。

第 2 步:将数据加载到 Snowflake

要将数据从 CSV 文件导入 Snowflake,需要先将其上传到暂存区,然后再将数据从暂存区加载到表中。同时,您将上传语义模型 YAML 文件,以便在后续步骤中使用。

您将上传的文件是:

  • daily_revenue_combined.csv

  • daily_revenue_by_region_combined.csv

  • daily_revenue_by_product_combined.csv

  • revenue_timeseries.yaml

要将文件上传到 Snowsight,请执行以下操作:

  1. 在 Snowsight UI 中,选择左侧导航栏中的 Data icon,然后选择 Add Data。在 Add Data 页面中,选择 Load files into a stage

  2. 将您在上一步中下载的四个文件拖入 Snowsight 窗口。

  3. 选择数据库 cortex_analyst_demo 和暂存区 raw_data,然后选择 Upload 按钮以上传文件。

现在您已经上传了文件,请在 Snowsight 工作表中执行下面的 SQL 命令,从 CSV 文件中加载数据。

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_PRODUCT
FROM @raw_data
FILES = ('daily_revenue_by_product_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;

COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE_BY_REGION
FROM @raw_data
FILES = ('daily_revenue_by_region_combined.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)
ON_ERROR=CONTINUE
FORCE = TRUE ;
Copy

第 3 步:创建一个通过 Cortex Analyst 与数据对话的 Streamlit 应用程序

要创建一个使用 Cortex Analyst 的 Streamlit 应用程序,请执行以下操作:

  1. 在本地创建一个名为 analyst_demo.py 的 Python 文件。

  2. 将下面的代码复制到文件中。

  3. 用您的账户详细信息替换占位符值。

  4. 使用 streamlit run analyst_demo.py 运行 Streamlit 应用程序。

from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import snowflake.connector
import streamlit as st

DATABASE = "CORTEX_ANALYST_DEMO"
SCHEMA = "REVENUE_TIMESERIES"
STAGE = "RAW_DATA"
FILE = "revenue_timeseries.yaml"
WAREHOUSE = "cortex_analyst_wh"

# replace values below with your Snowflake connection information
HOST = "<host>"
ACCOUNT = "<account>"
USER = "<user>"
PASSWORD = "<password>"
ROLE = "<role>"

if 'CONN' not in st.session_state or st.session_state.CONN is None:
    st.session_state.CONN = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        host=HOST,
        port=443,
        warehouse=WAREHOUSE,
        role=ROLE,
    )

def send_message(prompt: str) -> Dict[str, Any]:
    """Calls the REST API and returns the response."""
    request_body = {
        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
        "semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
    }
    resp = requests.post(
        url=f"https://{HOST}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{st.session_state.CONN.rest.token}"',
            "Content-Type": "application/json",
        },
    )
    request_id = resp.headers.get("X-Snowflake-Request-Id")
    if resp.status_code < 400:
        return {**resp.json(), "request_id": request_id}  # type: ignore[arg-type]
    else:
        raise Exception(
            f"Failed request (id: {request_id}) with status {resp.status_code}: {resp.text}"
        )

def process_message(prompt: str) -> None:
    """Processes a message and adds the response to the chat."""
    st.session_state.messages.append(
        {"role": "user", "content": [{"type": "text", "text": prompt}]}
    )
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Generating response..."):
            response = send_message(prompt=prompt)
            request_id = response["request_id"]
            content = response["message"]["content"]
            display_content(content=content, request_id=request_id)  # type: ignore[arg-type]
    st.session_state.messages.append(
        {"role": "assistant", "content": content, "request_id": request_id}
    )

def display_content(
    content: List[Dict[str, str]],
    request_id: Optional[str] = None,
    message_index: Optional[int] = None,
) -> None:
    """Displays a content item for a message."""
    message_index = message_index or len(st.session_state.messages)
    if request_id:
        with st.expander("Request ID", expanded=False):
            st.markdown(request_id)
    for item in content:
        if item["type"] == "text":
            st.markdown(item["text"])
        elif item["type"] == "suggestions":
            with st.expander("Suggestions", expanded=True):
                for suggestion_index, suggestion in enumerate(item["suggestions"]):
                    if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                        st.session_state.active_suggestion = suggestion
        elif item["type"] == "sql":
            with st.expander("SQL Query", expanded=False):
                st.code(item["statement"], language="sql")
            with st.expander("Results", expanded=True):
                with st.spinner("Running SQL..."):
                    df = pd.read_sql(item["statement"], st.session_state.CONN)
                    if len(df.index) > 1:
                        data_tab, line_tab, bar_tab = st.tabs(
                            ["Data", "Line Chart", "Bar Chart"]
                        )
                        data_tab.dataframe(df)
                        if len(df.columns) > 1:
                            df = df.set_index(df.columns[0])
                        with line_tab:
                            st.line_chart(df)
                        with bar_tab:
                            st.bar_chart(df)
                    else:
                        st.dataframe(df)

st.title("Cortex Analyst")
st.markdown(f"Semantic Model: `{FILE}`")

if "messages" not in st.session_state:
    st.session_state.messages = []
    st.session_state.suggestions = []
    st.session_state.active_suggestion = None

for message_index, message in enumerate(st.session_state.messages):
    with st.chat_message(message["role"]):
        display_content(
            content=message["content"],
            request_id=message.get("request_id"),
            message_index=message_index,
        )

if user_input := st.chat_input("What is your question?"):
    process_message(prompt=user_input)

if st.session_state.active_suggestion:
    process_message(prompt=st.session_state.active_suggestion)
    st.session_state.active_suggestion = None
Copy

运行这个应用程序时,它会提示您输入一个问题。从“我能问什么问题?”开始,尝试其中的一些建议。

第 7 步:清理

清理(可选)

执行以下 DROP * <object>* 命令,将系统恢复到教程开始前的状态:

DROP DATABASE IF EXISTS cortex_analyst_demo;
DROP WAREHOUSE IF EXISTS cortex_analyst_wh;
Copy

删除数据库会自动移除所有子数据库对象,例如表。

后续步骤

恭喜!您已成功构建了一个在 Snowflake 中“与数据对话”的简单 Cortex Analyst 应用程序。

其他资源

使用以下资源继续学习:

语言: 中文