Snowpark ML FileSystem 和 FileSet

Snowpark ML 库包括 FileSystem,这是一个类似于文件系统的抽象概念,用于内部、服务器端加密的 Snowflake 暂存区。具体来说,它是一种 ` fsspec AbstractFileSystem <https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem)>`__ 实现。该库还包括 FileSet,这是一个相关类,可让您将机器学习数据从 Snowflake 表移动到暂存区,并从那里将数据提供给 PyTorch 或者 TensorFlow(请参阅 Snowpark ML Framework Connectors)。

小技巧

有关这些 APIs 的完整详细信息,请参阅 Snowpark ML API 参考

备注

本主题假定已安装 Snowpark ML 模块。如果尚未安装,请参阅 安装 Snowpark ML

创建和使用文件系统

创建 Snowpark ML 文件系统需要 Snowflake Connector for Python Connection 对象或 Snowpark Python Session。有关说明,请参阅 连接到 Snowflake

在建立连接或会话后,您可以创建一个 Snowpark ML SFFileSystem 实例,通过该实例您可以访问内部暂存区中的数据。

如果您有 Snowflake Connector for Python 连接,请将其作为 sf_connection 实参传递:

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
Copy

如果您有 Snowpark Python 会话,请将其作为 snowpark_session 实参传递:

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
Copy

SFFileSystem 继承了 fsspec.FileSystem 的许多功能,例如文件的本地缓存。您可以通过 fsspec.filesystem 工厂函数实例化 Snowflake 文件系统,并传递 target_protocol="sfc" 以使用 Snowflake FileSystem 实现来启用此功能和其他功能:

local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
                    target_options={"sf_connection": sf_connection,
                                    "cache_types": "bytes",
                                    "block_size": 32 * 2**20},
                    cache_storage=local_cache_path)
Copy

Snowflake 文件系统支持为 fsspec FileSystem 定义的大多数只读方法,包括 findinfoisdirisfileexists

指定文件

要指定暂存区中的文件,请使用 @database.schema.stage/file_path 形式的路径。

列表文件

文件系统的 ls 方法用于获取暂存区中的文件列表:

print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
Copy

打开和读取文件

您可以使用文件系统的 open 方法在暂存区中打开文件。然后,您可以使用与读取普通 Python 文件相同的方法读取这些文件。文件对象也是一个上下文管理器,可以与 Python 的 with 语句一起使用,因此当不再需要时它会自动关闭。

path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'

with sf_fs1.open(path, mode='rb') as f:
    print(f.read(16))
Copy

您还可以将 SFFileSystem 实例与接受 fsspec 文件系统的其他组件一起使用。此处,前面代码块中提到的 Parquet 数据文件会传递给 PyArrow 的 read_table 方法:

import pyarrow.parquet as pq

table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
Copy

接受文件(或类似文件的对象)的 Python 组件可以传递从 Snowflake 文件系统打开的文件对象。例如,如果暂存区中有一个 gzip 压缩文件,则可以通过将其作为 fileobj 参数传递给 gzip.GzipFile 来将其与 Python 的 gzip 模块一起使用:

path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"

with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
    g = gzip.GzipFile(fileobj=f)
    for i in range(3):
        print(g.readline())
Copy

创建并使用 FileSet

Snowflake FileSet 表示内部服务器端加密暂存区中以文件形式呈现的 SQL 查询结果的不可变快照。可以通过 FileSystem 访问这些文件,将数据提供给 PyTorch 和 TensorFlow 等工具,以便您可以在现有数据治理模型内大规模训练模型。要创建 FileSet,请使用 FileSet.make 方法。

您需要 Snowflake Python 连接或 Snowpark 会话来创建 FileSet。 有关说明,请参阅 连接到 Snowflake。您还必须提供现有内部服务器端加密暂存区的路径,或此类暂存区下的子目录,FileSet 将存储于此。

要从 Snowpark DataFrame 创建 FileSet,请 构造 DataFrame,并将其作为 snowpark_dataframe 传递给 FileSet.make;切勿调用 DataFrame 的 collect 方法:

# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_dataframe",
    snowpark_dataframe=df,
    shuffle=True,
)
Copy

要使用 Snowflake Connector for Python 连接创建 FileSet,请将该连接作为 sf_connection 传递给 Fileset.make,并将 SQL 查询作为 query 进行传递。

fileset_sf = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_connector",
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,           # see later section about shuffling
)
Copy

备注

有关使用 shuffle 参数混洗数据的信息,请参阅 在 FileSets 中混洗数据

使用 files 方法获取 FileSet 中的文件列表:

print(*fileset_df.files())
Copy

有关将 FileSet 中的数据提供给 PyTorch 或 TensorFlow 的信息,请参阅 Snowpark ML Framework Connectors

语言: 中文