Streamlit in Snowflake 日志记录和跟踪

Streamlit in Snowflake 支持仓库和容器运行时的日志记录。仓库运行时使用 Snowflake 遥测框架将日志消息和跟踪事件捕获到事件表中。容器运行时捕获您的应用程序发送到标准输出和标准错误的日志,将其存储在账户的事件表中,并在 Snowsight 中提供实时控制台日志和历史日志视图。

两个运行时都将日志存储在账户级事件表中。账户管理员必须先设置和配置此事件表,然后才能捕获日志。有关说明,请参阅 事件表概述

  • 要查找为您的账户配置的事件表,请运行:

    SHOW PARAMETERS LIKE 'event_table' IN ACCOUNT;
    

下表按运行时比较了日志记录和跟踪支持:

功能

仓库运行时

Container runtime

事件表日志记录

支持

支持

跟踪

支持

不支持

Snowsight 中的实时控制台日志

不支持

支持

Snowsight 中的历史日志

不支持

支持

容器运行时日志记录

容器运行时 Streamlit 应用程序在 Snowpark Container Services 容器内运行。Snowflake 会自动捕获您的应用程序发送到标准输出和标准错误的任何内容,并将其存储在账户的事件表中。您可以在 Snowsight 中查看这些日志或使用 SQL 对其进行查询。

Python 的日志记录模块

使用 Python 的内置 logging 模块从您的应用程序发出日志消息。以下示例配置了一个记录器,该记录器将 INFO 级别及更高级别的消息写入标准输出:

import logging
import sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    stream=sys.stdout,
)

LOGGER = logging.getLogger("my_app")

按照严重性从轻到重的顺序,Python 具有以下日志记录级别:

  • DEBUG

  • INFO

  • WARNING

  • ERROR

将级别设置为 INFO 会捕获 INFO、WARNING 和 ERROR 消息,但不会捕获 DEBUG 消息。

备注

默认情况下,Python 的 logging 模块写入标准错误 (sys.stderr)。Snowflake 会同时捕获标准输出和标准错误,因此无论您使用哪种流,系统都会捕获日志。将流设置为 sys.stdout 是可选的,但建议这样做,因为按照惯例,标准错误是为错误输出保留的。

配置记录器后,您可以使用它在整个应用程序代码中记录消息。通常在单独的模块中定义记录器,然后将其导入您的应用程序代码:

source_directory/
├── my_logger.py
├── pyproject.toml
└── streamlit_app.py
import streamlit as st
from my_logger import LOGGER

LOGGER.info("Home page loaded")
st.title("My App")

if st.button("Run analysis"):
    LOGGER.info("Analysis button clicked")
    try:
        result = run_analysis()
        LOGGER.info("Analysis completed successfully")
    except Exception as e:
        LOGGER.error("Analysis failed: %s", e)
        st.error("Analysis failed: %s", e)

Snowsight 中的实时日志

当您在 Snowsight 中编辑容器运行时应用程序时,编辑器下方会出现一个日志窗格。此窗格会在您的应用程序发出日志消息时实时流式传输这些消息。首次连接时,系统会显示最近日志的简短历史记录。

每个日志条目会显示以下信息:

描述

Source

APP 用于来自您的 Streamlit 过程和用户配置记录器的日志,或 MANAGER 用于来自管理容器的系统过程的日志。

Level

日志消息的严重性级别(DEBUG、INFO、WARNING、ERROR)。

Message

日志消息内容。

可用的实时日志操作

在日志窗格的右上角,您可以搜索和筛选日志,以帮助您找到所需的信息。这包括文本搜索、按来源筛选以及按严重性级别筛选。您可以在三点菜单中,下载当前日志、导航到历史日志或清除实时日志记录窗格。当您清除窗格时,当前日志将从当前视图中删除,但不会从事件表中删除。立即重新加载页面将恢复最近的日志。

了解日志来源

来自容器运行时应用程序的日志有以下两个来源之一:

  • MANAGER:选择使用 时默认使用的角色和仓库。容器内部准备并运行您的应用程序的系统过程。管理器日志包括有关从暂存区下载应用程序文件、安装 Python 依赖项以及启动 Streamlit 服务器过程的消息。如果在应用程序运行时更新应用程序的依赖项文件,管理器过程将重新安装依赖项并生成额外的管理器日志。

  • APP:选择使用 时默认使用的角色和仓库。来自运行中的 Streamlit 服务器过程的日志。这包括来自您用户配置的 Python 记录器、Streamlit 内置记录器的消息,以及您的应用程序写入标准输出或标准错误的任何其他输出。

来源之间的边界是 streamlit run 命令。容器在启动 Streamlit 过程之前执行的所有操作都会产生 MANAGER 日志。Streamlit 过程启动后,该过程的输出将产生 APP 日志。

查看 Snowsight 中的历史日志

以下步骤仅适用于容器运行时应用程序。仓库运行时应用程序没有日志窗格。

  1. 登录 Snowsight

  2. 在导航菜单中选择 Projects » Streamlit,然后选择应用程序。

  3. 在页面的右上角,选择 Edit

  4. 在日志窗格的右上角,选择三点菜单 (Other actions) » Historical logs

这将打开为您应用程序运行的后台服务的 Snowpark Container Services 监控页面。日志表显示以下列:

描述

时间戳

日志消息的时间戳。

实例 ID

容器实例的标识符。对于 Streamlit 应用程序,这始终是 0

容器

容器实例的标识符。

日志是发送到标准输出 (stdout) 还是标准错误 (stderr)。

包含 "level""message""source""timestamp" 字段的 JSON 格式日志消息。

有关监控页面的更多信息,请参阅 Snowpark Container Services:监控服务

使用 SQL 查询日志

您可以直接查询事件表以分析容器运行时应用程序的日志。以下查询从特定的 Streamlit 应用程序检索日志:

SELECT
    TIMESTAMP,
    RECORD['severity_text']::VARCHAR AS level,
    VALUE::VARCHAR AS message,
    RESOURCE_ATTRIBUTES['snow.database.name']::VARCHAR AS database_name,
    RESOURCE_ATTRIBUTES['snow.schema.name']::VARCHAR AS schema_name,
    RESOURCE_ATTRIBUTES['snow.executable.name']::VARCHAR AS app_name,
    RECORD_ATTRIBUTES['log.iostream']::VARCHAR AS stream
FROM <event_table>
WHERE RESOURCE_ATTRIBUTES['snow.database.name'] = '<database_name>'
  AND RESOURCE_ATTRIBUTES['snow.schema.name'] = '<schema_name>'
  AND RESOURCE_ATTRIBUTES['snow.executable.name'] = '<app_name>'
  AND RECORD_TYPE = 'LOG'
  AND TIMESTAMP > DATEADD(hour, -1, CURRENT_TIMESTAMP())
ORDER BY TIMESTAMP DESC
LIMIT 100;

<event_table> 替换为 SHOW PARAMETERS 命令返回的事件表名称,并将 <database_name><schema_name><app_name> 替换为您的 Streamlit 应用程序的值。

小技巧

在您的事件表查询中包含 TIMESTAMP 筛选器以提高性能。事件表可能包含来自各种 Snowflake 组件的海量数据。

有关事件表列的更多信息,请参阅 事件表列

仓库运行时日志记录

对于使用仓库运行时的 Streamlit 应用程序,您可以在 Streamlit 应用程序代码运行时捕获日志消息并跟踪事件,然后使用 SQL 分析结果(例如分析错误)。有关更多信息,请参阅 日志记录、跟踪和指标

仓库运行时需要在包含您应用程序的数据库上设置日志和跟踪级别:

-- Set the log level for the database containing your app
ALTER DATABASE <database_name> SET LOG_LEVEL = INFO;

-- Set the trace level for the database containing your app
ALTER DATABASE <database_name> SET TRACE_LEVEL = ON_EVENT;

示例:从仓库运行时应用程序记录日志

import logging
import streamlit as st

logger = logging.getLogger("simple_logger")

# Write directly to the app
st.title("Simple Logging Example")

# Get the current credentials
session = st.connection('snowflake').session()

def get_log_messages_query() -> str:
    return """
            SELECT
                TIMESTAMP,
                RECORD:"severity_text"::VARCHAR AS SEVERITY,
                RESOURCE_ATTRIBUTES:"db.user"::VARCHAR AS USER,
                VALUE::VARCHAR AS VALUE
            FROM
                SAMPLE_EVENTS
            WHERE
                SCOPE:"name" = 'simple_logger'
            ORDER BY
                TIMESTAMP DESC;
            """

button = st.button("Log a message")

if button:
    try:
        logger.info("Logging an info message through Streamlit App.")
        st.success('Logged a message')
    except Exception as e:
        logger.error("Logging an error message through Streamlit App: %s",e)
        st.error('Logged an error')

sql = get_log_messages_query()

df = session.sql(sql).to_pandas()

with st.expander("**Show All Messages**"):
     st.dataframe(df, use_container_width=True)

跟踪(仅限仓库运行时)

仅仓库运行时支持跟踪。您可以从 Streamlit 应用程序发出跟踪事件,然后查询事件表以对其进行分析。

备注

以下示例需要安装 snowflake-telemetry-python 包。有关更多信息,请参阅 添加对遥测包的支持

import streamlit as st
import time
import random
from snowflake import telemetry

def sleep_function() -> int:
    random_time = random.randint(1, 10)
    time.sleep(random_time)
    return random_time

def get_trace_messages_query() -> str:
    return """
            SELECT
                TIMESTAMP,
                RESOURCE_ATTRIBUTES :"db.user" :: VARCHAR AS USER,
                RECORD_TYPE,
                RECORD_ATTRIBUTES
            FROM
                SAMPLE_EVENTS
            WHERE
                RECORD :"name" :: VARCHAR = 'tracing_some_data'
                OR RECORD_ATTRIBUTES :"logging_demo.tracing" :: VARCHAR = 'begin_span'
            ORDER BY
                TIMESTAMP DESC;
            """

def trace_message() -> None:
    execution_time = sleep_function()
    telemetry.set_span_attribute("logging_demo.tracing", "begin_span")
    telemetry.add_event(
        "tracing_some_data",
        {"function_name": "sleep_function", "execution_time": execution_time},
    )

# Write directly to the app
st.title("Simple Tracing Example")

# Get the current credentials
session = st.connection('snowflake').session()

button = st.button("Add trace event")

if button:
    with st.spinner("Executing function..."):
        trace_message()
        st.toast("Successfully log a trace message!", icon="✅")

sql = get_trace_messages_query()

df = session.sql(sql).to_pandas()

with st.expander("**Show All Trace Messages**"):
     st.dataframe(df, use_container_width=True)