从 VS Code、Jupyter Notebook 或终端运行 Spark 工作负载

你可以通过 Jupyter 笔记本、VS Code 或任何基于 Python 的界面,以交互方式运行 Spark 工作负载,而无需管理 Spark 集群。工作负载会在 Snowflake 基础设施上运行。

例如,您可以执行以下任务:

  1. 确认您符合先决条件。

  2. 在 Snowflake 上设置环境以连接 Snowpark Connect for Spark。

  3. 安装 Snowpark Connect for Spark。

  4. 从您的客户端运行 PySpark 代码,以在 Snowflake 上运行。

先决条件

确认您的 Python 和 Java 安装基于相同的计算机架构。例如,如果 Python 基于 arm64,则 Java 也必须基于 arm64(比如,并非基于 x86_64)。

设置环境

您可以确保代码可以连接到 Snowflake 上的 Snowpark Connect for Spark,通过这种方式来设置开发环境。要连接到 Snowflake 客户端代码,将使用包含连接详细信息的 .toml 文件。

如已安装 Snowflake CLI,则可以使用它来定义连接。否则,可以手动将连接参数写入 config.toml 文件。

使用 Snowflake CLI 添加连接

您可以使用 Snowflake CLI,添加 Snowpark Connect for Spark 可用于连接到 Snowflake 的连接属性。您的更改将保存到 config.toml 文件。

  1. 使用 snow connection add 命令,运行以下命令来添加连接。

    snow connection add
    
    Copy
  2. 按照提示以定义连接。

    请务必将 spark-connect 指定为连接名称。

    此命令会向您的 config.toml 文件添加连接,如以下示例所示:

    [connections.spark-connect]
    host = "example.snowflakecomputing.cn"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. 运行以下命令,以确认连接正常。

    使用 Snowflake CLI 添加连接后,可以通过这种方式测试连接。

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

通过手动写入连接文件来添加连接

您可以手动编写或更新 connections.toml 文件,让代码能够连接到 Snowflake Snowpark Connect for Spark。

  1. 运行以下命令,确保您的 connections.toml 文件仅允许所有者(用户)具备读写权限。

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. 编辑 connections.toml 文件,使其包含具有以下示例中所示连接属性的 [spark-connect] 连接。

    请务必使用您自己的连接细节取代其中的值。

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.cn"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

安装 Snowpark Connect for Spark

您可以将 Snowpark Connect for Spark 作为 Python 包进行安装。

  1. 创建 Python 虚拟环境。

    通过运行 python3 --version 确认您的 Python 版本为 3.10 或更高,但低于 3.13。

    python3 -m venv .venv
    source .venv/bin/activate
    
    Copy
  2. 安装 Snowpark Connect for Spark 包。

    pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
    
    Copy
  3. 添加 Python 代码以启动 Snowpark Connect for Spark 服务器并创建 Snowpark Connect for Spark 会话。

    from snowflake import snowflake.snowpark_connect
    
    # Import snowpark_connect *before* importing pyspark libraries
    from pyspark.sql.types import Row
    
    spark = snowflake.snowpark_connect.server.init_spark_session()
    
    Copy

Run Python code from your client

Once you have an authenticated connection in place, you can write code as you normally would.

您可以使用 PySpark 客户端库运行连接到 Snowpark Connect for Spark 的 PySpark 代码。

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

Run Scala code from your client

You can run Scala applications that connect to Snowpark Connect for Spark by using the Spark Connect client library.

This guide walks you through setting up Snowpark Connect and connecting your Scala applications to the Snowpark Connect for Spark server.

Step 1: Set up your Snowpark Connect for Spark environment

使用以下主题中描述的步骤设置环境:

  1. 创建 Python 虚拟环境并安装 Snowpark Connect

  2. Set up a connection.

Step 2: Create a Snowpark Connect for Spark server script and launch the server

  1. Create a Python script to launch the Snowpark Connect for Spark server.

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Launch the Snowpark Connect for Spark server.

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

Step 3: Set up your Scala application

  1. Add the Spark Connect client dependency to your build.sbt file.

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.3"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Execute Scala code to connect to the Snowpark Connect for Spark server.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. 编译并运行应用程序。

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Scala UDF support on Snowpark Connect for Spark

When using user-defined functions or custom code, do one of the following:

  • Register a class finder to monitor and upload class files.

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • 上传 JAR 依赖项(如果需要)。

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy

Snowpark Connect for Spark 安装故障排除

With the following list of checks, you can troubleshoot Snowpark Connect for Spark installation and use.

  • 确保 Java 和 Python 基于相同的架构

  • Use the most recent Snowpark Connect for Spark package file, as described in 安装 Snowpark Connect for Spark.

  • 确认带有 PySpark 代码的 python 命令在本地执行(即无 Snowflake 连接)中可以正常发挥作用。

    例如,执行如下命令:

    python your_pyspark_file.py
    
    Copy

开源客户端

您可以使用标准的现成开源软件 (OSS) Spark 客户端包(例如 PySpark 以及适用于 Java 或 Scala 的 Spark 客户端),从您偏好的本地环境中调用,包括 Jupyter Notebook 和 VS Code。通过这种方式,您可以避免安装特定于 Snowflake 的包。

如果您希望在本地编写 Spark 代码,并让该代码使用 Snowflake 的计算资源和企业级治理功能,您可能会觉得这很有用。在此场景中,您通过编程访问令牌 (PATs) 进行身份验证和授权。

以下部分介绍安装、配置和身份验证。您还会找到一个简单的 PySpark 示例,用于验证您的连接。

第 1 步:安装所需的包

  • 安装 pyspark。您无需安装任何 Snowflake 包。

    pip install "pyspark[connect]>=3.5.0,<4"
    
    Copy

Step 2: Setup and Authentication

  1. 生成编程访问令牌 (PAT)。

    有关详细信息,请参阅以下主题:

    以下示例为用户 sysadmin 添加一个名为 TEST_PAT 的 PAT,并将过期时间设置为 30 天。

    ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
    
    Copy
  2. 查找 Snowflake Spark Connect 主机 URL。

    在 Snowflake 中运行以下 SQL 以查找您账户的主机名:

    SELECT t.VALUE:type::VARCHAR as type,
           t.VALUE:host::VARCHAR as host,
           t.VALUE:port as port
      FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
    
    Copy

Step 3: Connect to Spark Connect server

  • 要连接到 Spark Connect 服务器,请使用如下代码:

    from pyspark.sql import SparkSession
    import urllib.parse
    
    # Replace with your actual PAT.
    pat = urllib.parse.quote("<pat>", safe="")
    
    # Replace with your Snowpark Connect host from the above SQL query.
    snowpark_connect_host = ""
    
    # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse
    
    db_name = urllib.parse.quote("TESTDB", safe="")
    schema_name = urllib.parse.quote("TESTSCHEMA", safe="")
    warehouse_name = urllib.parse.quote("TESTWH", safe="")
    
    spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate()
    
    # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example:
    
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
        Row(a=1, b=2.),
        Row(a=2, b=3.),
        Row(a=4, b=5.),])
    print(df.count())
    
    Copy
语言: 中文