Snowpark Migration Accelerator: Embedded SQL Code

Note

Currently, SMA only supports the *pyspark.sql* (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html) function.

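SMA can convert SQL code embedded in Python or Scala files. It processes embedded SQL code in files with the following extensions:

  • Python source files (.py extension)

  • Scala source files (.scala extension)

  • Jupyter Notebook files (.ipynb extension)

  • Databricks source files (.python or .scala extension)

  • Databricks Notebook archive files (.dbc extension)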
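
As a quick way to check which files in a project fall under the extensions listed above, here is a minimal sketch. The set name and helper function are hypothetical and are not part of SMA; matching extensions case-insensitively is also an assumption made for convenience.

```python
from pathlib import Path

# Extensions taken from the list above. The constant name is hypothetical,
# not an SMA setting.
EMBEDDED_SQL_EXTENSIONS = {".py", ".scala", ".ipynb", ".python", ".dbc"}


def files_with_embedded_sql_support(paths):
    """Return the paths whose extension appears in the list above.

    Extensions are compared in lowercase (an assumption of this sketch).
    """
    return [p for p in paths if Path(p).suffix.lower() in EMBEDDED_SQL_EXTENSIONS]


candidates = ["etl/job.py", "etl/schema.sql", "nb/report.ipynb", "archive/all.DBC"]
supported = files_with_embedded_sql_support(candidates)
print(supported)
```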

Embedded SQL code conversion examples

Supported cases

  • Using the *spark.sql* (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html) function in Python to execute SQL queries:

# Original in Spark
spark.sql("""MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id1 = ps.person_id2)
WHEN NOT MATCHED BY SOURCE THEN DELETE""")
# SMA transformation
spark.sql("""MERGE INTO people_target pt
USING (
   SELECT
      pt.person_id1
   FROM
      people_target pt
      LEFT JOIN
         people_source ps
         ON pt.person_id1 = ps.person_id2
   WHERE
      ps.person_id2 IS NULL
) s_src
ON pt.person_id1 = s_src.person_id1
WHEN MATCHED THEN
   DELETE;""")
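
The transformed statement relies on an anti-join: the LEFT JOIN ... IS NULL subquery selects the target rows that have no match in the source, and the outer MERGE then deletes exactly those rows. The sketch below runs only that subquery, using SQLite as a stand-in engine (SQLite has no MERGE statement) and invented sample rows, to show which IDs it returns.

```python
import sqlite3

# In-memory SQLite database used as a stand-in; the sample rows are invented.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE people_target (person_id1 INTEGER);
CREATE TABLE people_source (person_id2 INTEGER);
INSERT INTO people_target VALUES (1), (2), (3);
INSERT INTO people_source VALUES (2), (4);
""")

# Same subquery shape as in the transformed MERGE: keep target rows
# for which the LEFT JOIN found no source row.
rows = conn.execute("""
SELECT pt.person_id1
FROM people_target pt
LEFT JOIN people_source ps ON pt.person_id1 = ps.person_id2
WHERE ps.person_id2 IS NULL
ORDER BY pt.person_id1
""").fetchall()

unmatched_ids = [row[0] for row in rows]
print(unmatched_ids)  # target IDs with no counterpart in the source
conn.close()
```

With this data, IDs 1 and 3 exist only in the target, so they are the rows that WHEN MATCHED THEN DELETE would remove.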

Unsupported cases

When SMA encounters code that it cannot convert, it generates an Error, Warning, and Issue (EWI) message in the output code. For more details about these messages, see EWI.

The following scenarios are currently not supported:

  • SQL code passed to spark.sql through a string variable:

# Original in Spark
query = "SELECT COUNT(COUNTRIES) FROM SALES"
dfSales = spark.sql(query)

# SMA output
query = "SELECT COUNT(COUNTRIES) FROM SALES"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
dfSales = spark.sql(query)

  • SQL code built by simple string concatenation:

# Original in Spark
base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"

df1 = spark.sql(base + criteria_1 + fromClause)
df2 = spark.sql(base + criteria_2 + fromClause)

# SMA output
base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"

#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df1 = spark.sql(base + criteria_1 + fromClause)
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df2 = spark.sql(base + criteria_2 + fromClause)

  • SQL statements generated dynamically with string interpolation:

# Original in Spark
# Old Style interpolation
UStbl = "SALES_US"
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))

# Using format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))

# New Style
UKTbl = " UK_SALES_JUN_18"
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")

# SMA output
# Old Style interpolation
UStbl = "SALES_US"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))

# Using format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))

# New Style
UKTbl = " UK_SALES_JUN_18"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")

  • Functions that generate SQL queries dynamically:

# Original in Spark
def ByMonth(month):
    query = f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"
    return spark.sql(query)

# SMA output
def ByMonth(month):
    query = f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"
    #EWI: SPRKPY1077 => SQL embedded code cannot be processed.
    return spark.sql(query)
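
All four scenarios above share one trait: the argument passed to spark.sql is not a plain string literal. The following sketch uses Python's standard ast module to list such calls before a migration. It is a rough heuristic under that assumption and not how SMA itself decides; the function name find_dynamic_spark_sql is hypothetical, and it flags any method named sql regardless of the object it is called on.

```python
import ast


def find_dynamic_spark_sql(source: str):
    """Return line numbers of `<obj>.sql(...)` calls whose first positional
    argument is not a plain string literal (variable, concatenation,
    %-formatting, str.format, or f-string)."""
    flagged = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Only look at attribute calls named `sql`, such as spark.sql(...).
        if not (isinstance(func, ast.Attribute) and func.attr == "sql"):
            continue
        if not node.args:
            continue
        first = node.args[0]
        is_literal = isinstance(first, ast.Constant) and isinstance(first.value, str)
        if not is_literal:
            flagged.append(node.lineno)
    return sorted(flagged)


sample = '''query = "SELECT COUNT(COUNTRIES) FROM SALES"
a = spark.sql(query)
b = spark.sql("SELECT 1")
c = spark.sql("SELECT * FROM %s" % tbl)
d = spark.sql("SELECT * FROM {}".format(tbl))
e = spark.sql(f"SELECT * FROM {tbl}")
'''

dynamic_lines = find_dynamic_spark_sql(sample)
print(dynamic_lines)  # line numbers within `sample` that use dynamic SQL
```

The source is only parsed, never executed, so undefined names such as spark or tbl in the scanned file are not a problem.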

Unsupported cases and EWI messages

  • When SMA analyzes Scala code, error code SPRKSCL1173 indicates an unsupported embedded SQL statement.

/*Scala*/
class SparkSqlExample {
  def main(spark: SparkSession) : Unit = {
    /*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/
    spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
  }
}

  • When Python code contains an unsupported embedded SQL statement, error code SPRKPY1077 is displayed.

# Python Output
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
b = spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
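
To review how many statements were left unconverted, the EWI comments can be collected from the output files. The sketch below assumes the two comment shapes shown in this section (#EWI: ... in Python and /*EWI: ...*/ in Scala); the regular expression and the collect_ewis helper are hypothetical and not part of SMA.

```python
import re
from collections import Counter

# Matches "EWI: <code> => <message>" in both "#EWI: ..." (Python) and
# "/*EWI: ...*/" (Scala) comments, based on the samples in this section.
EWI_PATTERN = re.compile(r"EWI:\s*(SPRK[A-Z]+\d+)\s*=>\s*(.*?)\s*(?:\*/)?\s*$")


def collect_ewis(text: str):
    """Return (line_number, ewi_code, message) for each EWI comment line."""
    found = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        match = EWI_PATTERN.search(line)
        if match:
            found.append((lineno, match.group(1), match.group(2)))
    return found


sample_output = '''#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
dfSales = spark.sql(query)
    /*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/
    spark.sql(query)
'''

ewis = collect_ewis(sample_output)
counts = Counter(code for _, code, _ in ewis)
print(ewis)
print(counts)
```

Counting by code gives a rough picture of how much embedded SQL needs manual attention in Python versus Scala sources.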