通过 Snowpark Connect for Spark 执行 Snowflake SQL¶
要执行特定于 Snowflake 的 SQL 命令,您可以使用 SnowflakeSession 接口。与 spark.sql 方法类似,查询结果会以 Spark DataFrames 的形式返回,您可以在结果数据上继续应用或串联 Spark DataFrame 转换和操作。
对于大多数 SQL 操作,您可以使用 spark.sql 方法直接执行 SQL 语句,并将结果以 Spark DataFrames 的形式返回。不过,Snowflake 的部分 SQL 语法(包括 QUALIFY、CONNECT BY、LATERAL FLATTEN 以及 Time Travel 查询)与 Spark SQL 不兼容。
以下示例演示如何使用 SnowflakeSession 执行包含 CONNECT BY 子句的 Snowflake SQL 命令。
import snowflake.snowpark_connect
from snowflake.snowpark_connect.snowflake_session import SnowflakeSession
spark = snowflake.snowpark_connect.server.init_spark_session()
snowflake_session = SnowflakeSession(spark)
result_df = snowflake_session.sql("""
SELECT
employee_name,
manager_name,
LEVEL
FROM employees
START WITH employee_name = 'Alice'
CONNECT BY PRIOR manager_name = employee_name
""").show()
result_df.limit(1).show()
您还可以使用 SnowflakeSession 接口执行特定于 Snowflake 的配置指令。这些指令包括设置会话级参数,例如当前使用的数据库、架构或计算仓库。
以下示例演示如何使用 SnowflakeSession 设置会话级参数。
import snowflake.snowpark_connect
from snowflake.snowpark_connect.client import SnowflakeSession
spark = snowflake.snowpark_connect.server.init_spark_session()
snowflake_session = SnowflakeSession(spark)
snowflake_session.use_database("MY_DATABASE")
snowflake_session.use_schema("MY_SCHEMA")
snowflake_session.use_warehouse("MY_WH")
snowflake_session.use_role("PUBLIC")