使用 Snowpark Connect for Spark 访问云服务文件数据¶
使用 Snowpark Connect for Spark,您可以直接与外部云存储系统交互,例如 Amazon S3、Google Cloud Storage 和 Azure Blob。您可以将数据从云存储读取到 Snowflake 中,处理这些数据,然后重新将其写入。
例如,您可能需要使用 Snowpark Connect for Spark 来执行以下任务:
引入原始数据。
在将文件(例如 CSV、JSON 和 Parquet)移入 Snowflake 之前,将其放在 S3、Google Cloud 或 Azure 中。
导出数据以供下游使用。
将处理过的 Snowpark DataFrames 写回云存储,用于 ML 训练、与外部合作伙伴共享或进行基于 Spark 的深入分析。
创建混合管道。
将部分管道保留在 Snowflake 中,同时保持与现有数据湖的兼容性。
遵守法规或降低成本。
由于法规、治理或预算限制,在外部存储特定数据集。
使用本主题中列出的步骤读取和写入存储在这些云服务提供商处的文件。您可以通过 Snowflake 外部暂存区或直接访问来访问文件。
注意事项¶
将 Snowpark Connect for Spark 与云服务结合使用时,请记住以下注意事项:
身份验证 - Snowpark Connect for Spark 不会自动管理云凭证。您必须配置访问密钥 (AWS)、存储账户密钥或 SAS 令牌 (Azure),或者自行维护外部暂存区。凭证过期或丢失将导致读/写失败。
性能 - 云 I/O 取决于网络带宽和对象存储延迟。读取许多小文件会显著影响性能。
格式支持 - 确保支持您正在读取和写入的文件格式。Snowpark Connect for Spark 目前采用常用的格式,包括 TEXT、CSV、JSON 和 Parquet。但是,高级功能(例如 Parquet 分区发现和 JSON 架构演进)可能与 Spark 不同。
权限和策略 - 写入云桶需要适当的 IAM/ACL 策略。如果 Snowflake 角色和云凭证之间的策略不一致,您可能会遇到 AccessDenied 错误。
最佳实践¶
要获得性能良好的最可靠集成,请遵循以下最佳实践:
使用安全的临时凭证并经常轮换凭证。
分区和桶数据。
编写 Parquet 时,对经常筛选的列进行分区以降低扫描成本。使用更少、更大的文件(例如,每个文件 100MB 到 500MB),而不是使用许多小文件。
写入时验证架构。
务必明确定义架构,尤其是对于半结构化格式,例如 JSON 和 CSV。这样可以防止 Snowflake 和外部数据之间出现漂移。
监控成本。
考虑在写入之前合并文件并筛选数据,以降低成本。云提供商的费用是按请求和扫描的字节累积的。
标准化 API 调用。
在使用功能和参数时,请严格遵循记录的指南,避免临时变化。通过这种方式,您可以保持兼容性,防止回归,并确保在不同云提供商处的行为符合预期。
使用 Snowflake 外部暂存区进行访问¶
配置对 Amazon S3 的安全访问,以创建一个指向您的 S3 位置的外部暂存区。
从外部暂存区读取。
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
配置对 Azure 的安全访问,以创建指向 Azure 容器的外部暂存区。
从外部暂存区读取。
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
配置对 Google Cloud 的安全访问,以创建一个指向您的 Google Cloud Storage 桶的外部暂存区。
从外部暂存区读取。
# Read CSV spark.read.csv('@<your external stage name>/<file path>') spark.read.option("header", True).csv('@<your external stage name>/<file path>') # read with header in file # Write to CSV df.write.csv('@<your external stage name>/<file path>') df.write.option("header", True).csv('@<your external stage name>/<file path>') # write with header in file # Read Text spark.read.text('@<your external stage name>/<file path>') # Write to Text df.write.text('@<your external stage name>/<file path>') df.write.format("text").mode("overwrite").save('@<your external stage name>/<file path>') # Read Parquet spark.read.parquet('@<your external stage name>/<file path>') # Write to Parquet df.write.parquet('@<your external stage name>/<file path>') # Read JSON spark.read.json('@<your external stage name>/<file path>') # Write to JSON df.write.json('@<your external stage name>/<file path>')
使用直接访问进行访问¶
您可以使用此处描述的步骤和代码直接访问在云服务提供商处存储的文件。
使用 AWS 凭证设置 Spark 配置。
# For S3 related access with public/private buckets, please add these config change spark.conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled","false") spark.conf.set("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") spark.conf.set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2") # For private S3 access, please also provide credentials spark.conf.set("spark.hadoop.fs.s3a.access.key","<AWS_ACCESS_KEY_ID>") spark.conf.set("spark.hadoop.fs.s3a.secret.key","<AWS_SECRET_ACCESS_KEY>") spark.conf.set("spark.hadoop.fs.s3a.session.token","<AWS_SESSION_TOKEN>")
使用 S3 直接读取和写入。
# Read CSV spark.read.csv('s3a://<bucket name>/<file path>') spark.read.option("header", True).csv('s3a://<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('s3a://<bucket name>/<file path>') df.write.option("header", True).csv('s3a://<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('s3a://<bucket name>/<file path>') # Write to Text df.write.text('s3a://<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('s3a://<bucket name>/<file path>') # Read Parquet spark.read.parquet('s3a://<bucket name>/<file path>') # Write to Parquet df.write.parquet('s3a://<bucket name>/<file path>') # Read JSON spark.read.json('s3a://<bucket name>/<file path>') # Write to JSON df.write.json('s3a://<bucket name>/<file path>')
使用 Azure 凭证设置 Spark 配置。
# For private Azure access, please also provide blob SAS token # * Make sure all required permissions are in place before proceeding spark.conf.set("fs.azure.sas.fixed.token.<storage-account>.dfs.core.windows.net","<Shared Access Token>")
使用 Azure 直接读取和写入。
# Read CSV spark.read.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') spark.read.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # read with header in file # Write to CSV df.write.csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.option("header", True).csv('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # write with header in file # Read Text spark.read.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Text df.write.text('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') df.write.format("text").mode("overwrite").save('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read Parquet spark.read.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to Parquet df.write.parquet('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Read JSON spark.read.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>') # Write to JSON df.write.json('wasbs://<container name>@<storage account name>.blob.core.windows.net/<bucket name>/<file path>')