教程 3:使用 Cortex Search 构建 PDF 聊天机器人

简介

This tutorial describes how to build a chatbot from a dataset of PDF documents using Cortex Search. In Tutorial 2, you learned how to build a chatbot from text data that was already extracted from its source. This tutorial walks through an example of extracting that text from the PDFs using a basic Python UDF, then ingesting the extracted data into a Cortex Search Service.

您将学习以下内容

  • 使用 Python UDF 从暂存区内的一组 PDF 文件中提取文本。
  • 从提取的文本中创建 Cortex Search 服务。
  • Create a Streamlit-in-Snowflake chat app that lets you ask questions about the data extracted from the PDF documents.

先决条件

要完成本教程,需要满足以下先决条件:

  • You have a Snowflake account and user with a role that grants the necessary privileges to create a database, tables, virtual warehouse objects, Cortex Search Services, and Streamlit apps.

Refer to the Snowflake in 20 minutes for instructions to meet these requirements.

第 1 步:设置

获取 PDF 数据

您将使用联邦公开市场委员会 (FOMC) 会议记录的样本数据集。这是一个包含 12 份文件的样本,每份文件有 10 页,记录了 2023 年和 2024 年 FOMC 会议的会议记录。通过此链接直接从浏览器下载文件:

  • FOMC minutes sample

The complete set of FOMC minutes can be found at the US Federal Reserve’s website (https://www.federalreserve.gov/monetarypolicy/fomccalendars.htm).

Note

在非教程设置中,您需要自带数据,这些数据可能已存储于 Snowflake 暂存区。

创建数据库、表和仓库

执行以下语句,创建本教程所需的数据库和虚拟仓库。完成本教程后,您可以删除这些对象。

CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;

CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
     WAREHOUSE_SIZE='X-SMALL'
     AUTO_SUSPEND = 120
     AUTO_RESUME = TRUE
     INITIALLY_SUSPENDED=TRUE;

 USE WAREHOUSE cortex_search_tutorial_wh;

Note

  • The CREATE DATABASE statement creates a database. The database automatically includes a schema named PUBLIC.
  • The CREATE WAREHOUSE statement creates an initially suspended warehouse.

第 2 步:将数据加载到 Snowflake

首先创建一个 Snowflake 暂存区,用于存储包含数据的文件。此暂存区将保存会议记录 PDF 文件。

CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

Note

配置目录和加密,用于生成文件的 presigned_url。如果不需要生成 presigned_url,可以跳过这些配置。

Now upload the dataset. You can upload the dataset in Snowsight or using SQL. To upload in Snowsight:

  1. Sign in to Snowsight.
  2. In the navigation menu, select Catalog » Database Explorer.
  3. Select your database cortex_search_tutorial_db.
  4. Select your schema public.
  5. Select Stages and select fomc.
  6. On the top right, Select the + Files button.
  7. Drag and drop files into the UI or select Browse to choose a file from the dialog window.
  8. Select Upload to upload your file.

第 3 步:解析 PDF 文件

在此步骤中,我们将从 PDFs 中提取原始文本,然后将其拆分为多个块,以供引入到搜索服务中。

First, we will use the Parsing documents with AI_PARSE_DOCUMENT function to extract the text and layout information from the PDFs into a new table, RAW_TEXT.

CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.raw_text AS
SELECT
    RELATIVE_PATH,
    TO_VARCHAR (
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT (
            '@cortex_search_tutorial_db.public.fomc',
            RELATIVE_PATH,
            {'mode': 'LAYOUT'} ):content
        ) AS EXTRACTED_LAYOUT
FROM
    DIRECTORY('@cortex_search_tutorial_db.public.fomc')
WHERE
    RELATIVE_PATH LIKE '%.pdf';

Then, we will use SPLIT_TEXT_MARKDOWN_HEADER to split the documents up into chunks of maximum size 2000 characters each, using the top two markdown header levels as chunk boundaries. We’ll insert the chunks into a new table DOC_CHUNKS.

CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.doc_chunks AS
SELECT
    relative_path,
    BUILD_SCOPED_FILE_URL(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
    (
        relative_path || ':\n'
        || coalesce('Header 1: ' || c.value['headers']['header_1'] || '\n', '')
        || coalesce('Header 2: ' || c.value['headers']['header_2'] || '\n', '')
        || c.value['chunk']
    ) AS chunk,
    'English' AS language
FROM
    cortex_search_tutorial_db.public.raw_text,
    LATERAL FLATTEN(SNOWFLAKE.CORTEX.SPLIT_TEXT_MARKDOWN_HEADER(
        EXTRACTED_LAYOUT,
        OBJECT_CONSTRUCT('#', 'header_1', '##', 'header_2'),
        2000, -- chunks of 2000 characters
        300 -- 300 character overlap
    )) c;

第 4 步:创建搜索服务

运行以下 SQL 命令,在新表上创建搜索服务:

CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
    ON chunk
    ATTRIBUTES language
    WAREHOUSE = cortex_search_tutorial_wh
    TARGET_LAG = '1 hour'
    AS (
    SELECT
        chunk,
        relative_path,
        file_url,
        language
    FROM cortex_search_tutorial_db.public.doc_chunks
    );

This command specifies the attributes, which are the columns that you’ll be able to filter search results on, as well as the warehouse and target lag. The search column is designated as chunk, which is generated in the source query as a concatenation of several text columns in the base table. The other columns in the source query can be included in response to a search request.

第 5 步:创建一个 Streamlit 应用程序

You can query the service with Python SDK (using the snowflake Python package). This tutorial demonstrates using the Python SDK in a Streamlit in Snowflake application.

First, ensure your global Snowsight UI role is the same as the role used to create the service in the service creation step.

  1. Sign in to Snowsight.
  2. In the navigation menu, select Projects » Streamlit.
  3. Select + Streamlit App.
  4. Important: Select the cortex_search_tutorial_db database and the public schema for the app location.
  5. In the left pane of the Streamlit in Snowflake editor, select Packages and add snowflake (version >= 0.8.0) and snowflake-ml-python to install the required packages in your application.
  6. 用以下 Streamlit 应用程序替换示例应用程序代码:
    import streamlit as st
    from snowflake.core import Root # requires snowflake>=0.8.0
    from snowflake.cortex import Complete
    from snowflake.snowpark.context import get_active_session
    
    """
    The available models are subject to change. Check the model availability for the REST API:
    https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-llm-rest-api#model-availability
    """
    MODELS = [
     "mistral-large2",
     "llama3.1-70b",
     "llama3.1-8b",
    ]
    
    def init_messages():
     """
     Initialize the session state for chat messages. If the session state indicates that the
     conversation should be cleared or if the "messages" key is not in the session state,
     initialize it as an empty list.
     """
     if st.session_state.clear_conversation or "messages" not in st.session_state:
         st.session_state.messages = []
    
    
    def init_service_metadata():
     """
     Initialize the session state for cortex search service metadata. Query the available
     cortex search services from the Snowflake session and store their names and search
     columns in the session state.
     """
     if "service_metadata" not in st.session_state:
         services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect()
         service_metadata = []
         if services:
             for s in services:
                 svc_name = s["name"]
                 svc_search_col = session.sql(
                     f"DESC CORTEX SEARCH SERVICE {svc_name};"
                 ).collect()[0]["search_column"]
                 service_metadata.append(
                     {"name": svc_name, "search_column": svc_search_col}
                 )
    
         st.session_state.service_metadata = service_metadata
    
    
    def init_config_options():
     """
     Initialize the configuration options in the Streamlit sidebar. Allow the user to select
     a cortex search service, clear the conversation, toggle debug mode, and toggle the use of
     chat history. Also provide advanced options to select a model, the number of context chunks,
     and the number of chat messages to use in the chat history.
     """
     st.sidebar.selectbox(
         "Select cortex search service:",
         [s["name"] for s in st.session_state.service_metadata],
         key="selected_cortex_search_service",
     )
    
     st.sidebar.button("Clear conversation", key="clear_conversation")
     st.sidebar.toggle("Debug", key="debug", value=False)
     st.sidebar.toggle("Use chat history", key="use_chat_history", value=True)
    
     with st.sidebar.expander("Advanced options"):
         st.selectbox("Select model:", MODELS, key="model_name")
         st.number_input(
             "Select number of context chunks",
             value=5,
             key="num_retrieved_chunks",
             min_value=1,
             max_value=10,
         )
         st.number_input(
             "Select number of messages to use in chat history",
             value=5,
             key="num_chat_messages",
             min_value=1,
             max_value=10,
         )
    
     st.sidebar.expander("Session State").write(st.session_state)
    
    
    def query_cortex_search_service(query, columns = [], filter={}):
     """
     Query the selected cortex search service with the given query and retrieve context documents.
     Display the retrieved context documents in the sidebar if debug mode is enabled. Return the
     context documents as a string.
    
     Args:
         query (str): The query to search the cortex search service with.
    
     Returns:
         str: The concatenated string of context documents.
     """
     db, schema = session.get_current_database(), session.get_current_schema()
    
     cortex_search_service = (
         root.databases[db]
         .schemas[schema]
         .cortex_search_services[st.session_state.selected_cortex_search_service]
     )
    
     context_documents = cortex_search_service.search(
         query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks
     )
     results = context_documents.results
    
     service_metadata = st.session_state.service_metadata
     search_col = [s["search_column"] for s in service_metadata
                     if s["name"] == st.session_state.selected_cortex_search_service][0].lower()
    
     context_str = ""
     for i, r in enumerate(results):
         context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n"
    
     if st.session_state.debug:
         st.sidebar.text_area("Context documents", context_str, height=500)
    
     return context_str, results
    
    
    def get_chat_history():
     """
     Retrieve the chat history from the session state limited to the number of messages specified
     by the user in the sidebar options.
    
     Returns:
         list: The list of chat messages from the session state.
     """
     start_index = max(
         0, len(st.session_state.messages) - st.session_state.num_chat_messages
     )
     return st.session_state.messages[start_index : len(st.session_state.messages) - 1]
    
    
    def complete(model, prompt):
     """
     Generate a completion for the given prompt using the specified model.
    
     Args:
         model (str): The name of the model to use for completion.
         prompt (str): The prompt to generate a completion for.
    
     Returns:
         str: The generated completion.
     """
     return Complete(model, prompt).replace("$", "\$")
    
    
    def make_chat_history_summary(chat_history, question):
     """
     Generate a summary of the chat history combined with the current question to extend the query
     context. Use the language model to generate this summary.
    
     Args:
         chat_history (str): The chat history to include in the summary.
         question (str): The current user question to extend with the chat history.
    
     Returns:
         str: The generated summary of the chat history and question.
     """
     prompt = f"""
         [INST]
         Based on the chat history below and the question, generate a query that extend the question
         with the chat history provided. The query should be in natural language.
         Answer with only the query. Do not add any explanation.
    
         <chat_history>
         {chat_history}
         </chat_history>
         <question>
         {question}
         </question>
         [/INST]
     """
    
     summary = complete(st.session_state.model_name, prompt)
    
     if st.session_state.debug:
         st.sidebar.text_area(
             "Chat history summary", summary.replace("$", "\$"), height=150
         )
    
     return summary
    
    
    def create_prompt(user_question):
     """
     Create a prompt for the language model by combining the user question with context retrieved
     from the cortex search service and chat history (if enabled). Format the prompt according to
     the expected input format of the model.
    
     Args:
         user_question (str): The user's question to generate a prompt for.
    
     Returns:
         str: The generated prompt for the language model.
     """
     if st.session_state.use_chat_history:
         chat_history = get_chat_history()
         if chat_history != []:
             question_summary = make_chat_history_summary(chat_history, user_question)
             prompt_context, results = query_cortex_search_service(
                 question_summary,
                 columns=["chunk", "file_url", "relative_path"],
                 filter={"@and": [{"@eq": {"language": "English"}}]},
             )
         else:
             prompt_context, results = query_cortex_search_service(
                 user_question,
                 columns=["chunk", "file_url", "relative_path"],
                 filter={"@and": [{"@eq": {"language": "English"}}]},
             )
     else:
         prompt_context, results = query_cortex_search_service(
             user_question,
             columns=["chunk", "file_url", "relative_path"],
             filter={"@and": [{"@eq": {"language": "English"}}]},
         )
         chat_history = ""
    
     prompt = f"""
             [INST]
             You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question,
             you will also be given context provided between <context> and </context> tags. Use that context
             with the user's chat history provided in the between <chat_history> and </chat_history> tags
             to provide a summary that addresses the user's question. Ensure the answer is coherent, concise,
             and directly relevant to the user's question.
    
             If the user asks a generic question which cannot be answered with the given context or chat_history,
             just say "I don't know the answer to that question.
    
             Don't saying things like "according to the provided context".
    
             <chat_history>
             {chat_history}
             </chat_history>
             <context>
             {prompt_context}
             </context>
             <question>
             {user_question}
             </question>
             [/INST]
             Answer:
             """
     return prompt, results
    
    
    def main():
     st.title(f":speech_balloon: Chatbot with Snowflake Cortex")
    
     init_service_metadata()
     init_config_options()
     init_messages()
    
     icons = {"assistant": "❄️", "user": "👤"}
    
     # Display chat messages from history on app rerun
     for message in st.session_state.messages:
         with st.chat_message(message["role"], avatar=icons[message["role"]]):
             st.markdown(message["content"])
    
     disable_chat = (
         "service_metadata" not in st.session_state
         or len(st.session_state.service_metadata) == 0
     )
     if question := st.chat_input("Ask a question...", disabled=disable_chat):
         # Add user message to chat history
         st.session_state.messages.append({"role": "user", "content": question})
         # Display user message in chat message container
         with st.chat_message("user", avatar=icons["user"]):
             st.markdown(question.replace("$", "\$"))
    
         # Display assistant response in chat message container
         with st.chat_message("assistant", avatar=icons["assistant"]):
             message_placeholder = st.empty()
             question = question.replace("'", "")
             prompt, results = create_prompt(question)
             with st.spinner("Thinking..."):
                 generated_response = complete(
                     st.session_state.model_name, prompt
                 )
                 # build references table for citation
                 markdown_table = "###### References \n\n| PDF Title | URL |\n|-------%-----%\n"
                 for ref in results:
                     markdown_table += f"| {ref['relative_path']} | [Link]({ref['file_url']}) |\n"
                 message_placeholder.markdown(generated_response + "\n\n" + markdown_table)
    
         st.session_state.messages.append(
             {"role": "assistant", "content": generated_response}
         )
    
    
    if __name__ == "__main__":
     session = get_active_session()
     root = Root(session)
     main()

第 6 步:试用应用程序

In the right pane of the Streamlit in Snowflake editor window, you’ll see a preview of your Streamlit app. It should look similar to the following screenshot:

Chat with PDF files in Streamlit UI

在文本框中输入查询,试用新应用程序。您可尝试的一些示例查询如下:

  • Example session 1: multi-turn question-answering
    • How was gpd growth in q4 23?
    • How was unemployment in the same quarter?
  • Example session 2: summarizing multiple documents
    • How has the fed's view of the market change over the course of 2024?
  • Example session 3: abstaining when the documents don’t contain the right answer
    • What was janet yellen's opinion about 2024 q1?

第 7 步:清理

清理(可选)

Execute the following DROP <object> commands to return your system to its state before you began the tutorial:

DROP DATABASE IF EXISTS cortex_search_tutorial_db;
DROP WAREHOUSE IF EXISTS cortex_search_tutorial_wh;

删除数据库会自动移除所有子数据库对象,例如表。

后续步骤

恭喜!您已成功在 Snowflake 中通过一组 PDF 文件构建了一个搜索应用程序。

其他资源

您可以利用以下资源继续学习: