从 VS Code、Jupyter Notebook 或终端运行 Spark 工作负载¶
你可以通过 Jupyter 笔记本、VS Code 或任何基于 Python 的界面,以交互方式运行 Spark 工作负载,而无需管理 Spark 集群。工作负载会在 Snowflake 基础设施上运行。
例如,您可以执行以下任务:
确认您符合先决条件。
在 Snowflake 上设置环境以连接 Snowpark Connect for Spark。
安装 Snowpark Connect for Spark。
从您的客户端运行 PySpark 代码,以在 Snowflake 上运行。
先决条件¶
确认您的 Python 和 Java 安装基于相同的计算机架构。例如,如果 Python 基于 arm64,则 Java 也必须基于 arm64(比如,并非基于 x86_64)。
设置环境¶
您可以确保代码可以连接到 Snowflake 上的 Snowpark Connect for Spark,通过这种方式来设置开发环境。要连接到 Snowflake 客户端代码,将使用包含连接详细信息的 .toml 文件。
如已安装 Snowflake CLI,则可以使用它来定义连接。否则,可以手动将连接参数写入 config.toml 文件。
使用 Snowflake CLI 添加连接¶
您可以使用 Snowflake CLI,添加 Snowpark Connect for Spark 可用于连接到 Snowflake 的连接属性。您的更改将保存到 config.toml 文件。
使用 snow connection add 命令,运行以下命令来添加连接。
snow connection add
按照提示以定义连接。
请务必将
spark-connect指定为连接名称。此命令会向您的
config.toml文件添加连接,如以下示例所示:[connections.spark-connect] host = "example.snowflakecomputing.cn" port = 443 account = "example" user = "test_example" password = "password" protocol = "https" warehouse = "example_wh" database = "example_db" schema = "public"
运行以下命令,以确认连接正常。
使用 Snowflake CLI 添加连接后,可以通过这种方式测试连接。
snow connection list snow connection test --connection spark-connect
通过手动写入连接文件来添加连接¶
您可以手动编写或更新 connections.toml 文件,让代码能够连接到 Snowflake Snowpark Connect for Spark。
运行以下命令,确保您的
connections.toml文件仅允许所有者(用户)具备读写权限。chmod 0600 "~/.snowflake/connections.toml"
编辑
connections.toml文件,使其包含具有以下示例中所示连接属性的[spark-connect]连接。请务必使用您自己的连接细节取代其中的值。
[spark-connect] host="my_snowflake_account.snowflakecomputing.cn" account="my_snowflake_account" user="my_user" password="&&&&&&&&" warehouse="my_wh" database="my_db" schema="public"
安装 Snowpark Connect for Spark¶
您可以将 Snowpark Connect for Spark 作为 Python 包进行安装。
创建 Python 虚拟环境。
通过运行
python3 --version确认您的 Python 版本为 3.10 或更高,但低于 3.13。python3 -m venv .venv source .venv/bin/activate
安装 Snowpark Connect for Spark 包。
pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
添加 Python 代码以启动 Snowpark Connect for Spark 服务器并创建 Snowpark Connect for Spark 会话。
from snowflake import snowpark_connect spark=snowpark_connect.init_spark_session()
从客户端运行 Python 代码¶
建立经过身份验证的连接之后,即可照常编写代码。
您可以使用 PySpark 客户端库运行连接到 Snowpark Connect for Spark 的 PySpark 代码。
# Row is imported in the previous code snippet
df = spark.createDataFrame([
Row(a=1, b=2.),
Row(a=2, b=3.),
Row(a=4, b=5.),])
print(df.count())
从客户端运行 Scala 代码¶
您可使用 Spark Connect 客户端库运行连接至 Snowpark Connect for Spark 的 Scala 应用程序。
本指南将引导您设置 Snowpark Connect,并将 Scala 应用程序连接到 Snowpark Connect for Spark 服务器。
第 1 步:设置 Snowpark Connect for Spark 环境¶
使用以下主题中描述的步骤设置环境:
第 2 步:创建 Snowpark Connect for Spark 服务器脚本并启动服务器¶
创建 Python 脚本以启动 Snowpark Connect for Spark 服务器。
# launch-snowpark-connect.py from snowflake import snowpark_connect def main(): snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002") print("SAS started on port 15002") if __name__ == "__main__": main()
启动 Snowpark Connect for Spark 服务器。
# Make sure you're in the correct Python environment pyenv activate your-snowpark-connect-env # Run the server script python launch-snowpark-connect.py
第 3 步:设置 Scala 应用程序¶
将 Spark Connect 客户端依赖项添加到 build.sbt 文件中。
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6" // Add JVM options for Java 9+ module system compatibility javaOptions ++= Seq( "--add-opens=java.base/java.nio=ALL-UNNAMED" )
执行 Scala 代码以连接到 Snowpark Connect for Spark 服务器。
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.client.REPLClassDirMonitor object SnowparkConnectExample { def main(args: Array[String]): Unit = { // Create Spark session with Snowpark Connect val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate() // Register ClassFinder for UDF support (if needed) // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes") // spark.registerClassFinder(classFinder) try { // Simple DataFrame operations import spark.implicits._ val data = Seq( (1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35) ) val df = spark.createDataFrame(data).toDF("id", "name", "age") println("Original DataFrame:") df.show() println("Filtered DataFrame (age > 28):") df.filter($"age" > 28).show() println("Aggregated result:") df.groupBy().avg("age").show() } finally { spark.stop() } } }
编译并运行应用程序。
# Compile your Scala application sbt compile # Run the application sbt "runMain SnowparkConnectExample"
Snowpark Connect for Spark 上的 Scala UDF 支持¶
使用用户定义的函数或自定义代码时,请执行以下操作之一:
注册类查找器以监控和上传类文件。
import org.apache.spark.sql.connect.client.REPLClassDirMonitor val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes") spark.registerClassFinder(classFinder)
上传 JAR 依赖项(如果需要)。如果未使用类查找器,您可以包含工作负载 JAR 本身。
spark.addArtifact("/absolute/path/to/dependency.jar")
使用暂存的 JAR。
spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
使用 Scala 2.13¶
默认情况下,Snowpark Connect for Spark 使用 Scala 2.12。使用 Scala 2.13 构建的工作负载必须使用“snowpark.connect.scala.version”配置选项指定 Scala 版本。
// Directly in the session builder
val spark = SparkSession.builder()
.remote("sc://localhost:15002")
.config("snowpark.connect.scala.version", "2.13")
.getOrCreate()
// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
Snowpark Connect for Spark 安装故障排除¶
借助以下检查列表,您可以对 Snowpark Connect for Spark 的安装和使用进行故障排除。
确保 Java 和 Python 基于相同的架构。
使用最新的 Snowpark Connect for Spark 包文件,如 安装 Snowpark Connect for Spark 中所述。
确认带有 PySpark 代码的 python 命令在本地执行(即无 Snowflake 连接)中可以正常发挥作用。
例如,执行如下命令:
python your_pyspark_file.py
开源客户端¶
您可以使用标准的现成开源软件 (OSS) Spark 客户端包(例如 PySpark 以及适用于 Java 或 Scala 的 Spark 客户端),从您偏好的本地环境中调用,包括 Jupyter Notebook 和 VS Code。通过这种方式,您可以避免安装特定于 Snowflake 的包。
如果您希望在本地编写 Spark 代码,并让该代码使用 Snowflake 的计算资源和企业级治理功能,您可能会觉得这很有用。在此场景中,您通过编程访问令牌 (PATs) 进行身份验证和授权。
以下部分介绍安装、配置和身份验证。您还会找到一个简单的 PySpark 示例,用于验证您的连接。
第 1 步:安装所需的包¶
安装
pyspark。您无需安装任何 Snowflake 包。pip install "pyspark[connect]>=3.5.0,<4"
第 2 步:设置和身份验证¶
生成编程访问令牌 (PAT)。
有关详细信息,请参阅以下主题:
以下示例为用户
sysadmin添加一个名为TEST_PAT的 PAT,并将过期时间设置为 30 天。ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
查找 Snowflake Spark Connect 主机 URL。
在 Snowflake 中运行以下 SQL 以查找您账户的主机名:
SELECT t.VALUE:type::VARCHAR as type, t.VALUE:host::VARCHAR as host, t.VALUE:port as port FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
第 3 步:连接到 Spark Connect 服务器¶
要连接到 Spark Connect 服务器,请使用如下代码:
from pyspark.sql import SparkSession import urllib.parse # Replace with your actual PAT. pat = urllib.parse.quote("<pat>", safe="") # Replace with your Snowpark Connect host from the above SQL query. snowpark_connect_host = "" # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse db_name = urllib.parse.quote("TESTDB", safe="") schema_name = urllib.parse.quote("TESTSCHEMA", safe="") warehouse_name = urllib.parse.quote("TESTWH", safe="") spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate() # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example: from pyspark.sql import Row df = spark.createDataFrame([ Row(a=1, b=2.), Row(a=2, b=3.), Row(a=4, b=5.),]) print(df.count())