Tutorial 3: Build a PDF chatbot with Cortex Search
Introduction
This tutorial describes how to use Cortex Search to build a chatbot from a dataset of PDF documents. In Tutorial 2, you learned how to build a chatbot from text data that had already been extracted from its source. This tutorial demonstrates one example of how to use a basic Python UDF to extract text from PDFs and then ingest the extracted data into a Cortex Search Service.
What you will learn
Extract text from a set of PDF files in a stage using a Python UDF.
Create a Cortex Search Service from the extracted text.
Create a Streamlit-in-Snowflake chat app that lets you ask questions about the data extracted from the PDF documents.
Prerequisites
The following prerequisites are required to complete this tutorial:
You have a Snowflake account and a user with a role that grants the necessary privileges to create a database, tables, virtual warehouse objects, Cortex Search Services, and Streamlit apps.
Refer to Snowflake in 20 Minutes for instructions to meet these requirements.
Step 1: Set up
Get the PDF data
You will use a sample dataset of the Federal Open Market Committee (FOMC) meeting minutes. This is a sample of twelve 10-page documents with meeting minutes from FOMC meetings in 2023 and 2024. Download the files directly from your browser by following this link:
FOMC meeting minutes sample (link removed)
The complete set of FOMC minutes can be found at the `US Federal Reserve's website <https://www.federalreserve.gov/monetarypolicy/fomccalendars.htm>`_.
Note
In a non-tutorial setting, you would bring your own data, possibly already in a Snowflake stage.
Create the database, tables, and warehouse
Execute the following statements to create the database and virtual warehouse needed for this tutorial. After you complete the tutorial, you can drop these objects.
CREATE DATABASE IF NOT EXISTS cortex_search_tutorial_db;
CREATE OR REPLACE WAREHOUSE cortex_search_tutorial_wh WITH
WAREHOUSE_SIZE='X-SMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED=TRUE;
USE WAREHOUSE cortex_search_tutorial_wh;
Note
The CREATE DATABASE statement creates a database. The database automatically includes a schema named PUBLIC. The CREATE WAREHOUSE statement creates an initially suspended warehouse.
Step 2: Load data into Snowflake
First, create a Snowflake stage to store the files that contain the data. This stage will hold the meeting minutes PDF files.
CREATE OR REPLACE STAGE cortex_search_tutorial_db.public.fomc
DIRECTORY = (ENABLE = TRUE)
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Note
The directory and encryption settings are configured to generate a presigned_url for a file. If you don't need to generate a presigned_url, you can skip these configurations.
Now upload the dataset. You can upload the dataset in Snowsight or using SQL. To upload in Snowsight:
Sign in to Snowsight.
Select Data in the left-side navigation menu.
Select your database cortex_search_tutorial_db.
Select your schema public.
Select Stages and select fomc.
In the upper right, select the + Files button.
Drag and drop files into the UI, or select Browse to choose a file from the dialog window.
Select Upload to upload your file.
Step 3: Parse the PDF files
Create a preprocessing function to do the following:
Parse the PDF files and extract text.
Chunk the text into smaller pieces for indexing.
CREATE OR REPLACE FUNCTION cortex_search_tutorial_db.public.pdf_text_chunker(file_url STRING)
    RETURNS TABLE (chunk VARCHAR)
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.9'
    HANDLER = 'pdf_text_chunker'
    PACKAGES = ('snowflake-snowpark-python', 'PyPDF2', 'langchain')
    AS
$$
from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging
import pandas as pd

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:
        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")

        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())

        reader = PyPDF2.PdfReader(buffer)
        text = ""
        for page in reader.pages:
            try:
                text += page.extract_text().replace('\n', ' ').replace('\0', ' ')
            except Exception:
                text = "Unable to Extract"
                logger.warning(f"Unable to extract from file {file_url}, page {page}")

        return text

    def process(self, file_url: str):
        text = self.read_pdf(file_url)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 2000,  # Adjust this as needed
            chunk_overlap = 300,  # Overlap to keep chunks contextual
            length_function = len
        )

        chunks = text_splitter.split_text(text)
        df = pd.DataFrame(chunks, columns=['chunk'])

        yield from df.itertuples(index=False, name=None)
$$;
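To build intuition for how the `chunk_size` and `chunk_overlap` parameters above interact, the following self-contained sketch mimics the overlap behavior with plain Python slicing. It is a simplification, not `RecursiveCharacterTextSplitter` itself: the real splitter also prefers to break on paragraph and sentence boundaries, while this sketch cuts at fixed offsets.

```python
# Simplified illustration of chunk_size/chunk_overlap (NOT langchain's
# RecursiveCharacterTextSplitter, which also respects paragraph and
# sentence boundaries). Each chunk starts (chunk_size - chunk_overlap)
# characters after the previous one, so consecutive chunks share
# chunk_overlap characters of context.
def chunk_text(text: str, chunk_size: int = 2000, chunk_overlap: int = 300) -> list[str]:
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    return [text[i : i + chunk_size] for i in range(0, max(len(text) - chunk_overlap, 1), step)]

text = "".join(chr(33 + (i * 7) % 90) for i in range(1000))
chunks = chunk_text(text, chunk_size=200, chunk_overlap=50)
# len(chunks) == 7; chunks[0][-50:] == chunks[1][:50]
```

A larger overlap keeps more shared context between neighboring chunks (useful for retrieval quality) at the cost of more rows to index.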
Then create a table to hold the parsed data from the PDF files.
CREATE OR REPLACE TABLE cortex_search_tutorial_db.public.docs_chunks_table AS
SELECT
relative_path,
build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path) AS file_url,
-- preserve file title information by concatenating relative_path with the chunk
CONCAT(relative_path, ': ', func.chunk) AS chunk,
'English' AS language
FROM
directory(@cortex_search_tutorial_db.public.fomc),
TABLE(cortex_search_tutorial_db.public.pdf_text_chunker(build_scoped_file_url(@cortex_search_tutorial_db.public.fomc, relative_path))) AS func;
Step 4: Create the search service
Create a search service on the new table by running the following SQL command:
CREATE OR REPLACE CORTEX SEARCH SERVICE cortex_search_tutorial_db.public.fomc_meeting
ON chunk
ATTRIBUTES language
WAREHOUSE = cortex_search_tutorial_wh
TARGET_LAG = '1 hour'
AS (
SELECT
chunk,
relative_path,
file_url,
language
FROM cortex_search_tutorial_db.public.docs_chunks_table
);
This command specifies the attributes, which are the columns you will be able to filter search results on, as well as the warehouse and target lag. The search column is specified as chunk, which is generated in the source query as a concatenation of several text columns in the base table. The other columns in the source query can be included in search requests.
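The Streamlit app in the next step filters on the `language` attribute with a filter tree such as `{"@and": [{"@eq": {"language": "English"}}]}`. The following toy evaluator illustrates how such a tree reads against a row of attribute values. It is an illustration of the operator semantics only, not Cortex Search's implementation, and it covers only a few of the supported operators.

```python
# Hypothetical local evaluator for the @and/@or/@not/@eq filter shape, to
# illustrate how a filter tree composes predicates over attribute columns.
# This is an illustration only, NOT Cortex Search's implementation.
def matches(filter_node: dict, row: dict) -> bool:
    if not filter_node:
        return True  # an empty filter matches every row
    (op, arg), = filter_node.items()
    if op == "@eq":
        (col, val), = arg.items()
        return row.get(col) == val
    if op == "@and":
        return all(matches(sub, row) for sub in arg)
    if op == "@or":
        return any(matches(sub, row) for sub in arg)
    if op == "@not":
        return not matches(arg, row)
    raise ValueError(f"Unsupported operator: {op}")

f = {"@and": [{"@eq": {"language": "English"}}]}
matches(f, {"language": "English", "chunk": "..."})  # True
matches(f, {"language": "French", "chunk": "..."})   # False
```

Only columns declared under ATTRIBUTES in the service definition can appear in a filter, which is why `language` is listed there.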
Step 5: Create a Streamlit app
You can query the service with the Python SDK (using the snowflake Python package). This tutorial demonstrates using the Python SDK in a Streamlit in Snowflake application.
First, ensure your global Snowsight UI role is the same as the role used to create the service in the service creation step.
Sign in to Snowsight.
Select Projects » Streamlit in the left-side navigation menu.
Select + Streamlit App.
Important: Choose the cortex_search_tutorial_db database and public schema for the app location.
In the left pane of the Streamlit in Snowflake editor, select Packages and add snowflake (version >= 0.8.0) and snowflake-ml-python to install the required packages in your application.
Replace the example application code with the following Streamlit app:
import streamlit as st
from snowflake.core import Root  # requires snowflake>=0.8.0
from snowflake.cortex import Complete
from snowflake.snowpark.context import get_active_session

"""
The available models are subject to change. Check the model availability
for the REST API:
https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-llm-rest-api#model-availability
"""
MODELS = [
    "mistral-large2",
    "llama3.1-70b",
    "llama3.1-8b",
]

def init_messages():
    """
    Initialize the session state for chat messages. If the session state indicates that the
    conversation should be cleared or if the "messages" key is not in the session state,
    initialize it as an empty list.
    """
    if st.session_state.clear_conversation or "messages" not in st.session_state:
        st.session_state.messages = []

def init_service_metadata():
    """
    Initialize the session state for cortex search service metadata. Query the available
    cortex search services from the Snowflake session and store their names and search
    columns in the session state.
    """
    if "service_metadata" not in st.session_state:
        services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect()
        service_metadata = []
        if services:
            for s in services:
                svc_name = s["name"]
                svc_search_col = session.sql(
                    f"DESC CORTEX SEARCH SERVICE {svc_name};"
                ).collect()[0]["search_column"]
                service_metadata.append(
                    {"name": svc_name, "search_column": svc_search_col}
                )

        st.session_state.service_metadata = service_metadata

def init_config_options():
    """
    Initialize the configuration options in the Streamlit sidebar. Allow the user to select
    a cortex search service, clear the conversation, toggle debug mode, and toggle the use
    of chat history. Also provide advanced options to select a model, the number of context
    chunks, and the number of chat messages to use in the chat history.
    """
    st.sidebar.selectbox(
        "Select cortex search service:",
        [s["name"] for s in st.session_state.service_metadata],
        key="selected_cortex_search_service",
    )

    st.sidebar.button("Clear conversation", key="clear_conversation")
    st.sidebar.toggle("Debug", key="debug", value=False)
    st.sidebar.toggle("Use chat history", key="use_chat_history", value=True)

    with st.sidebar.expander("Advanced options"):
        st.selectbox("Select model:", MODELS, key="model_name")
        st.number_input(
            "Select number of context chunks",
            value=5,
            key="num_retrieved_chunks",
            min_value=1,
            max_value=10,
        )
        st.number_input(
            "Select number of messages to use in chat history",
            value=5,
            key="num_chat_messages",
            min_value=1,
            max_value=10,
        )

    st.sidebar.expander("Session State").write(st.session_state)

def query_cortex_search_service(query, columns=[], filter={}):
    """
    Query the selected cortex search service with the given query and retrieve context
    documents. Display the retrieved context documents in the sidebar if debug mode is
    enabled. Return the context documents as a string.

    Args:
        query (str): The query to search the cortex search service with.

    Returns:
        str: The concatenated string of context documents.
    """
    db, schema = session.get_current_database(), session.get_current_schema()

    cortex_search_service = (
        root.databases[db]
        .schemas[schema]
        .cortex_search_services[st.session_state.selected_cortex_search_service]
    )

    context_documents = cortex_search_service.search(
        query, columns=columns, filter=filter, limit=st.session_state.num_retrieved_chunks
    )
    results = context_documents.results

    service_metadata = st.session_state.service_metadata
    search_col = [
        s["search_column"]
        for s in service_metadata
        if s["name"] == st.session_state.selected_cortex_search_service
    ][0].lower()

    context_str = ""
    for i, r in enumerate(results):
        context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n"

    if st.session_state.debug:
        st.sidebar.text_area("Context documents", context_str, height=500)

    return context_str, results

def get_chat_history():
    """
    Retrieve the chat history from the session state limited to the number of messages
    specified by the user in the sidebar options.

    Returns:
        list: The list of chat messages from the session state.
    """
    start_index = max(
        0, len(st.session_state.messages) - st.session_state.num_chat_messages
    )
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]

def complete(model, prompt):
    """
    Generate a completion for the given prompt using the specified model.

    Args:
        model (str): The name of the model to use for completion.
        prompt (str): The prompt to generate a completion for.

    Returns:
        str: The generated completion.
    """
    return Complete(model, prompt).replace("$", "\\$")

def make_chat_history_summary(chat_history, question):
    """
    Generate a summary of the chat history combined with the current question to extend
    the query context. Use the language model to generate this summary.

    Args:
        chat_history (str): The chat history to include in the summary.
        question (str): The current user question to extend with the chat history.

    Returns:
        str: The generated summary of the chat history and question.
    """
    prompt = f"""
        [INST]
        Based on the chat history below and the question, generate a query that extend the question
        with the chat history provided. The query should be in natural language.
        Answer with only the query. Do not add any explanation.

        <chat_history>
        {chat_history}
        </chat_history>
        <question>
        {question}
        </question>
        [/INST]
    """

    summary = complete(st.session_state.model_name, prompt)

    if st.session_state.debug:
        st.sidebar.text_area(
            "Chat history summary", summary.replace("$", "\\$"), height=150
        )

    return summary

def create_prompt(user_question):
    """
    Create a prompt for the language model by combining the user question with context
    retrieved from the cortex search service and chat history (if enabled). Format the
    prompt according to the expected input format of the model.

    Args:
        user_question (str): The user's question to generate a prompt for.

    Returns:
        str: The generated prompt for the language model.
    """
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history != []:
            question_summary = make_chat_history_summary(chat_history, user_question)
            prompt_context, results = query_cortex_search_service(
                question_summary,
                columns=["chunk", "file_url", "relative_path"],
                filter={"@and": [{"@eq": {"language": "English"}}]},
            )
        else:
            prompt_context, results = query_cortex_search_service(
                user_question,
                columns=["chunk", "file_url", "relative_path"],
                filter={"@and": [{"@eq": {"language": "English"}}]},
            )
    else:
        prompt_context, results = query_cortex_search_service(
            user_question,
            columns=["chunk", "file_url", "relative_path"],
            filter={"@and": [{"@eq": {"language": "English"}}]},
        )
        chat_history = ""

    prompt = f"""
        [INST]
        You are a helpful AI chat assistant with RAG capabilities. When a user asks you a question,
        you will also be given context provided between <context> and </context> tags. Use that context
        with the user's chat history provided in the between <chat_history> and </chat_history> tags
        to provide a summary that addresses the user's question. Ensure the answer is coherent, concise,
        and directly relevant to the user's question.

        If the user asks a generic question which cannot be answered with the given context or chat_history,
        just say "I don't know the answer to that question."

        Don't say things like "according to the provided context".

        <chat_history>
        {chat_history}
        </chat_history>
        <context>
        {prompt_context}
        </context>
        <question>
        {user_question}
        </question>
        [/INST]
        Answer:
    """
    return prompt, results

def main():
    st.title(f":speech_balloon: Chatbot with Snowflake Cortex")

    init_service_metadata()
    init_config_options()
    init_messages()

    icons = {"assistant": "❄️", "user": "👤"}

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=icons[message["role"]]):
            st.markdown(message["content"])

    disable_chat = (
        "service_metadata" not in st.session_state
        or len(st.session_state.service_metadata) == 0
    )
    if question := st.chat_input("Ask a question...", disabled=disable_chat):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": question})
        # Display user message in chat message container
        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question.replace("$", "\\$"))

        # Display assistant response in chat message container
        with st.chat_message("assistant", avatar=icons["assistant"]):
            message_placeholder = st.empty()
            question = question.replace("'", "")
            prompt, results = create_prompt(question)
            with st.spinner("Thinking..."):
                generated_response = complete(
                    st.session_state.model_name, prompt
                )
                # build references table for citation
                markdown_table = "###### References \n\n| PDF Title | URL |\n|-------|-----|\n"
                for ref in results:
                    markdown_table += f"| {ref['relative_path']} | [Link]({ref['file_url']}) |\n"
                message_placeholder.markdown(generated_response + "\n\n" + markdown_table)

        st.session_state.messages.append(
            {"role": "assistant", "content": generated_response}
        )

if __name__ == "__main__":
    session = get_active_session()
    root = Root(session)
    main()
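One detail worth noting in the app above: get_chat_history returns a sliding window over the message list — at most num_chat_messages entries, excluding the final entry, which is the question currently being answered. The same slicing logic in isolation (window_history is a hypothetical name used here for illustration):

```python
# Standalone sketch of the windowing logic in get_chat_history: keep at most
# `limit` messages, and drop the final entry (the question currently being
# answered), mirroring the slice used in the Streamlit app above.
def window_history(messages: list[dict], limit: int) -> list[dict]:
    start_index = max(0, len(messages) - limit)
    return messages[start_index : len(messages) - 1]

msgs = [{"role": "user", "content": f"q{i}"} for i in range(8)]
window_history(msgs, 5)  # the 4 messages before the latest one: q3..q6
```

Bounding the window keeps the history summary prompt small while still giving the model enough recent turns to resolve follow-up questions.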
Step 6: Try the app
In the right pane of the Streamlit in Snowflake editor window, you will see a preview of your Streamlit app. It should look similar to the following screenshot:

Type a query into the text box to try out your new app. Some sample queries you can try:
- Example session 1: multi-turn question answering
How was gpd growth in q4 23?
How was unemployment in the same quarter?
- Example session 2: summarizing over multiple documents
How has the fed's view of the market change over the course of 2024?
- Example session 3: abstaining when the documents don't contain the right answer
What was janet yellen's opinion about 2024 q1?
Step 7: Clean up
Clean up (optional)
Execute the following DROP <object> commands to return your system to its state before you began the tutorial:
DROP DATABASE IF EXISTS cortex_search_tutorial_db;
DROP WAREHOUSE IF EXISTS cortex_search_tutorial_wh;
Dropping the database automatically removes all child database objects such as tables.
Next steps
Congratulations! You have successfully built a search app in Snowflake from a set of PDF files.
Additional resources
You can continue learning using the following resources: