在 Python 中发出跟踪事件

您可以使用 Snowflake telemetry 包通过用 Python 编写的函数或过程处理程序发出跟踪事件。` Anaconda Snowflake 通道 <https://repo.anaconda.com/pkgs/snowflake (https://repo.anaconda.com/pkgs/snowflake)>`_ 提供包。

您可以通过对事件表执行 SELECT 命令,来访问存储的跟踪事件数据。有关更多信息,请参阅 访问跟踪数据

备注

有关添加跟踪事件时要牢记的准则,请参阅 添加跟踪事件的一般准则

有关在 Snowflake 中设置日志记录和检索消息的一般信息,请参阅 来自函数和过程的日志记录消息

通过代码记录之前,您必须:

  • 设置事件表来收集通过处理程序代码记录的消息。

    有关更多信息,请参阅 设置事件表

  • 请确保已设置跟踪级别,以便将所需数据存储在事件表中。

    有关更多信息,请参阅 设置跟踪级别

备注

有关添加跟踪事件时要牢记的准则,请参阅 添加跟踪事件的一般准则

添加对遥测包的支持

要使用遥测包,必须使 Snowflake 附带的开源 ` Snowflake 遥测包 <https://github.com/snowflakedb/snowflake-telemetry-python (https://github.com/snowflakedb/snowflake-telemetry-python)>`_ 可供处理程序代码使用。` Anaconda Snowflake 通道 <https://repo.anaconda.com/pkgs/snowflake (https://repo.anaconda.com/pkgs/snowflake)>`_ 提供包。

  • 对于过程或函数。 在 CREATE PROCEDURE 或 CREATE FUNCTION 语句中的 PACKAGES 子句中包含 snowflake-telemetry-python 包。PACKAGES 子句会向代码提供包含的 Snowflake 遥测包。

  • 对于 Streamlit 应用程序。 您可使用 Snowsight 或 environment.yml. 文件将 snowflake-telemetry-python 包添加到您的应用程序中。

    以下示例中的代码使用 PACKAGES 子句引用遥测包以及 Snowpark Library(这是用 Python 编写的存储过程所必需的。有关更多信息,请参阅 使用 Python 编写存储过程)。

    CREATE OR REPLACE FUNCTION my_function(...)
      RETURNS ...
      LANGUAGE PYTHON
      ...
      PACKAGES = ('snowflake-telemetry-python')
      ...
    
    Copy
  • 使用您的代码导入 telemetry 包。

    from snowflake import telemetry
    
    Copy

添加跟踪事件

您可以通过调用 telemetry.add_event 方法添加跟踪事件,传递事件的名称。您还可以选择将属性(键值对)与事件相关联。

add_event 方法以以下形式提供:

telemetry.add_event(<name>, <attributes>)
Copy

其中

  • name 是一个 Python 字符串,用于指定跟踪事件的名称。

  • attributes 是一个 `OpenTelemetry Attributes 对象 `_ ,用于指定此跟踪事件的属性。此实参是可选的。如果没有要为此跟踪事件指定的任何属性,请省略该实参。

以下示例中的处理程序代码添加了两个事件: FunctionEmptyEventFunctionEventWithAttributes。通过 FunctionEventWithAttributes,该代码还添加了两个属性:key1key2

telemetry.add_event("FunctionEmptyEvent")
telemetry.add_event("FunctionEventWithAttributes", {"key1": "value1", "key2": "value2"})
Copy

添加这些事件会在事件表中添加两行,每行在列中 RECORD 具有不同的值:

{
  "name": "FunctionEmptyEvent"
}
Copy
{
  "name": "FunctionEventWithAttributes"
}
Copy

FunctionEventWithAttributes 事件行包含行的 RECORD_ATTRIBUTES 列中的以下属性:

{
  "key1": "value1",
  "key2": "value2"
}
Copy

添加 span 属性

您可以通过调用 telemetry.set_span_attribute 方法来设置与 span 关联的属性(键值对)。

有关 Span 的详细信息,请参阅 Snowflake 如何表示跟踪事件

set_span_attribute 方法以以下形式提供:

telemetry.set_span_attribute(<key>, <value>)
Copy

其中:

  • key 是一个 Python 字符串,用于指定属性的键。

  • value 是指定属性值的 `OpenTelemetry AttributeValue 对象 `_ 。

以下示例中的代码创建四个属性并设置其值:

// Setting span attributes.
telemetry.set_span_attribute("example.boolean", true);
telemetry.set_span_attribute("example.long", 2);
telemetry.set_span_attribute("example.double", 2.5);
telemetry.set_span_attribute("example.string", "testAttribute");
Copy

设置这些属性会导致事件表的 RECORD_ATTRIBUTES 列中出现以下内容:

{
  "example.boolean": true,
  "example.long": 2,
  "example.double": 2.5,
  "example.string": "testAttribute"
}
Copy

Python 示例

以下各节提供了从 Python 代码中为跟踪事件添加支持的示例。

存储过程示例

CREATE OR REPLACE PROCEDURE do_tracing()
RETURNS VARIANT
LANGUAGE PYTHON
PACKAGES=('snowflake-snowpark-python', 'snowflake-telemetry-python')
RUNTIME_VERSION=3.8
HANDLER='run'
AS $$
from snowflake import telemetry
def run(session):
  telemetry.set_span_attribute("example.proc.do_tracing", "begin")
  telemetry.add_event("event_with_attributes", {"example.key1": "value1", "example.key2": "value2"})
  return "SUCCESS"
$$;
Copy

Streamlit 示例

import streamlit as st
from snowflake import telemetry

st.title("Streamlit trace event example")

hifives_val = st.slider("Number of high-fives", min_value=0, max_value=90, value=60)

if st.button("Submit"):
    telemetry.add_event("new_submission", {"high_fives": hifives_val})
Copy

UDF 示例

CREATE OR REPLACE FUNCTION times_two(x number)
RETURNS NUMBER
LANGUAGE PYTHON
PACKAGES=('snowflake-telemetry-python')
RUNTIME_VERSION=3.8
HANDLER = 'times_two'
AS $$
from snowflake import telemetry
def times_two(x):
  telemetry.set_span_attribute("example.func.times_two", "begin")
  telemetry.add_event("event_without_attributes")
  telemetry.add_event("event_with_attributes", {"example.key1": "value1", "example.key2": "value2"})

  response = 2 * x

  telemetry.set_span_attribute("example.func.times_two.response", response)

  return response
$$;
Copy

当您通过处理输入行的 Python 函数调用跟踪事件 API 时,将为 UDF 处理的 每一行 调用该 API。

例如,以下语句对 50 行调用上一示例中定义的 Python 函数,从而产生 100 个跟踪事件(每行两个):

select count(times_two(seq8())) from table(generator(rowcount => 50));
Copy

UDTF 示例

CREATE OR REPLACE FUNCTION digits_of_number(input number)
RETURNS TABLE(result number)
LANGUAGE PYTHON
PACKAGES=('snowflake-telemetry-python')
RUNTIME_VERSION=3.8
HANDLER = 'TableFunctionHandler'
AS $$
from snowflake import telemetry

class TableFunctionHandler:

  def __init__(self):
    telemetry.add_event("test_udtf_init")

  def process(self, input):
    telemetry.add_event("test_udtf_process", {"input": str(input)})
    response = input

    while input > 0:
      response = input % 10
      input /= 10
      yield (response,)

  def end_partition(self):
    telemetry.add_event("test_udtf_end_partition")
$$;
Copy

在 UDTF 处理程序类的 process() 方法中调用跟踪事件 API 时,将为处理的 每一行 调用 API。

例如,以下语句对 50 行调用上一示例中定义的 process() 方法,从而产生 100 个由 process() 方法添加的跟踪事件(每行两个):

select * from table(generator(rowcount => 50)), table(digits_of_number(seq8())) order by 1;
Copy
语言: 中文