Snowpark ML FileSystem 和 FileSet¶
Snowpark ML 库包括 FileSystem,这是一个类似于文件系统的抽象概念,用于内部、服务器端加密的 Snowflake 暂存区。具体来说,它是一种 ` fsspec AbstractFileSystem <https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem)>`__ 实现。该库还包括 FileSet,这是一个相关类,可让您将机器学习数据从 Snowflake 表移动到暂存区,并从那里将数据提供给 PyTorch 或者 TensorFlow(请参阅 Snowpark ML Framework Connectors)。
小技巧
有关这些 APIs 的完整详细信息,请参阅 Snowpark ML API 参考。
备注
本主题假定已安装 Snowpark ML 模块。如果尚未安装,请参阅 安装 Snowpark ML。
创建和使用文件系统¶
创建 Snowpark ML 文件系统需要 Snowflake Connector for Python Connection
对象或 Snowpark Python Session
。有关说明,请参阅 连接到 Snowflake。
在建立连接或会话后,您可以创建一个 Snowpark ML SFFileSystem
实例,通过该实例您可以访问内部暂存区中的数据。
如果您有 Snowflake Connector for Python 连接,请将其作为 sf_connection
实参传递:
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
如果您有 Snowpark Python 会话,请将其作为 snowpark_session
实参传递:
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
SFFileSystem
继承了 fsspec.FileSystem
的许多功能,例如文件的本地缓存。您可以通过 fsspec.filesystem
工厂函数实例化 Snowflake 文件系统,并传递 target_protocol="sfc"
以使用 Snowflake FileSystem 实现来启用此功能和其他功能:
local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
target_options={"sf_connection": sf_connection,
"cache_types": "bytes",
"block_size": 32 * 2**20},
cache_storage=local_cache_path)
Snowflake 文件系统支持为 fsspec FileSystem
定义的大多数只读方法,包括 find
、info
、isdir
、isfile
和 exists
。
指定文件¶
要指定暂存区中的文件,请使用 @database.schema.stage/file_path
形式的路径。
列表文件¶
文件系统的 ls
方法用于获取暂存区中的文件列表:
print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
打开和读取文件¶
您可以使用文件系统的 open
方法在暂存区中打开文件。然后,您可以使用与读取普通 Python 文件相同的方法读取这些文件。文件对象也是一个上下文管理器,可以与 Python 的 with
语句一起使用,因此当不再需要时它会自动关闭。
path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'
with sf_fs1.open(path, mode='rb') as f:
print(f.read(16))
您还可以将 SFFileSystem
实例与接受 fsspec 文件系统的其他组件一起使用。此处,前面代码块中提到的 Parquet 数据文件会传递给 PyArrow 的 read_table
方法:
import pyarrow.parquet as pq
table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
接受文件(或类似文件的对象)的 Python 组件可以传递从 Snowflake 文件系统打开的文件对象。例如,如果暂存区中有一个 gzip 压缩文件,则可以通过将其作为 fileobj
参数传递给 gzip.GzipFile
来将其与 Python 的 gzip
模块一起使用:
path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"
with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
g = gzip.GzipFile(fileobj=f)
for i in range(3):
print(g.readline())
创建并使用 FileSet¶
Snowflake FileSet 表示内部服务器端加密暂存区中以文件形式呈现的 SQL 查询结果的不可变快照。可以通过 FileSystem 访问这些文件,将数据提供给 PyTorch 和 TensorFlow 等工具,以便您可以在现有数据治理模型内大规模训练模型。要创建 FileSet,请使用 FileSet.make
方法。
您需要 Snowflake Python 连接或 Snowpark 会话来创建 FileSet。 有关说明,请参阅 连接到 Snowflake。您还必须提供现有内部服务器端加密暂存区的路径,或此类暂存区下的子目录,FileSet 将存储于此。
要从 Snowpark DataFrame 创建 FileSet,请 构造 DataFrame,并将其作为 snowpark_dataframe
传递给 FileSet.make
;切勿调用 DataFrame 的 collect
方法:
# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
target_stage_loc="@ML_DATASETS.public.my_models/",
name="from_dataframe",
snowpark_dataframe=df,
shuffle=True,
)
要使用 Snowflake Connector for Python 连接创建 FileSet,请将该连接作为 sf_connection
传递给 Fileset.make
,并将 SQL 查询作为 query
进行传递。
fileset_sf = fileset.FileSet.make(
target_stage_loc="@ML_DATASETS.public.my_models/",
name="from_connector",
sf_connection=sf_connection,
query="SELECT * FROM MYDATA LIMIT 5000000",
shuffle=True, # see later section about shuffling
)
备注
有关使用 shuffle
参数混洗数据的信息,请参阅 在 FileSets 中混洗数据。
使用 files
方法获取 FileSet 中的文件列表:
print(*fileset_df.files())
有关将 FileSet 中的数据提供给 PyTorch 或 TensorFlow 的信息,请参阅 Snowpark ML Framework Connectors。