Snowpark Migration Accelerator: Embedded SQL code

Note

Currently, SMA only supports the SparkSession.sql function from pyspark.sql (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html).

SMA can convert SQL code embedded in Python or Scala files. It processes embedded SQL code in files with the following extensions:

  • Python source code files (.py extension)

  • Scala source code files (.scala extension)

  • Jupyter Notebook files (.ipynb extension)

  • Databricks source files (.python or .scala extension)

  • Databricks Notebook archive files (.dbc extension)
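As a rough illustration of the list above, the following Python sketch checks file paths against the listed extensions. The constant `EMBEDDED_SQL_EXTENSIONS` and the helpers `is_embedded_sql_candidate` and `filter_candidates` are hypothetical; this is not how SMA selects files, and it assumes an exact, case-sensitive match on the last extension only.

```python
from pathlib import Path
from typing import Iterable, List

# Extensions from the list above (hypothetical constant, not part of SMA).
EMBEDDED_SQL_EXTENSIONS = {".py", ".scala", ".ipynb", ".python", ".dbc"}


def is_embedded_sql_candidate(path: str) -> bool:
    """Return True if the file's last extension is one of the listed ones."""
    # Path.suffix returns only the final extension, e.g. ".dbc" for "a.b.dbc".
    return Path(path).suffix in EMBEDDED_SQL_EXTENSIONS


def filter_candidates(paths: Iterable[str]) -> List[str]:
    """Keep only the paths whose extension appears in the list, preserving order."""
    return [p for p in paths if is_embedded_sql_candidate(p)]


print(filter_candidates(["etl/job.py", "queries/report.sql", "nb/export.dbc", "README.md"]))
# ['etl/job.py', 'nb/export.dbc']
```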

Embedded SQL code conversion examples

Supported cases

  • Using the spark.sql (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html) function in Python to run an SQL query:

# Original in Spark
spark.sql("""MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id1 = ps.person_id2)
WHEN NOT MATCHED BY SOURCE THEN DELETE""")
# SMA transformation
spark.sql("""MERGE INTO people_target pt
USING (
   SELECT
      pt.person_id1
   FROM
      people_target pt
      LEFT JOIN
         people_source ps
         ON pt.person_id1 = ps.person_id2
   WHERE
      ps.person_id2 IS NULL
) s_src
ON pt.person_id1 = s_src.person_id1
WHEN MATCHED THEN
   DELETE;""")
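The rewritten statement expresses `WHEN NOT MATCHED BY SOURCE THEN DELETE` as an anti-join: the `s_src` subquery left-joins `people_target` to `people_source` and keeps only target rows where `ps.person_id2 IS NULL`, and the outer `MERGE` then deletes those rows through `WHEN MATCHED`. The Python sketch below models both forms on in-memory lists to show that they select the same rows. The sample data and the function names (including `s_src`, named after the alias) are hypothetical, and the model assumes non-NULL, unique join keys.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, int]

# Hypothetical sample data: only the key columns used in the ON conditions.
people_target: List[Row] = [{"person_id1": 1}, {"person_id1": 2}, {"person_id1": 3}]
people_source: List[Row] = [{"person_id2": 2}, {"person_id2": 4}]


def not_matched_by_source(target: List[Row], source: List[Row]) -> List[Row]:
    """Original form: target rows with no source row where person_id1 = person_id2."""
    return [t for t in target
            if not any(t["person_id1"] == s["person_id2"] for s in source)]


def s_src(target: List[Row], source: List[Row]) -> List[Row]:
    """Rewritten form: LEFT JOIN target to source, keep rows where ps.person_id2 IS NULL."""
    joined: List[Tuple[Row, Optional[Row]]] = []
    for t in target:
        matches = [s for s in source if t["person_id1"] == s["person_id2"]]
        if matches:
            joined.extend((t, s) for s in matches)
        else:
            joined.append((t, None))  # no match: source columns are NULL
    return [{"person_id1": t["person_id1"]} for t, s in joined if s is None]


def merge_delete_matched(target: List[Row], src: List[Row]) -> List[Row]:
    """Outer MERGE: delete target rows whose person_id1 matches a row in s_src."""
    keys = {row["person_id1"] for row in src}
    return [t for t in target if t["person_id1"] not in keys]


print(not_matched_by_source(people_target, people_source))  # [{'person_id1': 1}, {'person_id1': 3}]
print(s_src(people_target, people_source))                  # [{'person_id1': 1}, {'person_id1': 3}]
print(merge_delete_matched(people_target, s_src(people_target, people_source)))  # [{'person_id1': 2}]
```

Both forms pick out the target rows with keys 1 and 3, so only the row with key 2, which has a match in the source, survives the delete.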

不支持的案例

When SMA encounters code that it cannot convert, it generates an Error, Warning, and Issue (EWI) message in the output code. For more details about these messages, see EWI.

The following scenarios are currently not supported:

  • Storing the SQL code in a string variable and passing that variable to spark.sql:

query = "SELECT COUNT(COUNTRIES) FROM SALES"
dfSales = spark.sql(query)

# SMA transformation
query = "SELECT COUNT(COUNTRIES) FROM SALES" 
#EWI: SPRKPY1077 => SQL embedded code cannot be processed. 
dfSales = spark.sql(query)
  • Building the SQL code by joining strings with simple concatenation:

base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"

df1 = spark.sql(base + criteria_1 + fromClause)
df2 = spark.sql(base + criteria_2 + fromClause)

# SMA transformation
base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"

#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df1 = spark.sql(base + criteria_1 + fromClause)
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df2 = spark.sql(base + criteria_2 + fromClause)
  • Using string interpolation to generate SQL statements dynamically:

# Old Style interpolation
UStbl = "SALES_US"
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))

# Using format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))

# New Style
UKTbl = " UK_SALES_JUN_18" 
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")

# SMA transformation
# Old Style interpolation
UStbl = "SALES_US"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))

# Using format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))

# New Style
UKTbl = " UK_SALES_JUN_18"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")
  • Using a function that generates SQL queries dynamically:

def ByMonth(month):
    query = f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"
    return spark.sql(query)

# SMA transformation
def ByMonth(month):
    query = f"SELECT * FROM LOGS WHERE MONTH(access_date) = {month}"
    #EWI: SPRKPY1077 => SQL embedded code cannot be processed.
    return spark.sql(query)
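In the supported case, spark.sql receives a string literal, while in every unsupported case above it receives an expression whose value is only known at run time. The following minimal sketch shows that distinction using Python's standard ast module. It is an illustrative approximation that assumes calls are written literally as spark.sql(...), not a description of how SMA is implemented; `classify_sql_argument` and its labels are hypothetical.

```python
import ast
from typing import List, Tuple


def classify_sql_argument(source: str) -> List[Tuple[int, str]]:
    """Return (line number, pattern) for each spark.sql(...) call in the source.

    "literal" marks the supported case (a plain string constant); the other
    labels name the unsupported patterns listed above. Hypothetical helper.
    """
    results = []
    for node in ast.walk(ast.parse(source)):
        # Only look at calls written literally as spark.sql(<arg>, ...).
        if not (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "sql"
                and isinstance(node.func.value, ast.Name)
                and node.func.value.id == "spark"
                and node.args):
            continue
        arg = node.args[0]
        if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
            kind = "literal"
        elif isinstance(arg, ast.Name):
            kind = "variable"
        elif isinstance(arg, ast.BinOp) and isinstance(arg.op, ast.Add):
            kind = "concatenation"
        elif isinstance(arg, ast.BinOp) and isinstance(arg.op, ast.Mod):
            kind = "%-interpolation"
        elif (isinstance(arg, ast.Call) and isinstance(arg.func, ast.Attribute)
              and arg.func.attr == "format"):
            kind = "str.format"
        elif isinstance(arg, ast.JoinedStr):
            kind = "f-string"
        else:
            kind = "other"
        results.append((node.lineno, kind))
    # ast.walk is breadth-first, so sort to report calls in source order.
    return sorted(results)


sample = '''
dfSales = spark.sql("SELECT COUNT(COUNTRIES) FROM SALES")
dfSales = spark.sql(query)
df1 = spark.sql(base + criteria_1 + fromClause)
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")
'''
for line, kind in classify_sql_argument(sample):
    print(line, kind)
```

In the ByMonth example, spark.sql receives the local name `query`, so this sketch reports it as "variable" even though the text is built by an f-string on the previous line; recognizing that would require tracking assignments, which the sketch does not attempt.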

Unsupported cases and EWI messages

  • When SMA analyzes Scala code, error code SPRKSCL1173 indicates an unsupported embedded SQL statement.

/*Scala*/
import org.apache.spark.sql.SparkSession

class SparkSqlExample {
  def main(spark: SparkSession): Unit = {
    /*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/
    spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
  }
}
  • When Python code contains an unsupported embedded SQL statement, SMA reports error code SPRKPY1077.

# Python Output 
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
b = spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
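Because SMA marks code it cannot convert with an EWI comment in the output, a quick tally of those comments can show how much embedded SQL still needs manual work after a run. The Python sketch below assumes the two comment styles shown in the examples above (`#EWI: CODE =>` in Python and `/*EWI: CODE =>` in Scala); the regular expression and the `count_ewi_codes` helper are hypothetical and not part of SMA.

```python
import re
from collections import Counter

# Matches the EWI markers in the examples above, in either comment style:
#   Python: #EWI: SPRKPY1077 => ...
#   Scala:  /*EWI: SPRKSCL1173 => ...*/
EWI_PATTERN = re.compile(r"EWI:\s*(SPRK[A-Z]+\d+)\s*=>")


def count_ewi_codes(code: str) -> Counter:
    """Count how many times each EWI code appears in converted source code."""
    return Counter(EWI_PATTERN.findall(code))


python_output = '''# Python Output
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
b = spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
'''
scala_output = '''/*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/'''

print(count_ewi_codes(python_output + scala_output))
# Counter({'SPRKPY1077': 1, 'SPRKSCL1173': 1})
```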