Snowpark Submit 示例¶
本主题包括使用 Snowpark Submit 提交生产就绪 Spark 应用程序的示例。
编写并提交简单的 Spark 应用程序¶
以下示例演示如何编写和提交没有依赖项的简单 Spark 应用程序。
在本地 IDE 中,创建一个包含以下内容,名为
app.py的新 Python 文件:from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, upper, concat # Create Spark session spark = SparkSession.builder.appName("SimpleSession").getOrCreate() # Create a DataFrame from inline data data = [ (1, "alice", "engineering", 95000), (2, "bob", "marketing", 72000), (3, "carol", "engineering", 105000), (4, "david", "sales", 68000), (5, "eva", "engineering", 88000), ] df = spark.createDataFrame(data, ["id", "name", "department", "salary"]) # Add a new column df_with_bonus = df.withColumn("bonus", col("salary") * 0.1) df_with_bonus.show() # Filter and transform engineers = df.filter(col("department") == "engineering") \ .withColumn("name_upper", upper(col("name"))) \ .withColumn("greeting", concat(lit("Hello, "), col("name"))) engineers.show() # Aggregate df.groupBy("department").avg("salary").show() # Stop the Spark session spark.stop()
要提交应用程序,请使用以下命令:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
您可以使用用于等待作业完成的
--wait-for-completion选项、用于检查作业状态的--workload-status选项,以及用于显示作业日志的--display-logs选项。有关选项的完整列表,请参阅 Snowpark Submit 参考。
从 Snowflake 暂存区部署应用程序¶
如果应用程序有依赖项(例如,需要读取的文件),则可以从 Snowflake 暂存区部署它们。以下示例演示如何从 Snowflake 暂存区部署应用程序及其依赖项。
要从终端将文件上传到暂存区,您可以使用 SnowflakeCLI。请注意,SnowSQL 是旧版 CLI;如果您已经在使用它,也可以使用它来将文件上传到暂存区。如果您尚未安装 Snowflake CLI,可以按照 安装 Snowflake CLI 中的说明进行安装。
在本地 IDE 中,创建一个包含以下内容,名为
sample_employees.csv的新 CSV 文件:employee_id,name,department,salary,years_employed 1,Alice Johnson,Engineering,95000,5 2,Bob Smith,Marketing,72000,3 3,Carol Williams,Engineering,105000,8 4,David Brown,Sales,68000,2 5,Eva Martinez,Engineering,88000,4 6,Frank Wilson,Marketing,75000,6 7,Grace Lee,Sales,92000,7 8,Henry Taylor,Engineering,110000,10 9,Ivy Chen,Marketing,65000,1 10,Jack Davis,Sales,78000,4 11,Karen White,Engineering,98000,6 12,Leo Harris,Marketing,71000,3 13,Maria Garcia,Sales,85000,5 14,Nathan Clark,Engineering,102000,9 15,Olivia Moore,Marketing,69000,2
使用以下命令将依赖项文件上传到暂存区,其中
my_stage是账户中暂存区的名称。(如果您没有创建暂存区,可以使用 [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create)。)snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
要验证文件是否上传成功,可以使用以下命令列出暂存区中的文件:
snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
您应该会在列表中看到文件
sample_employees.csv。在本地 IDE 中,创建一个包含以下内容,名为
app.py的新 Python 文件:from pyspark.sql import SparkSession # Create Spark session spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate() # Load data from stage (adjust stage name to match yours) df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True) df.show() # Filter: Engineering department only engineers = df.filter(df["department"] == "Engineering") engineers.show() # Filter: Salary > 80000 and years_employed > 3 senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3)) senior_high_earners.show() # Aggregate: Average salary by department df.groupBy("department").avg("salary").show() # Select specific columns result = senior_high_earners.select("name", "department", "salary") result.show() # Stop the Spark session spark.stop()
要提交使用您上传到暂存区的文件的应用程序,请使用以下命令:
snowpark-submit \ --snowflake-connection-name MY_CONNECTION \ --snowflake-workload-name MY_JOB \ --snowflake-stage @<database>.<schema>.<stage> \ /path/to/app.py
请注意,运行应用程序需要计算池,并且必须在
connections.toml文件中指定,或通过命令行使用--compute-pool选项指定。有关更多信息,请参阅 Snowpark Submit 参考。
利用等待和日志进行监控¶
以下示例说明如何提交作业、等待作业完成,然后检索日志。
使用以下命令提交作业,并等待其完成:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --wait-for-completion \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
如果作业失败,请使用以下命令检查详细日志:
snowpark-submit --snowflake-workload-name MY_JOB \ --workload-status \ --display-logs \ --snowflake-connection-name MY_CONNECTION
在 Apache Airflow DAG 中使用 Snowpark Submit¶
您可以通过 Snowpark Connect for Spark 向 Snowflake 提交 Spark 作业。您可以在集群模式下使用 snowpark-submit,利用计算池来运行作业。
以这种方式使用 Apache Airflow 时,请确保运行 Apache Airflow 的 Docker 服务或 Snowpark Container Services 容器能够正确访问 Snowflake 和 Snowflake 暂存区所需的文件。
以下示例中的代码执行以下任务:
在
/tmp/myenv处创建 Python 虚拟环境。在
create_venv任务中,代码使用pip,从而使用.whl文件安装snowpark-submit包。生成一个包含 Snowflake 连接凭据和 OAuth 令牌的安全
connections.toml文件。在
create_connections_toml任务中,代码会创建/app/.snowflake目录、创建.toml文件,然后更改文件权限,以仅允许所有者(用户)具备读写权限。使用 snowpark-submit 命令运行 Spark 作业。
在
run_snowpark_script作业中,代码执行以下操作:激活虚拟环境。
使用 snowpark-submit 命令运行 Spark 作业。
使用集群模式部署到 Snowflake。
使用 Snowpark Connect for Spark 远程 URI sc://localhost:15002。
指定 Spark 应用程序类
org.example.SnowparkConnectApp。从 @snowflake_stage 暂存区提取脚本。
使用
--wait-for-completion阻止部署,直到作业完成。
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
}
with DAG(
'run_sparkconnect_python_script',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
create_venv = BashOperator(
task_id='create_venv',
bash_command="""
python3 -m venv /tmp/myenv &&
source /tmp/myenv/bin/activate &&
export PIP_USER=false &&
pip install --upgrade pip &&
pip install --no-cache-dir grpcio-tools>=1.48.1 &&
pip install /app/snowpark_submit-<version>.whl
"""
)
create_connections_toml = BashOperator(
task_id='create_connections_toml',
bash_command="""
mkdir -p /app/.snowflake
echo "${SNOWFLAKE_USER}"
cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
chmod 600 /app/.snowflake/connections.toml
"""
)
run_script = BashOperator(
task_id='run_snowpark_script',
bash_command="""
set -e
echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
echo "Running Python script with Snowpark..."
source /tmp/myenv/bin/activate &&
snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
""",
env={
'SNOWFLAKE_HOME': '/app/.snowflake'
}
)
create_venv >> create_connections_toml >> run_script
您可以使用 Apache Airflow 用户界面的图表视图或树视图监控 DAG。检查任务日志中是否有以下项目:
环境设置
Snowpark Connect for Spark 的状态
snowpark-submit 作业输出
您还可以通过存储在 Snowflake 暂存区中的日志或者是事件表,监控在 Snowflake 中运行的作业。