Snowpark ML FileSystem 和 FileSet – 已弃用¶
Snowpark ML 库包括 FileSystem,这是一个类似于文件系统的抽象概念,用于内部、服务器端加密的 Snowflake 暂存区。具体来说,它是一种 fsspec AbstractFileSystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem) 实现。该库还包括 FileSet,这是一个相关类,可让您将机器学习数据从 Snowflake 表移动到暂存区,并从那里将数据提供给 PyTorch 或者 TensorFlow(请参阅 Snowpark ML Data Connector)。
小技巧
大多数用户应使用较新的 Dataset API 在 Snowflake 中创建不可变的、受管理的数据快照,并将其用于端到端机器学习工作流程中。
安装¶
FileSystem 和 FileSet APIs 是 Snowpark ML Python 包 snowflake-ml-python
的一部分。有关安装说明,请参阅 在本地使用 Snowflake ML。
创建和使用文件系统¶
创建 Snowpark ML 文件系统需要 Snowflake Connector for Python Connection
对象或 Snowpark Python Session
。有关说明,请参阅 连接到 Snowflake。
在建立连接或会话后,您可以创建一个 Snowpark ML SFFileSystem
实例,通过该实例您可以访问内部暂存区中的数据。
如果您有 Snowflake Connector for Python 连接,请将其作为 sf_connection
实参传递:
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
如果您有 Snowpark Python 会话,请将其作为 snowpark_session
实参传递:
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
SFFileSystem
继承了 fsspec.FileSystem
的许多功能,例如文件的本地缓存。您可以通过 fsspec.filesystem
工厂函数实例化 Snowflake 文件系统,并传递 target_protocol="sfc"
以使用 Snowflake FileSystem 实现来启用此功能和其他功能:
local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
target_options={"sf_connection": sf_connection,
"cache_types": "bytes",
"block_size": 32 * 2**20},
cache_storage=local_cache_path)
Snowflake 文件系统支持为 fsspec FileSystem
定义的大多数只读方法,包括 find
、info
、isdir
、isfile
和 exists
。
指定文件¶
要指定暂存区中的文件,请使用 @database.schema.stage/file_path
形式的路径。
列表文件¶
文件系统的 ls
方法用于获取暂存区中的文件列表:
print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
打开和读取文件¶
您可以使用文件系统的 open
方法在暂存区中打开文件。然后,您可以使用与读取普通 Python 文件相同的方法读取这些文件。文件对象也是一个上下文管理器,可以与 Python 的 with
语句一起使用,因此当不再需要时它会自动关闭。
path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'
with sf_fs1.open(path, mode='rb') as f:
print(f.read(16))
您还可以将 SFFileSystem
实例与接受 fsspec 文件系统的其他组件一起使用。此处,前面代码块中提到的 Parquet 数据文件会传递给 PyArrow 的 read_table
方法:
import pyarrow.parquet as pq
table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
接受文件(或类似文件的对象)的 Python 组件可以传递从 Snowflake 文件系统打开的文件对象。例如,如果暂存区中有一个 gzip 压缩文件,则可以通过将其作为 fileobj
参数传递给 gzip.GzipFile
来将其与 Python 的 gzip
模块一起使用:
path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"
with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
g = gzip.GzipFile(fileobj=f)
for i in range(3):
print(g.readline())
创建并使用 FileSet¶
Snowflake FileSet 表示内部服务器端加密暂存区中以文件形式呈现的 SQL 查询结果的不可变快照。可以通过 FileSystem 访问这些文件,将数据提供给 PyTorch 和 TensorFlow 等工具,以便您可以在现有数据治理模型内大规模训练模型。要创建 FileSet,请使用 FileSet.make
方法。
您需要 Snowflake Python 连接或 Snowpark 会话来创建 FileSet。 有关说明,请参阅 连接到 Snowflake。您还必须提供现有内部服务器端加密暂存区的路径,或此类暂存区下的子目录,FileSet 将存储于此。
要从 Snowpark DataFrame 创建 FileSet,请 构造 DataFrame,并将其作为 snowpark_dataframe
传递给 FileSet.make
;切勿调用 DataFrame 的 collect
方法:
# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
target_stage_loc="@ML_DATASETS.public.my_models/",
name="from_dataframe",
snowpark_dataframe=df,
shuffle=True,
)
要使用 Snowflake Connector for Python 连接创建 FileSet,请将该连接作为 sf_connection
传递给 Fileset.make
,并将 SQL 查询作为 query
进行传递。
fileset_sf = fileset.FileSet.make(
target_stage_loc="@ML_DATASETS.public.my_models/",
name="from_connector",
sf_connection=sf_connection,
query="SELECT * FROM MYDATA LIMIT 5000000",
shuffle=True, # see later section about shuffling
)
备注
有关使用 shuffle
参数混洗数据的信息,请参阅 在 FileSets 中混洗数据。
使用 files
方法获取 FileSet 中的文件列表:
print(*fileset_df.files())
有关将 FileSet 中的数据提供给 PyTorch 或 TensorFlow,请分别参阅 将 FileSet 提供给 PyTorch 或 将 FileSet 提供给 TensorFlow。
将 FileSet 提供给 PyTorch¶
从 Snowflake FileSet 中,您可以获得 PyTorch DataPipe,可以将其传递给 PyTorch DataLoader。DataLoader 迭代 FileSet 数据并生成批处理的 PyTorch Tensors。使用 FileSet 的 to_torch_datapipe
方法创建 DataPipe,然后将 DataPipe 传递给 PyTorch 的 DataLoader
:
from torch.utils.data import DataLoader
# See later sections about shuffling and batching
pipe = fileset_df.to_torch_datapipe(
batch_size=4,
shuffle=True,
drop_last_batch=True)
for batch in DataLoader(pipe, batch_size=None, num_workers=0):
print(batch)
break
将 FileSet 提供给 TensorFlow¶
您可以使用 FileSet 的 to_tf_dataset
方法,从 Snowflake FileSet 获取 TensorFlow 数据集:
import tensorflow as tf
# See following sections about shuffling and batching
ds = fileset_df.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True)
for batch in ds:
print(batch)
break
迭代数据集将生成批处理张量。
在 FileSets 中混洗数据¶
对避免过度拟合和其他问题来说,对训练数据进行混洗通常很有价值。有关混洗数据的价值的讨论,请参阅 ` 为什么要为机器学习任务混洗数据?<https://datascience.stackexchange.com/questions/24511/why-should-the-data-be-shuffled-for-machine-learning-tasks (https://datascience.stackexchange.com/questions/24511/why-should-the-data-be-shuffled-for-machine-learning-tasks)>`__
如果您的查询尚未充分混洗数据,则 FileSet 可以出于以下两点对数据进行混洗:
当使用 FileSet 创建
FileSet.make
时。查询中的所有行在写入 FileSet 之前都会进行混洗。这是一次高质量的全局混洗,对于大型数据集来说可能很昂贵。因此,仅在具体化 FileSet 时执行一次。将
shuffle=True
作为关键字实参传递给FileSet.make
。当您从 FileSet 创建 PyTorch DataPipe 或 TensorFlow 数据集时。
此时,FileSet 中的文件顺序是随机的,每个文件中行的顺序也是随机的。这可以被认为是“近似”的全局混洗。它的质量低于真正的全局混洗,但价格要便宜得多。若要在此暂存区进行混洗,请将
shuffle=True
作为关键字实参传递给 FileSet 的to_torch_datapipe
或to_tf_dataset
方法。
为获得最佳结果,请随机混洗两次:创建 FileSet 和将数据提供给 PyTorch 或 TensorFlow 时。
批处理 FileSets 中的数据¶
FileSets 具有批处理功能,其工作方式与 PyTorch 和 TensorFlow 中的批处理功能相同,但效率更高。Snowflake 建议您在 FileSet 的 to_torch_datapipe
和 to_tf_dataset
方法中使用 batch_size
参数,而不是让 PyTorch 或 TensorFlow 进行批处理。使用 PyTorch 时,若要禁用其批处理功能,必须在实例化 DataLoader
时显式传递 batch_size=None
。
如果最后一批不完整,也可以通过将 drop_last_batch=True
传递给 to_torch_datapipe
或 to_tf_dataset
来删除最后一批。