在 Python 中发出跟踪事件¶
您可以使用 Snowflake telemetry
包通过用 Python 编写的函数或过程处理程序发出跟踪事件。` Anaconda Snowflake 通道 <https://repo.anaconda.com/pkgs/snowflake (https://repo.anaconda.com/pkgs/snowflake)>`_ 提供包。
您可以通过对事件表执行 SELECT 命令,来访问存储的跟踪事件数据。有关更多信息,请参阅 访问跟踪数据。
备注
有关添加跟踪事件时要牢记的准则,请参阅 添加跟踪事件的一般准则。
有关在 Snowflake 中设置日志记录和检索消息的一般信息,请参阅 来自函数和过程的日志记录消息。
通过代码记录之前,您必须:
备注
有关添加跟踪事件时要牢记的准则,请参阅 添加跟踪事件的一般准则。
添加对遥测包的支持¶
要使用遥测包,必须使 Snowflake 附带的开源 ` Snowflake 遥测包 <https://github.com/snowflakedb/snowflake-telemetry-python (https://github.com/snowflakedb/snowflake-telemetry-python)>`_ 可供处理程序代码使用。` Anaconda Snowflake 通道 <https://repo.anaconda.com/pkgs/snowflake (https://repo.anaconda.com/pkgs/snowflake)>`_ 提供包。
对于过程或函数。 在 CREATE PROCEDURE 或 CREATE FUNCTION 语句中的 PACKAGES 子句中包含
snowflake-telemetry-python
包。PACKAGES 子句会向代码提供包含的 Snowflake 遥测包。对于 Streamlit 应用程序。 您可使用 Snowsight 或
environment.yml.
文件将snowflake-telemetry-python
包添加到您的应用程序中。以下示例中的代码使用 PACKAGES 子句引用遥测包以及 Snowpark Library(这是用 Python 编写的存储过程所必需的。有关更多信息,请参阅 使用 Python 编写存储过程)。
CREATE OR REPLACE FUNCTION my_function(...) RETURNS ... LANGUAGE PYTHON ... PACKAGES = ('snowflake-telemetry-python') ...
使用您的代码导入
telemetry
包。from snowflake import telemetry
添加跟踪事件¶
您可以通过调用 telemetry.add_event
方法添加跟踪事件,传递事件的名称。您还可以选择将属性(键值对)与事件相关联。
add_event
方法以以下形式提供:
telemetry.add_event(<name>, <attributes>)
其中
以下示例中的处理程序代码添加了两个事件: FunctionEmptyEvent
和 FunctionEventWithAttributes
。通过 FunctionEventWithAttributes
,该代码还添加了两个属性:key1
和 key2
。
telemetry.add_event("FunctionEmptyEvent")
telemetry.add_event("FunctionEventWithAttributes", {"key1": "value1", "key2": "value2"})
添加这些事件会在事件表中添加两行,每行在列中 RECORD 具有不同的值:
{
"name": "FunctionEmptyEvent"
}
{
"name": "FunctionEventWithAttributes"
}
FunctionEventWithAttributes
事件行包含行的 RECORD_ATTRIBUTES 列中的以下属性:
{
"key1": "value1",
"key2": "value2"
}
添加 span 属性¶
您可以通过调用 telemetry.set_span_attribute
方法来设置与 span 关联的属性(键值对)。
有关 Span 的详细信息,请参阅 Snowflake 如何表示跟踪事件。
set_span_attribute
方法以以下形式提供:
telemetry.set_span_attribute(<key>, <value>)
其中:
以下示例中的代码创建四个属性并设置其值:
// Setting span attributes.
telemetry.set_span_attribute("example.boolean", true);
telemetry.set_span_attribute("example.long", 2);
telemetry.set_span_attribute("example.double", 2.5);
telemetry.set_span_attribute("example.string", "testAttribute");
设置这些属性会导致事件表的 RECORD_ATTRIBUTES 列中出现以下内容:
{
"example.boolean": true,
"example.long": 2,
"example.double": 2.5,
"example.string": "testAttribute"
}
Python 示例¶
以下各节提供了从 Python 代码中为跟踪事件添加支持的示例。
存储过程示例¶
CREATE OR REPLACE PROCEDURE do_tracing()
RETURNS VARIANT
LANGUAGE PYTHON
PACKAGES=('snowflake-snowpark-python', 'snowflake-telemetry-python')
RUNTIME_VERSION=3.8
HANDLER='run'
AS $$
from snowflake import telemetry
def run(session):
telemetry.set_span_attribute("example.proc.do_tracing", "begin")
telemetry.add_event("event_with_attributes", {"example.key1": "value1", "example.key2": "value2"})
return "SUCCESS"
$$;
Streamlit 示例¶
import streamlit as st
from snowflake import telemetry
st.title("Streamlit trace event example")
hifives_val = st.slider("Number of high-fives", min_value=0, max_value=90, value=60)
if st.button("Submit"):
telemetry.add_event("new_submission", {"high_fives": hifives_val})
UDF 示例¶
CREATE OR REPLACE FUNCTION times_two(x number)
RETURNS NUMBER
LANGUAGE PYTHON
PACKAGES=('snowflake-telemetry-python')
RUNTIME_VERSION=3.8
HANDLER = 'times_two'
AS $$
from snowflake import telemetry
def times_two(x):
telemetry.set_span_attribute("example.func.times_two", "begin")
telemetry.add_event("event_without_attributes")
telemetry.add_event("event_with_attributes", {"example.key1": "value1", "example.key2": "value2"})
response = 2 * x
telemetry.set_span_attribute("example.func.times_two.response", response)
return response
$$;
当您通过处理输入行的 Python 函数调用跟踪事件 API 时,将为 UDF 处理的 每一行 调用该 API。
例如,以下语句对 50 行调用上一示例中定义的 Python 函数,从而产生 100 个跟踪事件(每行两个):
select count(times_two(seq8())) from table(generator(rowcount => 50));
UDTF 示例¶
CREATE OR REPLACE FUNCTION digits_of_number(input number)
RETURNS TABLE(result number)
LANGUAGE PYTHON
PACKAGES=('snowflake-telemetry-python')
RUNTIME_VERSION=3.8
HANDLER = 'TableFunctionHandler'
AS $$
from snowflake import telemetry
class TableFunctionHandler:
def __init__(self):
telemetry.add_event("test_udtf_init")
def process(self, input):
telemetry.add_event("test_udtf_process", {"input": str(input)})
response = input
while input > 0:
response = input % 10
input /= 10
yield (response,)
def end_partition(self):
telemetry.add_event("test_udtf_end_partition")
$$;
在 UDTF 处理程序类的 process()
方法中调用跟踪事件 API 时,将为处理的 每一行 调用 API。
例如,以下语句对 50 行调用上一示例中定义的 process()
方法,从而产生 100 个由 process()
方法添加的跟踪事件(每行两个):
select * from table(generator(rowcount => 50)), table(digits_of_number(seq8())) order by 1;