分析 Python 过程处理程序代码

您可以使用内置的代码分析器了解执行处理程序代码所消耗的时间或内存。分析器可以生成信息,用以描述执行每行过程处理程序代码所消耗的时间或内存。

使用分析器可以生成报告,每次专注于以下其中一个方面:

  • 每行代码的执行时间,报告中显示每行代码被执行的次数、执行所花费的时间等。

  • 每行代码的内存使用量,报告中显示每行代码消耗的内存量。

分析器会将生成的报告保存到您指定的 Snowflake 内部用户暂存区。您可以使用 GET_PYTHON_PROFILER_OUTPUT (SNOWFLAKE.CORE) 系统函数 读取分析器的输出结果

备注

分析会对 Python 代码的执行引入额外的性能开销,并可能影响查询的性能。该功能主要用于开发、测试和故障排查,不建议在持续的生产工作负载中启用。

所需权限

设置会话级别的参数不会触发权限检查,但当使用 ACTIVE_PYTHON_PROFILER 会话参数(LINE 或 MEMORY)执行存储过程时,Snowflake 会检查以下权限:

  • 您必须对分析输出暂存区具有读写权限。

  • 如果被分析的存储过程是 调用者权限存储过程,则您必须使用对该存储过程具有 USAGE 权限的角色。

  • 如果被分析的存储过程是 所有者权限存储过程,则您必须使用对该存储过程具有 OWNERSHIP 权限的角色。

限制

  • 仅支持存储过程。暂不支持 UDFs。

  • 不支持递归分析。仅会对指定模块的顶层函数进行分析。不会分析函数内部定义的函数。

  • 不支持对通过 snowflake.snowpark API 在客户端创建的存储过程进行分析(例如,通过 Session.sproc.register 创建的存储过程)。

  • 不会分析通过 joblib 并行运行的 Python 函数。

  • 无法分析系统定义的存储过程。对它们进行分析不会生成任何输出。

用途

设置好要使用的分析器后,您只需简单地调用存储过程,即可生成分析器输出。在存储过程执行完成后,分析器的输出会写入到您指定的暂存区内的文件。您可以通过系统函数获取分析器输出。

按照以下步骤设置并使用分析器:

  1. 指定应将分析输出写入到的 Snowflake 暂存区

    将参数 PYTHON_PROFILER_TARGET_STAGE 设置为暂存区的完全限定名称。

  2. 启用分析器,并指定分析重点。

    设置 ACTIVE_PYTHON_PROFILER 会话参数。

  3. 调用存储过程

    启用分析器后,调用您的存储过程。

  4. 查看分析输出

    执行结束后,分析输出会以文件形式上传到输出暂存区,文件命名格式为 <query_id>_<sproc_name>.lprof<query_id>_<sproc_name>.mprof

指定应将分析输出写入到的 Snowflake 暂存区

在运行分析器之前,您必须指定一个暂存区,用于保存其报告。要指定暂存区,请将参数 PYTHON_PROFILER_TARGET_STAGE 设置为该暂存区的完全限定名称。

  • 使用临时暂存区,仅在会话期间存储输出。

  • 使用永久暂存区,在会话范围之外保留分析器输出。

以下示例代码会创建一个临时暂存区 profiler_output,用于接收分析器输出。

USE DATABASE my_database;
USE SCHEMA my_schema;

CREATE TEMPORARY STAGE profiler_output;
ALTER SESSION SET PYTHON_PROFILER_TARGET_STAGE = "my_database.my_schema.profiler_output";
Copy

启用分析器并指定分析重点

ACTIVE_PYTHON_PROFILER 会话参数设置为指定您希望生成的分析报告类型的值。

  • 若希望分析专注于代码行的执行活动,请将参数设置为 LINE 值(不区分大小写),如下所示:

    ALTER SESSION SET ACTIVE_PYTHON_PROFILER = 'LINE';
    
    Copy
  • 若希望分析专注于内存使用活动,请将参数设置为 MEMORY 值(不区分大小写),如下所示:

    ALTER SESSION SET ACTIVE_PYTHON_PROFILER = 'MEMORY';
    
    Copy

调用存储过程

启用分析器后,调用您的存储过程。

CALL YOUR_STORED_PROCEDURE();
Copy

默认情况下,分析器会分析用户模块中定义的分析方法。您还可以注册其他模块进行分析。有关更多信息,请参阅 分析其他模块

查看分析输出

执行结束后,分析输出会以文件形式上传到输出暂存区,文件命名格式为 <query_id>_<sproc_name>.lprof<query_id>_<sproc_name>.mprof

可以通过 SNOWFLAKE 数据库 中的系统函数 GET_PYTHON_PROFILER_OUTPUT 访问输出。

系统函数的签名格式如下:

SELECT SNOWFLAKE.CORE.GET_PYTHON_PROFILER_OUTPUT(<query_id>);
Copy

<query_id> 替换为启用了分析的存储过程查询的查询 ID。

您还可以直接访问输出暂存区中的输出文件。有关更多信息,请参阅 查看暂存文件

备注

系统函数会从 PYTHON_PROFILER_TARGET_STAGE 参数指定的暂存区中查找分析输出文件。

子存储过程的分析输出不会附加到父存储过程的输出中。要查看子存储过程的输出,请针对子存储过程的查询 ID 显式调用系统函数。

添加用于分析的其他模块

您可以添加默认情况下未包含的模块进行分析。要添加其他模块进行分析,请将 PYTHON_PROFILER_MODULES 参数设置为要包含的模块名称。

默认情况下,系统将分析您的模块中定义的方法。这些方法包括以下内容:

  • 处理程序方法

  • 模块中定义的方法

  • 从包或其他模块导入的方法。

在以下示例中,handlerhelpersome_method 在默认情况下都会接受分析。

CREATE OR REPLACE PROCEDURE my_sproc()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.10
PACKAGES = ('snowflake-snowpark-python', 'other_package')
HANDLER='handler'
AS $$
from other_package import some_method

def helper():
...

def handler(session):
...
$$;
Copy

使用 PYTHON_PROFILER_MODULES 参数添加模块

您可以使用 PYTHON_PROFILER_MODULES 参数来添加默认情况下未包含的模块进行性能分析。以这种方式添加模块时,该模块中使用的所有函数都将包含在分析器输出中。默认情况下,PYTHON_PROFILER_MODULES 参数值为空字符串 (''),此时分析仅会分析内联的处理程序代码(如果有)。

要包含模块进行分析,请以逗号分隔列表的形式指定模块名称作为参数值,如下所示。

ALTER SESSION SET PYTHON_PROFILER_MODULES = 'module_a, my_module';
Copy

分析暂存的处理程序代码

要分析暂存(而非内联)的处理程序代码(包括辅助函数),您必须使用 PYTHON_PROFILER_MODULES 参数显式指定暂存的处理代码进行分析。

默认情况下,分析器不会分析 暂存(而非内联) 的处理程序代码(即使用 IMPORTS 子句指定处理程序模块时)。

例如,默认情况下,此过程不会生成详细的分析输出。

CREATE OR REPLACE PROCEDURE test_udf_1()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES=('snowflake-snowpark-python')
HANDLER = 'test_python_import_main.my_udf'
IMPORTS = ('@stage1/test_python_import_main.py', '@stage2/test_python_import_module.py');
Copy

要包含暂存的代码进行分析,请以逗号分隔列表的形式将暂存模块名称指定为 PYTHON_PROFILER_MODULES 参数的值,如下所示。

ALTER SESSION SET PYTHON_PROFILER_MODULES = 'test_python_import_main, test_python_import_module';
Copy

示例

以下示例中的代码展示了如何使用分析器生成并检索代码行使用情况的报告。

CREATE OR REPLACE PROCEDURE last_n_query_duration(last_n number, total number)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('snowflake-snowpark-python')
HANDLER='main'
AS
$$
import snowflake.snowpark.functions as funcs

def main(session, last_n, total):
  # create sample dataset to emulate id + elapsed time
  session.sql('''
  CREATE OR REPLACE TABLE sample_query_history (query_id INT, elapsed_time FLOAT)
  ''').collect()
  session.sql('''
  INSERT INTO sample_query_history
  SELECT
  seq8() AS query_id,
  uniform(0::float, 100::float, random()) as elapsed_time
  FROM table(generator(rowCount => {0}));'''.format(total)).collect()

  # get the mean of the last n query elapsed time
  df = session.table('sample_query_history').select(
    funcs.col('query_id'),
    funcs.col('elapsed_time')).limit(last_n)

  pandas_df = df.to_pandas()
  mean_time = pandas_df.loc[:, 'ELAPSED_TIME'].mean()
  del pandas_df
  return mean_time
$$;

CREATE TEMPORARY STAGE profiler_output;
ALTER SESSION SET PYTHON_PROFILER_TARGET_STAGE = "my_database.my_schema.profiler_output";
ALTER SESSION SET ACTIVE_PYTHON_PROFILER = 'LINE';

-- Sample 1 million from 10 million records
CALL last_n_query_duration(1000000, 10000000);

SELECT SNOWFLAKE.CORE.GET_PYTHON_PROFILER_OUTPUT(last_query_id());
Copy

代码行分析器输出将如下所示:

Handler Name: main
Python Runtime Version: 3.8
Modules Profiled: ['main_module']
Timer Unit: 0.001 s

Total Time: 8.96127 s
File: _udf_code.py
Function: main at line 4

Line #      Hits        Time  Per Hit   % Time  Line Contents
==============================================================
    4                                           def main(session, last_n, total):
    5                                               # create sample dataset to emulate id + elapsed time
    6         1        122.3    122.3      1.4      session.sql('''
    7                                                   CREATE OR REPLACE TABLE sample_query_history (query_id INT, elapsed_time FLOAT)''').collect()
    8         2       7248.4   3624.2     80.9      session.sql('''
    9                                               INSERT INTO sample_query_history
    10                                               SELECT
    11                                               seq8() AS query_id,
    12                                               uniform(0::float, 100::float, random()) as elapsed_time
    13         1          0.0      0.0      0.0      FROM table(generator(rowCount => {0}));'''.format(total)).collect()
    14
    15                                               # get the mean of the last n query elapsed time
    16         3         58.6     19.5      0.7      df = session.table('sample_query_history').select(
    17         1          0.0      0.0      0.0          funcs.col('query_id'),
    18         2          0.0      0.0      0.0          funcs.col('elapsed_time')).limit(last_n)
    19
    20         1       1528.4   1528.4     17.1      pandas_df = df.to_pandas()
    21         1          3.2      3.2      0.0      mean_time = pandas_df.loc[:, 'ELAPSED_TIME'].mean()
    22         1          0.3      0.3      0.0      del pandas_df
    23         1          0.0      0.0      0.0      return mean_time

内存分析器输出将如下所示:

ALTER SESSION SET ACTIVE_PYTHON_PROFILER = 'MEMORY';

Handler Name: main
Python Runtime Version: 3.8
Modules Profiled: ['main_module']
File: _udf_code.py
Function: main at line 4

Line #   Mem usage    Increment  Occurrences  Line Contents
=============================================================
    4    245.3 MiB    245.3 MiB           1   def main(session, last_n, total):
    5                                             # create sample dataset to emulate id + elapsed time
    6    245.8 MiB      0.5 MiB           1       session.sql('''
    7                                                 CREATE OR REPLACE TABLE sample_query_history (query_id INT, elapsed_time FLOAT)''').collect()
    8    245.8 MiB      0.0 MiB           2       session.sql('''
    9                                             INSERT INTO sample_query_history
    10                                             SELECT
    11                                             seq8() AS query_id,
    12                                             uniform(0::float, 100::float, random()) as elapsed_time
    13    245.8 MiB      0.0 MiB           1       FROM table(generator(rowCount => {0}));'''.format(total)).collect()
    14
    15                                             # get the mean of the last n query elapsed time
    16    245.8 MiB      0.0 MiB           3       df = session.table('sample_query_history').select(
    17    245.8 MiB      0.0 MiB           1           funcs.col('query_id'),
    18    245.8 MiB      0.0 MiB           2           funcs.col('elapsed_time')).limit(last_n)
    19
    20    327.9 MiB     82.1 MiB           1       pandas_df = df.to_pandas()
    21    328.9 MiB      1.0 MiB           1       mean_time = pandas_df.loc[:, 'ELAPSED_TIME'].mean()
    22    320.9 MiB     -8.0 MiB           1       del pandas_df
    23    320.9 MiB      0.0 MiB           1       return mean_time
语言: 中文