Snowpark ML FileSystem 和 FileSet – 已弃用

Snowpark ML 库包括 FileSystem,这是一个类似于文件系统的抽象概念,用于内部、服务器端加密的 Snowflake 暂存区。具体来说,它是一种 fsspec AbstractFileSystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem) 实现。该库还包括 FileSet,这是一个相关类,可让您将机器学习数据从 Snowflake 表移动到暂存区,并从那里将数据提供给 PyTorch 或者 TensorFlow(请参阅 Snowpark ML Data Connector)。

小技巧

大多数用户应使用较新的 Dataset API 在 Snowflake 中创建不可变的、受管理的数据快照,并将其用于端到端机器学习工作流程中。

安装

FileSystem 和 FileSet APIs 是 Snowpark ML Python 包 snowflake-ml-python 的一部分。有关安装说明,请参阅 在本地使用 Snowflake ML

创建和使用文件系统

创建 Snowpark ML 文件系统需要 Snowflake Connector for Python Connection 对象或 Snowpark Python Session。有关说明,请参阅 连接到 Snowflake

在建立连接或会话后,您可以创建一个 Snowpark ML SFFileSystem 实例,通过该实例您可以访问内部暂存区中的数据。

如果您有 Snowflake Connector for Python 连接,请将其作为 sf_connection 实参传递:

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
Copy

如果您有 Snowpark Python 会话,请将其作为 snowpark_session 实参传递:

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
Copy

SFFileSystem 继承了 fsspec.FileSystem 的许多功能,例如文件的本地缓存。您可以通过 fsspec.filesystem 工厂函数实例化 Snowflake 文件系统,并传递 target_protocol="sfc" 以使用 Snowflake FileSystem 实现来启用此功能和其他功能:

local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
                    target_options={"sf_connection": sf_connection,
                                    "cache_types": "bytes",
                                    "block_size": 32 * 2**20},
                    cache_storage=local_cache_path)
Copy

Snowflake 文件系统支持为 fsspec FileSystem 定义的大多数只读方法,包括 findinfoisdirisfileexists

指定文件

要指定暂存区中的文件,请使用 @database.schema.stage/file_path 形式的路径。

列表文件

文件系统的 ls 方法用于获取暂存区中的文件列表:

print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
Copy

打开和读取文件

您可以使用文件系统的 open 方法在暂存区中打开文件。然后,您可以使用与读取普通 Python 文件相同的方法读取这些文件。文件对象也是一个上下文管理器,可以与 Python 的 with 语句一起使用,因此当不再需要时它会自动关闭。

path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'

with sf_fs1.open(path, mode='rb') as f:
    print(f.read(16))
Copy

您还可以将 SFFileSystem 实例与接受 fsspec 文件系统的其他组件一起使用。此处,前面代码块中提到的 Parquet 数据文件会传递给 PyArrow 的 read_table 方法:

import pyarrow.parquet as pq

table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
Copy

接受文件(或类似文件的对象)的 Python 组件可以传递从 Snowflake 文件系统打开的文件对象。例如,如果暂存区中有一个 gzip 压缩文件,则可以通过将其作为 fileobj 参数传递给 gzip.GzipFile 来将其与 Python 的 gzip 模块一起使用:

path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"

with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
    g = gzip.GzipFile(fileobj=f)
    for i in range(3):
        print(g.readline())
Copy

创建并使用 FileSet

Snowflake FileSet 表示内部服务器端加密暂存区中以文件形式呈现的 SQL 查询结果的不可变快照。可以通过 FileSystem 访问这些文件,将数据提供给 PyTorch 和 TensorFlow 等工具,以便您可以在现有数据治理模型内大规模训练模型。要创建 FileSet,请使用 FileSet.make 方法。

您需要 Snowflake Python 连接或 Snowpark 会话来创建 FileSet。 有关说明,请参阅 连接到 Snowflake。您还必须提供现有内部服务器端加密暂存区的路径,或此类暂存区下的子目录,FileSet 将存储于此。

要从 Snowpark DataFrame 创建 FileSet,请 构造 DataFrame,并将其作为 snowpark_dataframe 传递给 FileSet.make;切勿调用 DataFrame 的 collect 方法:

# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_dataframe",
    snowpark_dataframe=df,
    shuffle=True,
)
Copy

要使用 Snowflake Connector for Python 连接创建 FileSet,请将该连接作为 sf_connection 传递给 Fileset.make,并将 SQL 查询作为 query 进行传递。

fileset_sf = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_connector",
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,           # see later section about shuffling
)
Copy

备注

有关使用 shuffle 参数混洗数据的信息,请参阅 在 FileSets 中混洗数据

使用 files 方法获取 FileSet 中的文件列表:

print(*fileset_df.files())
Copy

有关将 FileSet 中的数据提供给 PyTorch 或 TensorFlow,请分别参阅 将 FileSet 提供给 PyTorch将 FileSet 提供给 TensorFlow

将 FileSet 提供给 PyTorch

从 Snowflake FileSet 中,您可以获得 PyTorch DataPipe,可以将其传递给 PyTorch DataLoader。DataLoader 迭代 FileSet 数据并生成批处理的 PyTorch Tensors。使用 FileSet 的 to_torch_datapipe 方法创建 DataPipe,然后将 DataPipe 传递给 PyTorch 的 DataLoader

from torch.utils.data import DataLoader

# See later sections about shuffling and batching
pipe = fileset_df.to_torch_datapipe(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in DataLoader(pipe, batch_size=None, num_workers=0):
    print(batch)
    break
Copy

将 FileSet 提供给 TensorFlow

您可以使用 FileSet 的 to_tf_dataset 方法,从 Snowflake FileSet 获取 TensorFlow 数据集:

import tensorflow as tf

# See following sections about shuffling and batching
ds = fileset_df.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in ds:
    print(batch)
    break
Copy

迭代数据集将生成批处理张量。

在 FileSets 中混洗数据

对避免过度拟合和其他问题来说,对训练数据进行混洗通常很有价值。有关混洗数据的价值的讨论,请参阅 ` 为什么要为机器学习任务混洗数据?<https://datascience.stackexchange.com/questions/24511/why-should-the-data-be-shuffled-for-machine-learning-tasks (https://datascience.stackexchange.com/questions/24511/why-should-the-data-be-shuffled-for-machine-learning-tasks)>`__

如果您的查询尚未充分混洗数据,则 FileSet 可以出于以下两点对数据进行混洗:

  • 当使用 FileSet 创建 FileSet.make 时。

    查询中的所有行在写入 FileSet 之前都会进行混洗。这是一次高质量的全局混洗,对于大型数据集来说可能很昂贵。因此,仅在具体化 FileSet 时执行一次。将 shuffle=True 作为关键字实参传递给 FileSet.make

  • 当您从 FileSet 创建 PyTorch DataPipe 或 TensorFlow 数据集时。

    此时,FileSet 中的文件顺序是随机的,每个文件中行的顺序也是随机的。这可以被认为是“近似”的全局混洗。它的质量低于真正的全局混洗,但价格要便宜得多。若要在此暂存区进行混洗,请将 shuffle=True 作为关键字实参传递给 FileSet 的 to_torch_datapipeto_tf_dataset 方法。

为获得最佳结果,请随机混洗两次:创建 FileSet 和将数据提供给 PyTorch 或 TensorFlow 时。

批处理 FileSets 中的数据

FileSets 具有批处理功能,其工作方式与 PyTorch 和 TensorFlow 中的批处理功能相同,但效率更高。Snowflake 建议您在 FileSet 的 to_torch_datapipeto_tf_dataset 方法中使用 batch_size 参数,而不是让 PyTorch 或 TensorFlow 进行批处理。使用 PyTorch 时,若要禁用其批处理功能,必须在实例化 DataLoader 时显式传递 batch_size=None

如果最后一批不完整,也可以通过将 drop_last_batch=True 传递给 to_torch_datapipeto_tf_dataset 来删除最后一批。

语言: 中文