Snowpark Migration Accelerator: Embedded SQL code
Note
Currently, SMA supports only the pyspark.sql (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html) function.
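SMA can convert SQL code embedded in Python or Scala files. It processes embedded SQL code in files with the following extensions:
Python source code files (.py extension)
Scala source code files (.scala extension)
Jupyter Notebook files (.ipynb extension)
Databricks source files (.python or .scala extension)
Databricks Notebook archive files (.dbc extension)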
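The following Python sketch shows how the extension list above could be applied to a project's files. It filters file paths by suffix. `EMBEDDED_SQL_EXTENSIONS` and `may_contain_embedded_sql` are hypothetical names for this illustration, and SMA's actual file discovery may apply additional rules.

```python
from pathlib import Path

# Extensions taken from the list above (hypothetical constant for this sketch).
EMBEDDED_SQL_EXTENSIONS = {".py", ".scala", ".ipynb", ".python", ".dbc"}

def may_contain_embedded_sql(path):
    """Return True if the file's extension is one that SMA scans for embedded SQL."""
    return Path(path).suffix in EMBEDDED_SQL_EXTENSIONS

project_files = ["etl/load_sales.py", "jobs/Main.scala", "notebooks/report.ipynb",
                 "export/workspace.dbc", "README.md", "sql/report.sql"]
candidates = [f for f in project_files if may_contain_embedded_sql(f)]
print(candidates)
# ['etl/load_sales.py', 'jobs/Main.scala', 'notebooks/report.ipynb', 'export/workspace.dbc']
```

The standalone `.sql` file is excluded here only because `.sql` is not in the list of files scanned for embedded SQL.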
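Examples of embedded SQL code conversion
Supported cases
The spark.sql (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html) function is used in Python to run a SQL query. In this example, SMA rewrites the WHEN NOT MATCHED BY SOURCE clause. The new MERGE takes its source from an anti-join (LEFT JOIN ... WHERE ps.person_id2 IS NULL), which selects the target rows that have no match in people_source. It then deletes those rows with WHEN MATCHED THEN DELETE: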
# Original in Spark
spark.sql("""MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id1 = ps.person_id2)
WHEN NOT MATCHED BY SOURCE THEN DELETE""")
# SMA transformation
spark.sql("""MERGE INTO people_target pt
USING (
SELECT
pt.person_id1
FROM
people_target pt
LEFT JOIN
people_source ps
ON pt.person_id1 = ps.person_id2
WHERE
ps.person_id2 IS NULL
) s_src
ON pt.person_id1 = s_src.person_id1
WHEN MATCHED THEN
DELETE;""")
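The rewrite relies on the anti-join in the USING subquery. A LEFT JOIN keeps every target row, and `ps.person_id2 IS NULL` keeps only the rows that found no source match. For non-NULL keys, these are the rows that `WHEN NOT MATCHED BY SOURCE THEN DELETE` removes.

The sketch below checks this with Python's built-in `sqlite3` module and made-up rows. SQLite is only a stand-in and does not test Snowflake's behavior. Because it has no `MERGE`, a `DELETE ... WHERE ... IN (subquery)` emulates the final `WHEN MATCHED THEN DELETE` step.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Made-up sample data; keys are NOT NULL so the anti-join and the original
# NOT MATCHED BY SOURCE clause select the same rows.
conn.executescript("""
    CREATE TABLE people_target (person_id1 INTEGER NOT NULL);
    CREATE TABLE people_source (person_id2 INTEGER NOT NULL);
    INSERT INTO people_target VALUES (1), (2), (3), (4);
    INSERT INTO people_source VALUES (2), (4), (5);
""")

# Anti-join from the SMA output: target rows with no matching source row.
anti_join = """
    SELECT pt.person_id1
    FROM people_target pt
    LEFT JOIN people_source ps ON pt.person_id1 = ps.person_id2
    WHERE ps.person_id2 IS NULL
"""
to_delete = sorted(row[0] for row in conn.execute(anti_join))
print(to_delete)  # [1, 3]

# SQLite has no MERGE: emulate "WHEN MATCHED THEN DELETE" against the anti-join result.
conn.execute(f"DELETE FROM people_target WHERE person_id1 IN ({anti_join})")
remaining = sorted(row[0] for row in conn.execute("SELECT person_id1 FROM people_target"))
print(remaining)  # [2, 4]
```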
Unsupported cases
When SMA encounters code that it cannot convert, it generates an Error, Warning, and Issue (EWI) message in the output code. For more details about these messages, see EWI.
The following scenarios are currently not supported:
The SQL code is stored in a string variable that is then passed to spark.sql:
# Original in Spark
query = "SELECT COUNT(COUNTRIES) FROM SALES"
dfSales = spark.sql(query)
# SMA transformation
query = "SELECT COUNT(COUNTRIES) FROM SALES"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
dfSales = spark.sql(query)
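A plausible explanation, which is an assumption and not documented SMA behavior, is that static analysis sees only the expression passed to `spark.sql`, not the value that expression holds at run time. The sketch below uses Python's standard `ast` module to tell a string-literal argument apart from a variable. `find_sql_calls` is a hypothetical helper for illustration, not part of SMA.

```python
import ast

def find_sql_calls(source):
    """Return (line, is_literal) for each spark.sql(...) call in the source text."""
    results = []
    for node in ast.walk(ast.parse(source)):
        # Match calls of the exact form spark.sql(<arg>, ...).
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "sql"
                and isinstance(node.func.value, ast.Name)
                and node.func.value.id == "spark"
                and node.args):
            arg = node.args[0]
            # True only when the SQL text is written directly as a string literal.
            is_literal = isinstance(arg, ast.Constant) and isinstance(arg.value, str)
            results.append((node.lineno, is_literal))
    return sorted(results)

code = '''
dfOk = spark.sql("SELECT COUNT(COUNTRIES) FROM SALES")
query = "SELECT COUNT(COUNTRIES) FROM SALES"
dfSales = spark.sql(query)
'''
print(find_sql_calls(code))  # [(2, True), (4, False)]
```

The source is parsed, not executed, so `spark` does not need to exist for this check to run.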
The SQL code is built by simple string concatenation:
# Original in Spark
base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"
df1 = spark.sql(base + criteria_1 + fromClause)
df2 = spark.sql(base + criteria_2 + fromClause)
# SMA transformation
base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df1 = spark.sql(base + criteria_1 + fromClause)
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df2 = spark.sql(base + criteria_2 + fromClause)
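To resolve a concatenated query, a tool would have to track what each variable holds. The sketch below is a small, hypothetical constant-folding pass, not SMA's algorithm. It resolves concatenations built only from string literals and top-level string constants, and it returns `None` for anything else, such as a function call.

```python
import ast

def fold_string(node, env):
    """Resolve node to a str if it is built only from string literals,
    known constants, and +; otherwise return None."""
    if isinstance(node, ast.Constant) and isinstance(node.value, str):
        return node.value
    if isinstance(node, ast.Name):
        return env.get(node.id)  # None for names with no known constant value
    if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
        left = fold_string(node.left, env)
        right = fold_string(node.right, env)
        if left is not None and right is not None:
            return left + right
    return None  # calls, interpolation, and anything else are not resolved

def resolve_sql_calls(source):
    """Return (line, resolved SQL or None) for each spark.sql(...) call."""
    tree = ast.parse(source)
    # Simplification: each top-level `name = "literal"` is treated as a constant;
    # if a name is assigned more than once, the last assignment wins regardless
    # of where the call appears.
    env = {}
    for stmt in tree.body:
        if (isinstance(stmt, ast.Assign)
                and len(stmt.targets) == 1
                and isinstance(stmt.targets[0], ast.Name)
                and isinstance(stmt.value, ast.Constant)
                and isinstance(stmt.value.value, str)):
            env[stmt.targets[0].id] = stmt.value.value
    results = []
    for node in ast.walk(tree):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "sql"
                and isinstance(node.func.value, ast.Name)
                and node.func.value.id == "spark"
                and node.args):
            results.append((node.lineno, fold_string(node.args[0], env)))
    return sorted(results, key=lambda item: item[0])

code = '''
base = "SELECT "
criteria_1 = " COUNT(*) "
fromClause = " FROM COUNTRIES"
df1 = spark.sql(base + criteria_1 + fromClause)
df2 = spark.sql(base + input("columns: ") + fromClause)
'''
for line, sql in resolve_sql_calls(code):
    print(line, repr(sql))
# 5 'SELECT  COUNT(*)  FROM COUNTRIES'
# 6 None
```

When every piece is a known constant, as in `df1`, one possible workaround is to inline the resolved string as a literal argument. That matches the form used in the supported case, although whether SMA then converts the query depends on the SQL itself.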
The SQL statement is generated dynamically with string interpolation:
# Original in Spark
# Old-style (%) interpolation
UStbl = "SALES_US"
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))
# Using the format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))
# New style (f-string)
UKTbl = " UK_SALES_JUN_18"
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")
# SMA transformation
# Old-style (%) interpolation
UStbl = "SALES_US"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))
# Using the format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))
# New style (f-string)
UKTbl = " UK_SALES_JUN_18"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")
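The three interpolation styles look different, but in each case the argument to `spark.sql` is an expression rather than a string literal, so its final text exists only at run time. Note also that `COLtbl` carries a WHERE clause, so the interpolated value can change the structure of the statement, not just the table name.

The sketch below inspects each argument with Python's standard `ast` module. The snippets are copied from the example above and are parsed, not executed. `argument_node_type` is a hypothetical helper.

```python
import ast

# Calls copied from the example above; only parsed, never run.
snippets = {
    "percent": 'spark.sql("SELECT * FROM %s" % (UStbl))',
    "format": 'spark.sql("SELECT * FROM {}".format(COLtbl))',
    "f-string": 'spark.sql(f"SELECT * FROM {UKTbl}")',
}

def argument_node_type(snippet):
    """Return the AST node class name of the first argument to the call."""
    call = ast.parse(snippet, mode="eval").body
    return type(call.args[0]).__name__

for style, snippet in snippets.items():
    print(style, argument_node_type(snippet))
# percent BinOp
# format Call
# f-string JoinedStr
```

None of the three arguments is an `ast.Constant`, which is the node a plain string literal produces.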
A function builds the SQL query dynamically:
# Original in Spark
def ByMonth(month):
    query = f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"
    return spark.sql(query)
# SMA transformation
def ByMonth(month):
    query = f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"
    #EWI: SPRKPY1077 => SQL embedded code cannot be processed.
    return spark.sql(query)
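Inside such a function, the query text depends on the argument each caller supplies, so there is no single SQL string to convert at analysis time. The sketch below uses a hypothetical stand-in, `build_by_month_query`, that returns the SQL text instead of running it. It is written with the query text `SELECT * FROM LOGS ...`.

```python
def build_by_month_query(month):
    """Hypothetical stand-in for ByMonth: returns the SQL text instead of running it."""
    return f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"

print(build_by_month_query(1))   # SELECT * FROM LOGS WHERE MONTH(access_date) = 1
print(build_by_month_query(12))  # SELECT * FROM LOGS WHERE MONTH(access_date) = 12

# Each distinct argument produces a distinct SQL string.
distinct_queries = {build_by_month_query(m) for m in range(1, 13)}
print(len(distinct_queries))  # 12
```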
Unsupported cases and EWI messages
When SMA analyzes Scala code, error code SPRKSCL1173 indicates an unsupported embedded SQL statement.
/*Scala*/
class SparkSqlExample {
  def main(spark: SparkSession): Unit = {
    /*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/
    spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
  }
}
When Python code contains an unsupported embedded SQL statement, SMA reports error code SPRKPY1077.
# Python Output
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
b = spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
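When reviewing converted code, it can help to tally these markers. The sketch below is a hypothetical helper that counts EWI codes in the two comment formats shown above: `#EWI: ... =>` for Python and `/*EWI: ... =>` for Scala. The regular expression assumes every EWI code has the `SPRK<letters><digits>` shape of SPRKPY1077 and SPRKSCL1173.

```python
import re
from collections import Counter

# Matches "#EWI: CODE =>" (Python output) and "/*EWI: CODE =>" (Scala output).
# Assumption: codes look like SPRK + uppercase letters + digits.
EWI_PATTERN = re.compile(r"(?:#|/\*)\s*EWI:\s*(SPRK[A-Z]+\d+)\s*=>")

def count_ewis(text):
    """Count the EWI codes that appear in SMA output text."""
    return Counter(EWI_PATTERN.findall(text))

sample = '''
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
b = spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
    /*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/
'''
counts = count_ewis(sample)
print(counts)
```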