Snowpark Submit 示例

本主题包括使用 Snowpark Submit 提交生产就绪 Spark 应用程序的示例。

编写并提交简单的 Spark 应用程序

以下示例演示如何编写和提交没有依赖项的简单 Spark 应用程序。

  1. 在本地 IDE 中,创建一个包含以下内容,名为 app.py 的新 Python 文件:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, upper, concat
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleSession").getOrCreate()
    
    # Create a DataFrame from inline data
    data = [
        (1, "alice", "engineering", 95000),
        (2, "bob", "marketing", 72000),
        (3, "carol", "engineering", 105000),
        (4, "david", "sales", 68000),
        (5, "eva", "engineering", 88000),
    ]
    df = spark.createDataFrame(data, ["id", "name", "department", "salary"])
    
    # Add a new column
    df_with_bonus = df.withColumn("bonus", col("salary") * 0.1)
    df_with_bonus.show()
    
    # Filter and transform
    engineers = df.filter(col("department") == "engineering") \
        .withColumn("name_upper", upper(col("name"))) \
        .withColumn("greeting", concat(lit("Hello, "), col("name")))
    engineers.show()
    
    # Aggregate
    df.groupBy("department").avg("salary").show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy
  2. 要提交应用程序,请使用以下命令:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy

    您可以使用用于等待作业完成的 --wait-for-completion 选项、用于检查作业状态的 --workload-status 选项,以及用于显示作业日志的 --display-logs 选项。有关选项的完整列表,请参阅 Snowpark Submit 参考

从 Snowflake 暂存区部署应用程序

如果应用程序有依赖项(例如,需要读取的文件),则可以从 Snowflake 暂存区部署它们。以下示例演示如何从 Snowflake 暂存区部署应用程序及其依赖项。

  1. 要从终端将文件上传到暂存区,您可以使用 SnowflakeCLI。请注意,SnowSQL 是旧版 CLI;如果您已经在使用它,也可以使用它来将文件上传到暂存区。如果您尚未安装 Snowflake CLI,可以按照 安装 Snowflake CLI 中的说明进行安装。

  2. 在本地 IDE 中,创建一个包含以下内容,名为 sample_employees.csv 的新 CSV 文件:

    employee_id,name,department,salary,years_employed
    1,Alice Johnson,Engineering,95000,5
    2,Bob Smith,Marketing,72000,3
    3,Carol Williams,Engineering,105000,8
    4,David Brown,Sales,68000,2
    5,Eva Martinez,Engineering,88000,4
    6,Frank Wilson,Marketing,75000,6
    7,Grace Lee,Sales,92000,7
    8,Henry Taylor,Engineering,110000,10
    9,Ivy Chen,Marketing,65000,1
    10,Jack Davis,Sales,78000,4
    11,Karen White,Engineering,98000,6
    12,Leo Harris,Marketing,71000,3
    13,Maria Garcia,Sales,85000,5
    14,Nathan Clark,Engineering,102000,9
    15,Olivia Moore,Marketing,69000,2
    
    Copy

    使用以下命令将依赖项文件上传到暂存区,其中 my_stage 是账户中暂存区的名称。(如果您没有创建暂存区,可以使用 [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create)。)

    snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
    
    Copy

    要验证文件是否上传成功,可以使用以下命令列出暂存区中的文件:

    snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
    
    Copy

    您应该会在列表中看到文件 sample_employees.csv

  3. 在本地 IDE 中,创建一个包含以下内容,名为 app.py 的新 Python 文件:

    from pyspark.sql import SparkSession
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate()
    
    # Load data from stage (adjust stage name to match yours)
    df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True)
    df.show()
    
    # Filter: Engineering department only
    engineers = df.filter(df["department"] == "Engineering")
    engineers.show()
    
    # Filter: Salary > 80000 and years_employed > 3
    senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3))
    senior_high_earners.show()
    
    # Aggregate: Average salary by department
    df.groupBy("department").avg("salary").show()
    
    # Select specific columns
    result = senior_high_earners.select("name", "department", "salary")
    result.show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy

    要提交使用您上传到暂存区的文件的应用程序,请使用以下命令:

    snowpark-submit \
      --snowflake-connection-name MY_CONNECTION \
      --snowflake-workload-name MY_JOB \
      --snowflake-stage @<database>.<schema>.<stage> \
      /path/to/app.py
    
    Copy

    请注意,运行应用程序需要计算池,并且必须在 connections.toml 文件中指定,或通过命令行使用 --compute-pool 选项指定。有关更多信息,请参阅 Snowpark Submit 参考

利用等待和日志进行监控

以下示例说明如何提交作业、等待作业完成,然后检索日志。

  1. 使用以下命令提交作业,并等待其完成:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy
  2. 如果作业失败,请使用以下命令检查详细日志:

    snowpark-submit
      --snowflake-workload-name MY_JOB \
      --workload-status \
      --display-logs \
      --snowflake-connection-name MY_CONNECTION
    
    Copy

在 Apache Airflow DAG 中使用 Snowpark Submit

您可以通过 Snowpark Connect for Spark 向 Snowflake 提交 Spark 作业。您可以在集群模式下使用 snowpark-submit,利用计算池来运行作业。

以这种方式使用 Apache Airflow 时,请确保运行 Apache Airflow 的 Docker 服务或 Snowpark Container Services 容器能够正确访问 Snowflake 和 Snowflake 暂存区所需的文件。

以下示例中的代码执行以下任务:

  • /tmp/myenv 处创建 Python 虚拟环境。

    create_venv 任务中,代码使用 pip,从而使用 .whl 文件安装 snowpark-submit 包。

  • 生成一个包含 Snowflake 连接凭据和 OAuth 令牌的安全 connections.toml 文件。

    create_connections_toml 任务中,代码会创建 /app/.snowflake 目录、创建 .toml 文件,然后更改文件权限,以仅允许所有者(用户)具备读写权限。

  • 使用 snowpark-submit 命令运行 Spark 作业。

    run_snowpark_script 作业中,代码执行以下操作:

    • 激活虚拟环境。

    • 使用 snowpark-submit 命令运行 Spark 作业。

    • 使用集群模式部署到 Snowflake。

    • 使用 Snowpark Connect for Spark 远程 URI sc://localhost:15002。

    • 指定 Spark 应用程序类 org.example.SnowparkConnectApp

    • 从 @snowflake_stage 暂存区提取脚本。

    • 使用 --wait-for-completion 阻止部署,直到作业完成。

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

您可以使用 Apache Airflow 用户界面的图表视图或树视图监控 DAG。检查任务日志中是否有以下项目:

  • 环境设置

  • Snowpark Connect for Spark 的状态

  • snowpark-submit 作业输出

您还可以通过存储在 Snowflake 暂存区中的日志或者是事件表,监控在 Snowflake 中运行的作业。