从第三方客户端运行 Spark 工作负载

你可以通过 Jupyter 笔记本、VS Code 或任何基于 Python 的界面,以交互方式运行 Spark 工作负载,而无需管理 Spark 集群。工作负载会在 Snowflake 基础设施上运行。

例如,您可以执行以下任务:

  1. 确认您符合先决条件。

  2. 在 Snowflake 上设置环境以连接 Snowpark Connect for Spark。

  3. 安装 Snowpark Connect for Spark。

  4. 从您的客户端运行 PySpark 代码,以在 Snowflake 上运行。

先决条件

确认您的 Python 和 Java 安装基于相同的计算机架构。例如,如果 Python 基于 arm64,则 Java 也必须基于 arm64(比如,并非基于 x86_64)。

设置环境

您可以确保代码可以连接到 Snowflake 上的 Snowpark Connect for Spark,通过这种方式来设置开发环境。要连接到 Snowflake 客户端代码,将使用包含连接详细信息的 .toml 文件。

如已安装 Snowflake CLI,则可以使用它来定义连接。否则,可以手动将连接参数写入 config.toml 文件。

使用 Snowflake CLI 添加连接

您可以使用 Snowflake CLI,添加 Snowpark Connect for Spark 可用于连接到 Snowflake 的连接属性。您的更改将保存到 config.toml 文件。

  1. 使用 snow connection add 命令,运行以下命令来添加连接。

    snow connection add
    
    Copy
  2. 按照提示以定义连接。

    请务必将 spark-connect 指定为连接名称。

    此命令会向您的 config.toml 文件添加连接,如以下示例所示:

    [connections.spark-connect]
    host = "example.snowflakecomputing.cn"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. 运行以下命令,以确认连接正常。

    使用 Snowflake CLI 添加连接后,可以通过这种方式测试连接。

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

通过手动写入连接文件来添加连接

您可以手动编写或更新 connections.toml 文件,让代码能够连接到 Snowflake Snowpark Connect for Spark。

  1. 运行以下命令,确保您的 connections.toml 文件仅允许所有者(用户)具备读写权限。

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. 编辑 connections.toml 文件,使其包含具有以下示例中所示连接属性的 [spark-connect] 连接。

    请务必使用您自己的连接细节取代其中的值。

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.cn"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

安装 Snowpark Connect for Spark

您可以将 Snowpark Connect for Spark 作为 Python 包进行安装。

  1. 创建 Python 虚拟环境。

    例如,您可以使用 Conda,如以下示例所示。

    conda create -n xxxx pip python=3.12
    conda activate xxxx
    
    Copy
  2. 安装 Snowpark Connect for Spark 包。

    pip install --upgrade --force-reinstall snowpark-connect
    
    Copy
  3. 添加 Python 代码以启动 Snowpark Connect for Spark 服务器并创建 Snowpark Connect for Spark 会话。

    import os
    import snowflake.snowpark
    from snowflake import snowpark_connect
    # Import snowpark_connect before importing pyspark libraries
    from pyspark.sql.types import Row
    
    os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
    snowpark_connect.start_session()  # Start the local Snowpark Connect for Spark session
    spark = snowpark_connect.get_session()
    
    Copy

Run Python code from your client

Once you have an authenticated connection in place, you can write code as you normally would.

您可以使用 PySpark 客户端库运行连接到 Snowpark Connect for Spark 的 PySpark 代码。

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

Run Scala code from your client

You can run Scala applications that connect to Snowpark Connect for Spark by using the Spark Connect client library.

This guide walks you through setting up Snowpark Connect and connecting your Scala applications to the Snowpark Connect for Spark server.

Step 1: Set up your Snowpark Connect for Spark environment

使用以下主题中描述的步骤设置环境:

  1. 创建 Python 虚拟环境并安装 Snowpark Connect

  2. Set up a connection.

Step 2: Create a Snowpark Connect for Spark server script and launch the server

  1. Create a Python script to launch the Snowpark Connect for Spark server.

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Launch the Snowpark Connect for Spark server.

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

Step 3: Set up your Scala application

  1. Add the Spark Connect client dependency to your build.sbt file.

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.3"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Execute Scala code to connect to the Snowpark Connect for Spark server.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. 编译并运行应用程序。

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Scala UDF support on Snowpark Connect for Spark

When using user-defined functions or custom code, do one of the following:

  • Register a class finder to monitor and upload class files.

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • 上传 JAR 依赖项(如果需要)。

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy

Snowpark Connect for Spark 安装故障排除

With the following list of checks, you can troubleshoot Snowpark Connect for Spark installation and use.

  • 确保 Java 和 Python 基于相同的架构

  • Use the most recent Snowpark Connect for Spark package file, as described in 安装 Snowpark Connect for Spark.

  • 确认带有 PySpark 代码的 python 命令在本地执行(即无 Snowflake 连接)中可以正常发挥作用。

    例如,执行如下命令:

    python your_pyspark_file.py
    
    Copy
语言: 中文