Databricks 中的检查点

Snowpark Checkpoints 写入有关收集结果的文件并读取这些相同的文件以验证 DataFrames。其中一些文件是使用 PySpark 生成的;另一些则使用 Python 包(例如 osglob)生成。这种类型的文件处理行为可能会导致 Databricks 环境中的不一致(在该环境中,文件系统与传统环境不同)。因此,必须调整包以确保正确的文件读取和写入。

下一节演示了如何配置 Snowpark Checkpoints 以使其在 Databricks 环境中无缝运行,从而实现高效的 DataFrame 验证。

先决条件

在 Databricks 中使用 Snowpark Checkpoints 之前,请确保环境满足以下要求:

  • `PySpark:`版本 3.5.0 或更高版本。

  • `Python:`版本 3.9、3.10 和 3.11

满足这些要求的 Databricks Runtime 版本是:

  • Databricks Runtime 14.3 LTS

  • Databricks Runtime 15.4 LTS

输入/输出 (I/O) 策略

为确保 Snowpark Checkpoints 在各种环境中正常运行,可以使用接口 EnvStrategy 及其实现类进行文件读写操作。这使得 I/O 操作具有适应性和自定义性。

  • 使用 Snowpark Checkpoints,您可以通过创建实现 EnvStrategy 接口的类来实现自己的自定义输入/输出方法。然后,您可以根据特定的执行环境和预期定制操作。

  • 在内部,该包使用默认类 (IODefaultStrategy) 来实现 EnvStrategy 接口并提供 I/O 操作的基本实现。您可以将此默认策略替换为适合环境特定需求的自定义实现。

重要

每个 Snowpark Checkpoints 包(snowpark-checkpoints-collectorssnowpark-checkpoints-validatorssnowpark-checkpoints-hypothesis)都包含自己的文件处理类副本。因此,对文件配置的任何更改都必须分别应用于每个包。请务必从您正在使用的包中导入配置。

I/O 函数

这些文件读写方法可以自定义:

  • mkdir:创建文件夹。

  • folder_exists:检查文件夹是否存在。

  • file_exists:检查文件是否存在。

  • write:将内容写入到文件。

  • read:从文件中读取内容。

  • read_bytes:从文件中读取二进制内容。

  • ls:列出目录的内容。

  • getcwd:获取当前工作目录。

  • remove_dir:移除目录及其内容。此功能仅在 snowpark-checkpoints-collectors 模块中使用。

  • telemetry_path_files:获取遥测文件的路径。

Databricks 策略

Databricks 策略是用于了解如何使用 DBFS 文件路径的配置。它使用 normalize_dbfs_path 函数来确保所有路径都以 /dbfs/ 开头。

如何使用该策略

要使用 Databricks 策略,必须在代码中对其进行明确配置。方法如下:

  1. 导入所需的类:

    from typing import Optional, BinaryIO
    from pathlib import Path
    from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy
    from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
    
    Copy
  2. 定义 Databricks 策略:

    class IODatabricksStrategy(EnvStrategy):
    
      def __init__(self):
          self.default_strategy = IODefaultStrategy()
    
      def mkdir(self, path: str, exist_ok: bool = False) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.mkdir(path, exist_ok=exist_ok)
    
      def folder_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.folder_exists(path)
    
      def file_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.file_exists(path)
    
      def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None:
          file_path = normalize_dbfs_path(file_path)
          self.default_strategy.write(file_path, file_content, overwrite=overwrite)
    
      def read(
          self, file_path: str, mode: str = "r", encoding: Optional[str] = None
      ) -> str:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read(file_path, mode=mode, encoding=encoding)
    
      def read_bytes(self, file_path: str) -> bytes:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read_bytes(file_path)
    
      def ls(self, path: str, recursive: bool = False) -> list[str]:
          file_path = normalize_dbfs_path(path)
          list_of_files = self.default_strategy.ls(file_path, recursive=recursive)
          return [content.replace("/dbfs","") for content in list_of_files]
    
      def getcwd(self) -> str:
          try:
              parent_folder = "/snowpark_checkpoints"
              self.mkdir(parent_folder, exist_ok=True)
              return parent_folder
          except Exception:
              return ""
    
      def remove_dir(self, path:str) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.remove_dir(path)
    
      def telemetry_path_files(self, path:str) -> Path:
          path = normalize_dbfs_path(path)
          return self.default_strategy.telemetry_path_files(path)
    
    def normalize_dbfs_path(path: str) -> str:
        if isinstance(path, Path):
            path = str(path)
        if not path.startswith("/"):
            path = "/" + path
        if not path.startswith("/dbfs/"):
            path = f'/dbfs{path}'
        return path
    
    Copy
  3. 配置 Databricks 策略:

    get_io_file_manager().set_strategy(IODatabricksStrategy())
    
    Copy

在 Databricks 脚本或笔记本开始时执行此代码会将 Snowpark Checkpoints 配置为使用定义的 I/O 策略在 DBFS 中正确处理文件。

可选自定义

对于更专业的输入/输出操作,可以设计和实现自定义策略。此方法提供了对 I/O 行为的完全控制和灵活性。此方法允许开发者根据其特定要求和约束来精确地定制策略,从而有可能优化性能、资源利用率或其他相关因素。

重要

使用自定义策略时,您有责任确保 I/O 操作正常运行。

语言: 中文