Databricks 中的检查点¶
Snowpark Checkpoints 写入有关收集结果的文件并读取这些相同的文件以验证 DataFrames。其中一些文件是使用 PySpark 生成的;另一些则使用 Python 包(例如 os 或 glob)生成。这种类型的文件处理行为可能会导致 Databricks 环境中的不一致(在该环境中,文件系统与传统环境不同)。因此,必须调整包以确保正确的文件读取和写入。
下一节演示了如何配置 Snowpark Checkpoints 以使其在 Databricks 环境中无缝运行,从而实现高效的 DataFrame 验证。
先决条件¶
在 Databricks 中使用 Snowpark Checkpoints 之前,请确保环境满足以下要求:
满足这些要求的 Databricks Runtime 版本是:
- Databricks Runtime 14.3 LTS
- Databricks Runtime 15.4 LTS
输入/输出 (I/O) 策略¶
为确保 Snowpark Checkpoints 在各种环境中正常运行,可以使用接口 EnvStrategy 及其实现类进行文件读写操作。这使得 I/O 操作具有适应性和自定义性。
- 使用 Snowpark Checkpoints,您可以通过创建实现 - EnvStrategy接口的类来实现自己的自定义输入/输出方法。然后,您可以根据特定的执行环境和预期定制操作。
- 在内部,该包使用默认类 ( - IODefaultStrategy) 来实现- EnvStrategy接口并提供 I/O 操作的基本实现。您可以将此默认策略替换为适合环境特定需求的自定义实现。
重要
每个 Snowpark Checkpoints 包(snowpark-checkpoints-collectors、snowpark-checkpoints-validators、snowpark-checkpoints-hypothesis)都包含自己的文件处理类副本。因此,对文件配置的任何更改都必须分别应用于每个包。请务必从您正在使用的包中导入配置。
I/O 函数¶
这些文件读写方法可以自定义:
- mkdir:创建文件夹。
- folder_exists:检查文件夹是否存在。
- file_exists:检查文件是否存在。
- write:将内容写入到文件。
- read:从文件中读取内容。
- read_bytes:从文件中读取二进制内容。
- ls:列出目录的内容。
- getcwd:获取当前工作目录。
- remove_dir:移除目录及其内容。此功能仅在- snowpark-checkpoints-collectors模块中使用。
- telemetry_path_files:获取遥测文件的路径。
Databricks 策略¶
Databricks 策略是用于了解如何使用 DBFS 文件路径的配置。它使用 normalize_dbfs_path 函数来确保所有路径都以 /dbfs/ 开头。
如何使用该策略¶
要使用 Databricks 策略,必须在代码中对其进行明确配置。方法如下:
- 导入所需的类: - from typing import Optional, BinaryIO from pathlib import Path from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager 
- 定义 Databricks 策略: - class IODatabricksStrategy(EnvStrategy): def __init__(self): self.default_strategy = IODefaultStrategy() def mkdir(self, path: str, exist_ok: bool = False) -> None: path = normalize_dbfs_path(path) self.default_strategy.mkdir(path, exist_ok=exist_ok) def folder_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.folder_exists(path) def file_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.file_exists(path) def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None: file_path = normalize_dbfs_path(file_path) self.default_strategy.write(file_path, file_content, overwrite=overwrite) def read( self, file_path: str, mode: str = "r", encoding: Optional[str] = None ) -> str: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read(file_path, mode=mode, encoding=encoding) def read_bytes(self, file_path: str) -> bytes: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read_bytes(file_path) def ls(self, path: str, recursive: bool = False) -> list[str]: file_path = normalize_dbfs_path(path) list_of_files = self.default_strategy.ls(file_path, recursive=recursive) return [content.replace("/dbfs","") for content in list_of_files] def getcwd(self) -> str: try: parent_folder = "/snowpark_checkpoints" self.mkdir(parent_folder, exist_ok=True) return parent_folder except Exception: return "" def remove_dir(self, path:str) -> None: path = normalize_dbfs_path(path) self.default_strategy.remove_dir(path) def telemetry_path_files(self, path:str) -> Path: path = normalize_dbfs_path(path) return self.default_strategy.telemetry_path_files(path) def normalize_dbfs_path(path: str) -> str: if isinstance(path, Path): path = str(path) if not path.startswith("/"): path = "/" + path if not path.startswith("/dbfs/"): path = f'/dbfs{path}' return path 
- 配置 Databricks 策略: - get_io_file_manager().set_strategy(IODatabricksStrategy()) 
在 Databricks 脚本或笔记本开始时执行此代码会将 Snowpark Checkpoints 配置为使用定义的 I/O 策略在 DBFS 中正确处理文件。
可选自定义¶
对于更专业的输入/输出操作,可以设计和实现自定义策略。此方法提供了对 I/O 行为的完全控制和灵活性。此方法允许开发者根据其特定要求和约束来精确地定制策略,从而有可能优化性能、资源利用率或其他相关因素。
重要
使用自定义策略时,您有责任确保 I/O 操作正常运行。