从 Snowflake 笔记本运行 Spark 工作负载¶
您可以通过 Snowflake 笔记本以交互方式运行 Spark 工作负载,而无需管理 Spark 集群。工作负载会在 Snowflake 基础设施上运行。
要使用 Snowflake 笔记本作为客户端,来开发在 Snowflake 上运行的 Spark 工作负载,请执行以下操作:
启动 Snowflake 笔记本。
在笔记本中启动 Spark 会话。
编写 PySpark 代码来加载、转换和分析数据,例如筛选高价值客户订单或汇总收入。
使用在仓库上运行的 Snowflake 笔记本¶
有关 Snowflake 笔记本的更多信息,请参阅 创建笔记本。
完成以下步骤,创建 Snowflake 笔记本:
Sign in to Snowsight.
At the top of the navigation menu, select
(Create) » Notebook » New Notebook.在 Create notebook 对话框中,输入新笔记本的名称、数据库和架构。
有关更多信息,请参阅 创建笔记本。
对于 Runtime,选择 Run on warehouse。
对于 Runtime version,选择 Snowflake Warehouse Runtime 2.0。
选择 2.0 版本时,请确保您拥有所需的依赖项支持,包括 Python 3.10。有关更多信息,请参阅 笔记本运行时。
对于 Query warehouse 和 Notebook warehouse,选择用于运行查询代码以及内核和 Python 代码的仓库,如 创建笔记本 中所述。
选择 Create。
在您创建的笔记本中,确保 Packages 下列出了以下包,以支持笔记本中的代码:
Python 3.10 或更高版本
snowpark-connect,最新版本
如果您需要添加这些包,请按以下步骤操作:
在 Anaconda Packages 下的搜索框中键入包名称。
选中所需包名称。
选择 Save。
要连接到 Snowpark Connect for Spark 服务器并测试连接,请将以下代码复制并粘贴到您创建的笔记本的 Python 单元格中:
from snowflake import snowpark_connect spark = snowpark_connect.server.init_spark_session() df = spark.sql("show schemas").limit(10) df.show()
Use a Snowflake Notebook that runs in a workspace¶
For more information about Snowflake Notebooks in Workspaces, see 工作区中的 Snowflake 笔记本.
创建 PyPI 外部访问集成。
您必须使用 ACCOUNTADMIN 角色并拥有一个您可以访问的数据库。
在工作区中从某个 SQL 文件运行以下命令。
USE DATABASE mydb; USE ROLE accountadmin; CREATE OR REPLACE NETWORK RULE pypi_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('pypi.org', 'pypi.python.org', 'pythonhosted.org', 'files.pythonhosted.org'); CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pypi_access_integration ALLOWED_NETWORK_RULES = (pypi_network_rule) ENABLED = true;
在笔记本中启用 PyPI 集成。
在笔记本中,对于 Service name,选择一项服务。
对于 External access integrations,选择您创建的 PyPI 集成。
For Python version, select Python 3.11.
选择 Create。
在笔记本中从 PyPI 安装
snowpark_connect包,使用如下代码:pip install snowpark-connect[jdk]
重新启动内核。
从 Connect 按钮中选择 Restart kernel。
使用如下代码启动
snowpark_connect服务器:import snowflake.snowpark_connect spark = snowflake.snowpark_connect.server.init_spark_session()
运行您的 Spark 代码,如以下示例所示:
from pyspark.sql.connect.functions import * from pyspark.sql.connect.types import * from pyspark.sql import Row # Sample nested data data = [(1, ("Alice", 30))] schema = "id INT, info STRUCT<name:STRING, age:INT>" df = spark.createDataFrame(data, schema=schema) df.show() spark.sql("show databases").show()
