Snowpark Submit 示例

本主题包括使用 Snowpark Submit 提交生产就绪 Spark 应用程序的示例。

从 Snowflake 暂存区部署应用程序

以下示例展示了如何从 Snowflake 暂存区部署应用程序及其依赖项。

  1. 使用以下命令,将您的应用程序文件上传到某个暂存区:

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
    Copy
  2. 要使用您上传到暂存区的文件提交作业,请使用以下命令:

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION\
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
    Copy

利用等待和日志进行监控

以下示例说明如何提交作业、等待作业完成,然后检索日志。

  1. 使用以下命令提交作业,并等待其完成:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
    Copy
  2. 如果作业失败,请使用以下命令检查详细日志:

    snowpark-submit
      --snowflake-workload-name MY_JOB
      --workload-status
      --display-logs
      --snowflake-connection-name MY_CONNECTION
    
    Copy

在 Apache Airflow DAG 中使用 Snowpark Submit

您可以通过 Snowpark Connect for Spark 向 Snowflake 提交 Spark 作业。您可以在集群模式下使用 snowpark-submit,利用计算池来运行作业。

以这种方式使用 Apache Airflow 时,请确保运行 Apache Airflow 的 Docker 服务或 Snowpark Container Services 容器能够正确访问 Snowflake 和 Snowflake 暂存区所需的文件。

以下示例中的代码执行以下任务:

  • /tmp/myenv 处创建 Python 虚拟环境。

    create_venv 任务中,代码使用 pip,从而使用 .whl 文件安装 snowpark-submit 包。

  • 生成一个包含 Snowflake 连接凭据和 OAuth 令牌的安全 connections.toml 文件。

    create_connections_toml 任务中,代码会创建 /app/.snowflake 目录、创建 .toml 文件,然后更改文件权限,以仅允许所有者(用户)具备读写权限。

  • 使用 snowpark-submit 命令运行 Spark 作业。

    run_snowpark_script 作业中,代码执行以下操作:

    • 激活虚拟环境。

    • 使用 snowpark-submit 命令运行 Spark 作业。

    • 使用集群模式部署到 Snowflake。

    • 使用 Snowpark Connect for Spark 远程 URI sc://localhost:15002。

    • 指定 Spark 应用程序类 org.example.SnowparkConnectApp

    • 从 @snowflake_stage 暂存区提取脚本。

    • 使用 --wait-for-completion 阻止部署,直到作业完成。

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

您可以使用 Apache Airflow 用户界面的图表视图或树视图监控 DAG。检查任务日志中是否有以下项目:

  • 环境设置

  • Snowpark Connect for Spark 的状态

  • snowpark-submit 作业输出

您还可以通过存储在 Snowflake 暂存区中的日志或者是事件表,监控在 Snowflake 中运行的作业。

语言: 中文