Snowpark Migration Accelerator:Python 的问题代码

SPRKPY1000

消息:源项目 spark-core 版本为 xxx.xx:xx.x.x,Snowpark 支持的 Snowpark 核心版本为 2.12:3.1.2,因此现有映射之间可能存在功能差异。

类别:警告。

描述

当源代码的 Pyspark 版本不受支持时,就会出现此问题。这意味着现有映射之间可能存在功能差异。

其他建议

  • SMA 扫描的 pyspark 兼容 Snowpark 的版本范围为 2.12 至 3.1.2。如果您使用的版本不在此范围内,该工具可能产生不一致的分析结果。您可以修改正在扫描的源代码的版本。

  • 要获得更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。

SPRKPY1001

消息: 此代码段存在解析错误

类别: 解析错误。

描述

当 Snowpark Migration Accelerator (SMA) 无法正确读取或理解文件中的代码(它无法正确“解析”文件)时,就会报告解析错误。当文件出现一个或多个解析错误时,就会出现此问题代码。

场景

输入: 当代码的语法无效时,会出现 EWI 消息,例如:

def foo():
    x = %%%%%%1###1
Copy

输出: SMA 发现解析错误并注释解析错误,添加了相应的 EWI 消息:

def foo():
    x
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(2, 7). Last valid token was 'x' @(2, 5), failed token '=' @(2, 7)
##      = %%%%%%1###1
Copy

其他建议

  • 检查该文件是否包含有效的 Python 代码。(您可以使用 issues.csv 文件查找带有此 EWI 代码的所有文件,以确定哪些文件因解析错误而未得到该工具处理。)许多解析错误的发生愿因是只有一部分代码输入到工具中,因此最好确保代码将在源代码中运行。如果有效,请使用 SMA 中的“Report an Issue”选项报告您遇到的解析错误。在提交此问题时,请在描述中包括导致解析错误的代码行。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1002

消息:不支持 < element >,不支持 Spark 元素。

类别: 转换错误。

描述

当该工具检测到使用了 Snowpark 不支持的元素,就会出现此问题,并且它没有自己的关联错误代码。这是 SMA 对不受支持的元素使用的通用错误代码。

其他建议

  • 尽管不支持消息中的选项或元素,但这并不意味着找不到解决方案。这仅意味着该工具本身无法找到解决方案。

  • 如果您遇到了 pyspark.ml 库中不受支持的元素,请考虑备选方法。还有其他指南可以解决与机器学习相关的问题,例如 Snowflake 的这篇指南。

  • 检查源代码的语法是否正确。(您可以使用 issues.csv 文件来确定转换错误发生的位置。)如果语法正确,请使用 SMA 中的“报告问题”选项报告您在特定元素上遇到的转换错误。在提交此问题时,请在描述中包括导致错误的代码行。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1003

消息: 加载符号表时出错。

类别: 转换错误。

描述

当处理符号表中的符号期间出错时,就会出现此问题。符号表是 SMA 基础架构的一部分,允许进行更复杂的转换。此错误可能是源代码中的意外语句造成的。

其他建议

  • 这不太可能是源代码本身存在错误,而更可能是该工具处理源代码的方式出现了错误。最好的解决方案是在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。

  • 要获得更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。

SPRKPY1004

消息: 无法加载符号表。

类别: 解析错误。

描述

当工具执行过程中出现意外错误时,就会出现此问题。由于无法加载符号表,该工具无法启动评估或转换过程。

其他建议

  • 这不太可能是源代码本身存在错误,而更可能是该工具处理源代码的方式出现了错误。最好的解决方案是联系 SMA 支持团队。您可以发送电子邮件至 sma-support@snowflake.com 联系我们。

  • 要获得更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。

SPRKPY1005

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

消息: pyspark.conf.SparkConf 并非必需

类别: 警告。

描述

当该工具检测到使用了并非必需的 pyspark.conf.SparkConf (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html) 时,就会出现此问题。

场景

输入

SparkConf 可以在不带参数调用,也可以带 loadDefaults 调用。

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)
Copy

输出

对于带或不带参数两种情况,SMA 都会创建一个 Snowpark Session.builder 对象:

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
#from pyspark import SparkConf
pass

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
my_conf = Session.builder.configs({"user" : "my_user", "password" : "my_password", "account" : "my_account", "role" : "my_role", "warehouse" : "my_warehouse", "database" : "my_database", "schema" : "my_schema"}).create()
Copy

其他建议

  • 此参数并非必要参数,移除它会插入警告注释。用户不应执行任何其他操作。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1006

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

消息: pyspark.context.SparkContext 并非必需

类别: 警告。

描述

当该工具检测到使用了 pyspark.context.SparkContext (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html) 时,就会出现此问题;在 Snowflake 环境中使用它并非必需。

场景

输入

在此示例中,有两个上下文用于创建与 Spark 集群的连接

from pyspark import SparkContext

sql_context1 = SparkContext(my_sc1)
sql_context2 = SparkContext(sparkContext=my_sc2)
Copy

输出

由于 Snowflake 上没有集群,因此不需要上下文,因此请注意,包含 Spark 属性的变量 my_sc1 和 my_sc2 可能并非必需,或者需要对其进行调整以修复代码。

from snowflake.snowpark import Session
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required

sql_context2 = my_sc2
Copy

其他建议

  • 此参数并非必要参数,移除它会插入警告注释。用户不应执行任何操作。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1007

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

消息: pyspark.sql.context.SQLContext 并非必需

类别: 警告。

描述

当该工具检测到使用了非必需的 pyspark.sql.context.SQLContext (https://downloads.apache.org/spark/docs/1.6.1/api/python/pyspark.sql.html) 时,就会出现此问题。

场景

输入

下面的示例展示了不同 SparkContext 过载。

from pyspark import SQLContext

my_sc1 = SQLContext(myMaster, myAppName, mySparkHome, myPyFiles, myEnvironment, myBatctSize, mySerializer, my_conf1)
my_sc2 = SQLContext(conf=my_conf2)
my_sc3 = SQLContext()
Copy

输出

输出代码为 pyspark.SQLContext 的相应行添加了注释,并将这些场景替换为对配置的引用。请注意,包含 Spark 属性的变量 my_sc1 和 my_sc2 可能并非必需,或者需要对其进行调整以修复代码。

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
#from pyspark import SQLContext
pass

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context2 = my_sc2
Copy

其他建议

  • 此参数为非必要参数,工具已将其移除并在源代码中插入警告注释。用户不应执行任何操作。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1008

消息:pyspark.sql.context.HiveContext 并非必需

类别:警告。

描述

当该工具检测到使用了非必需的 pyspark.sql.context.HiveContext (https://downloads.apache.org/spark/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext) 时,就会出现此问题。

场景

输入

在此示例中,将演示如何创建与 Hive 数据仓库的连接。

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.table("myTable")
df.show()
Copy

输出

在 Snowflake 中没有 Hive 数据仓库,因此无需配置 Hive 上下文。仍可在 Snowflake 中使用 parquet 文件,具体操作方法请参阅此 教程

#EWI: SPRKPY1008 => pyspark.sql.context.HiveContext is not required
hive_context = sc
df = hive_context.table("myTable")
df.show()
Copy

sc 变量指的是 Snowpark 会话对象

推荐修复方法

对于示例中的输出代码,应该添加与以下代码类似的 Snowpark 会话对象

## Here manually we can add the Snowpark Session object via a json config file called connection.json
import json
from snowflake.snowpark import Session
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
sc = Session.builder.configs(connection_parameter).getOrCreate()

hive_context = sc
df = hive_context.table("myTable")
df.show()
Copy

其他建议

SPRKPY1009

消息: pyspark.sql.dataframe.DataFrame.approxQuantile 有替代方案

类别: 警告。

描述

当该工具检测到使用了 pyspark.sql.dataframe.DataFrame.approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

需要重点了解的是,Pyspark 使用两种不同的 approxQuantile 函数,在这里使用的是 DataFrame approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html) 版本

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)
Copy

输出

SMA 会在使用 approxQuantile 的代码行上方返回 EWI SPRKPY1009,以便您可以用来识别需要修改的位置。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1009 => pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround, see documentation for more info
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)
Copy

推荐修复方法

使用 Snowpark approxQuantile 方法。部分参数不匹配,因此需要进行一些手动调整。对于输出代码的示例,建议的修复方法可能是:

from snowflake.snowpark import Session
...
df = spark.createDataFrame(data, columns)

df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])
Copy

pyspark.sql.dataframe.DataFrame.approxQuantile 的 relativeError 参数在 SnowPark 中不存在。

其他建议

SPRKPY1010

消息:pyspark.sql.dataframe.DataFrame.checkpoint 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.dataframe.DataFrame.checkpoint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.checkpoint.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在 PySpark 中,检查点用于截断 DataFrame 的逻辑计划,这是为了避免逻辑计划的增长。

import tempfile
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    spark.sparkContext.setCheckpointDir("/tmp/bb")
    df.checkpoint(False)
Copy

输出

SMA 会在使用 approxQuantile 的代码行上方返回 EWI SPRKPY1010,以便您可以用来识别需要修改的位置。请注意,这也将 setCheckpointDir (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setCheckpointDir.html) 标记为不受支持,但修复不需要使用已设置检查点的目录。

import tempfile
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    #EWI: SPRKPY1002 => pyspark.context.SparkContext.setCheckpointDir is not supported
    spark.setCheckpointDir("/tmp/bb")
    #EWI: SPRKPY1010 => pyspark.sql.dataframe.DataFrame.checkpoint has a workaround, see documentation for more info
    df.checkpoint(False)
Copy

推荐修复方法

无需显式设置检查点,这是因为 Snowpark 使用基于 SQL 的操作,这些操作经过 Snowflake 查询优化引擎优化,从而避免了不必要的计算或逻辑计划失控增长的问题。

但是,在某些情况下,可能需要在 DataFrame 上持久保存计算结果。在这种情况下,您可以通过将 DataFrame 写入 Snowflake 表或 Snowflake 临时表 来保存物化结果。

  • 通过使用永久表,即使在会话结束之后,也可以随时访问计算结果。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_table", table_type="temporary") # Save the dataframe into Snowflake table "my_table".
df2 = Session.table("my_table") # Now I can access the stored result quering the table "my_table"
Copy
  • 对于另一种修复方法,使用临时表的优点是在会话结束后删除该表:

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_temp_table", table_type="temporary") # Save the dataframe into Snowflake table "my_temp_table".
df2 = Session.table("my_temp_table") # Now I can access the stored result quering the table "my_temp_table"
Copy

其他建议

SPRKPY1011

消息:pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameStatFunctions.approxQuantile.html#pyspark.sql.DataFrameStatFunctions.approxQuantile) 时,就会出现此问题;此问题有替代方案。

场景

输入

需要重点了解的是,Pyspark 使用两种不同的 approxQuantile 函数,在这里使用的是 DataFrameStatFunctions approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameStatFunctions.approxQuantile.html#pyspark.sql.DataFrameStatFunctions.approxQuantile) 版本。

import tempfile
from pyspark.sql import SparkSession, DataFrameStatFunctions
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
print(aprox_quantille)
Copy

输出

SMA 会在使用 approxQuantile 的代码行上方返回 EWI SPRKPY1011,以便您可以用来识别需要修改的位置。

import tempfile
from snowflake.snowpark import Session, DataFrameStatFunctions
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1011 => pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround, see documentation for more info
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
Copy

推荐修复方法

可以使用 Snowpark approxQuantile 方法。部分参数不匹配,因此需要进行一些手动调整。对于输出代码的示例,建议的修复方法可能是:

from snowflake.snowpark import Session # remove DataFrameStatFunctions because is not required
...
df = spark.createDataFrame(data, columns)

aprox_quantille = df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])
Copy

pyspark.sql.dataframe.DataFrame.approxQuantile 的 relativeError 参数在 SnowPark 中不存在。

其他建议

SPRKPY1012

警告

此问题代码已 弃用

消息:pyspark.sql.dataframe.DataFrameStatFunctions.writeTo 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.dataframe.DataFrameStatFunctions.writeTo (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,将 DataFrame df 写入 Spark 表“table”。

writer = df.writeTo("table")
Copy

输出

SMA 会在使用 DataFrameStatFunctions.writeTo 的代码行上方返回 EWI SPRKPY1012,以便您可以用来识别需要修改的位置。

#EWI: SPRKPY1012 => pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround, see documentation for more info
writer = df.writeTo("table")
Copy

建议的修复方法

改用 df.write.SaveAsTable()。

import df.write as wt
writer = df.write.save_as_table(table)
Copy

其他建议

SPRKPY1013

消息:pyspark.sql.functions.acosh 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.acosh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.acosh.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,pyspark 使用 pyspark.sql.functions.acosh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.acosh.html) 计算 DataFrame 的 acosh

from pyspark.sql import SparkSession
from pyspark.sql.functions import acosh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))
Copy

输出

SMA 会在使用 acosh 的代码行上方返回 EWI SPRKPY1013,以便您可以用来识别需要修改的位置。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1013 => pyspark.sql.functions.acosh has a workaround, see documentation for more info
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))
Copy

推荐修复方法

没有直接的“acosh”实现,但可以改用“call_function”,并使用“acosh”作为第一个参数,使用 colName 作为第二个参数。

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.select(call_function('ACOSH', col('value')))
Copy

其他建议

SPRKPY1014

消息:pyspark.sql.functions.asinh 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.asinh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asinh.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,pyspark 使用 pyspark.sql.functions.asinh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asinh.html) 计算 DataFrame 的 asinh。

from pyspark.sql import SparkSession
from pyspark.sql.functions import asinh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("asinh_value", asinh(df["value"]))
Copy

输出

SMA 会在使用 asinh 的代码行上方返回 EWI SPRKPY1014,以便您可以用来识别需要修改的位置。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1014 => pyspark.sql.functions.asinh has a workaround, see documentation for more info
df_result = df.withColumn("asinh_value", asinh(df["value"]))
Copy

建议的修复方法

没有直接的“asinh”实现,但可以改用“call_function”,并使用“asinh”作为第一个参数,使用 colName 作为第二个参数。

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('asinh', col('value')))
Copy

其他建议

SPRKPY1015

消息:pyspark.sql.functions.atanh 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.atanh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.atanh.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,pyspark 使用 pyspark.sql.functions.atanh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.atanh.html) 计算 DataFrame 的 atanh。

from pyspark.sql import SparkSession
from pyspark.sql.functions import atanh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("atanh_value", atanh(df["value"]))
Copy

输出

SMA 会在使用 atanh 的代码行上方返回 EWI SPRKPY1015,以便您可以用来识别需要修改的位置。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1015 => pyspark.sql.functions.atanh has a workaround, see documentation for more info
df_result = df.withColumn("atanh_value", atanh(df["value"]))
Copy

推荐修复方法

没有直接的“atanh”实现,但可以改用“call_function”,并使用“atanh”作为第一个参数,使用 colName 作为第二个参数。

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('atanh', col('value')))
Copy

其他建议

SPRKPY1016

警告

Spark Conversion Core 版本 0.11.7 起,此问题代码已 弃用

消息:pyspark.sql.functions.collect_set 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.collect_set (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_set.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

使用 collectset 来获取 _colname 的元素(不含重复项):

col = collect_set(colName)
Copy

输出

SMA 会在使用 collect_set 的代码行上方返回 EWI SPRKPY1016,以便您可以用来识别需要修改的位置。

#EWI: SPRKPY1016 => pyspark.sql.functions.collect_set has a workaround, see documentation for more info
col = collect_set(colName)
Copy

推荐修复方法

使用函数 array_agg,然后添加第二个值为 True 的实参。

col = array_agg(col, True)
Copy

其他建议

SPRKPY1017

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

pyspark.sql.functions.date_add 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.date_add (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_add.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,使用 date_add 来为 DataFrame df 计算当前日期后 5 天的日期。

col = df.select(date_add(df.colName, 5))
Copy

输出

SMA 会在使用 date_add 的代码行上方返回 EWI SPRKPY1017 警告,以便您可以用来识别需要修改的位置。

#EWI: SPRKPY1017 => pyspark.sql.functions.date_add has a workaround, see documentation for more info
col = df.select(date_add(df.colName, 5))
Copy

推荐修复方法

导入 snowflake.snowpark.functions,其包含 date_add(别名 dateAdd)函数的实现。

from snowflake.snowpark.functions import date_add

col = df.select(date_add(df.dt, 1))
Copy

其他建议

SPRKPY1018

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

消息:pyspark.sql.functions.date_sub 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.date_sub (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_sub.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,使用 date_add 来为 DataFrame df 计算当前日期前 5 天的日期。

col = df.select(date_sub(df.colName, 5))
Copy

输出

SMA 会在使用 date_sub 的代码行上方返回 EWI SPRKPY1018 警告,以便您可以用来识别需要修改的位置。

#EWI: SPRKPY1018 => pyspark.sql.functions.date_sub has a workaround, see documentation for more info
col = df.select(date_sub(df.colName, 5))
Copy

推荐修复方法

导入 snowflake.snowpark.functions,其包含 date_sub 函数的实现。

from pyspark.sql.functions import date_sub
df.withColumn("date", date_sub(df.colName, 5))
Copy

其他建议

SPRKPY1019

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

消息:pyspark.sql.functions.datediff 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.datediff (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.datediff.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

在此示例中,我们使用 datediff 来计算与 'today' 和其他日期的天数差。

contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )
Copy

输出

SMA 会在使用 datediff 的代码行上方返回 EWI SPRKPY1019,以便您可以用来识别需要修改的位置。

from pyspark.sql.functions import datediff
#EWI: SPRKPY1019 => pyspark.sql.functions.datediff has a workaround, see documentation for more info
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )
Copy

SMA 将 pyspark.sql.functions.datediff 转换为 snowflake.snowpark.functions.daydiff,后者同样用于计算两个日期之间的天数差。

推荐修复方法

datediff(part: string ,end: ColumnOrName, start: ColumnOrName)

操作: 导入 snowflake.snowpark.functions,其包含 datediff 函数的实现,该函数需要 日期时间部分 的额外参数,并允许在计算日期之间的天数差时具有更大的灵活性。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import datediff
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff('day', lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff('day',lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff('day', lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff('day', lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff('day', lit(today),'vms_lastuserlogin'))
           )
Copy

建议

SPRKPY1020

消息:pyspark.sql.functions.instr 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.instr (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.instr.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下是使用 pyspark instr 的基本示例:

from pyspark.sql import SparkSession
from pyspark.sql.functions import instr
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(instr(df.test, 'cd').alias('result')).collect()
Copy

输出:

SMA 会在使用 instr 的代码行上方返回 EWI SPRKPY1020,以便您可以用来识别需要修改的位置。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
#EWI: SPRKPY1020 => pyspark.sql.functions.instr has a workaround, see documentation for more info
df.select(instr(df.test, 'cd').alias('result')).collect()
Copy

推荐修复方法

需要手动进行以下更改:使用函数 charindex 并更改前两个参数的顺序。

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import charindex, lit

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(charindex(lit('cd'), df.test).as_('result')).show()
Copy

其他建议

SPRKPY1021

警告

此问题代码已 弃用

消息:pyspark.sql.functions.last 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.last (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.last.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.last 函数生成了此 EWI。在此示例中,last 函数用于获取每个名称的最后一个 value

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1021,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
#EWI: SPRKPY1021 => pyspark.sql.functions.last has a workaround, see documentation for more info
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))
Copy

推荐修复方法

作为一种替代方案,可以使用 Snowflake LAST_VALUE 函数。要从 Snowpark 调用此函数,请使用 snowflake.snowpark.functions.call_builtin 函数,并传递字符串 last_value 作为第一个实参,传递相应的列作为第二个实参。如果您在 last 函数中使用该列的名称,则应在调用 call_builtin 函数时将其转换为列。

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))
Copy

其他建议


描述:>- CSV、JSON 和 PARQUET 方法中的 mode 参数被转换为 overwrite


SPRKPY1022

消息:pyspark.sql.functions.log10 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.log10 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log10.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.log10 函数生成了此 EWI。在此示例中,log10 函数用于计算 value 列以 10 为底的对数值。

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1022,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
#EWI: SPRKPY1022 => pyspark.sql.functions.log10 has a workaround, see documentation for more info
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))
Copy

推荐修复方法

作为一种替代方案,可以使用snowflake.snowpark.functions.log 函数并传递字面量值 10 作为对数的底。

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))
Copy

其他建议

SPRKPY1023

消息:pyspark.sql.functions.log1p 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.log1p (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log1p.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.log1p 函数生成了此 EWI。在此示例中,log1p 函数用于计算 value 列的自然对数。

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1023,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
#EWI: SPRKPY1023 => pyspark.sql.functions.log1p has a workaround, see documentation for more info
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))
Copy

推荐修复方法

作为一种替代方案,可以使用 call_function 函数,并传递字符串 ln 作为第一个实参,然后在第二个实参添加 1

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))
Copy

其他建议

SPRKPY1024

消息:pyspark.sql.functions.log2 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.log2 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log2.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.log2 函数生成了此 EWI。在此示例中,log2 函数用于计算 value 列以 2 为底的对数值。

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1024,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
#EWI: SPRKPY1024 => pyspark.sql.functions.log2 has a workaround, see documentation for more info
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))
Copy

推荐修复方法

作为一种替代方案,可以使用 snowflake.snowpark.functions.log 函数并传递字面量值 2 作为对数的底。

df = session.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log(2, df["value"]))
Copy

其他建议

SPRKPY1025

警告

此问题代码已 弃用

消息:pyspark.sql.functions.ntile 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.ntile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.ntile.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.ntile 函数生成了此 EWI。在此示例中,使用 ntile 函数将行分成 3 个桶。

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1025,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
#EWI: SPRKPY1025 => pyspark.sql.functions.ntile has a workaround, see documentation for more info
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))
Copy

推荐修复方法

Snowpark 有一个等效的 ntile 函数,但传递给它的实参应该是列。作为一种替代方案,可以使用 snowflake.snowpark.functions.lit 函数将字面量实参转换为列。

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))
Copy

其他建议

SPRKPY1026

警告

Spark Conversion Core 4.3.2 起,此问题代码已 弃用

消息:pyspark.sql.readwriter.DataFrameReader.csv 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.csv (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.readwriter.DataFrameReader.csv 函数生成了此 EWI。在此示例中,csv 函数用于读取具有给定架构的多个 .csv 文件,并使用一些额外的选项(例如 encodingheadersep)来微调读取文件的行为。

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1026,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)
Copy

推荐修复方法

本节将说明如何配置 path 参数、schema 参数和一些 options,以确保其在 Snowpark 中的正常运行。

1. path 参数

Snowpark 要求将 path 参数为暂存区位置,因此,作为一种替代方案,可以创建一个临时暂存区,并使用前缀 file:// 将每个 .csv 文件添加到该暂存区。

2. schema 参数

Snowpark 不允许将 schema 定义为 csv 函数的参数。作为一种替代方案,可以使用 snowflake.snowpark.DataFrameReader.schema 函数。

3. options 参数

Snowpark 不允许将 extra options 定义为 csv 函数的参数。作为一种替代方案,对于其中许多参数,可以使用 snowflake.snowpark.DataFrameReader.option 函数将这些参数指定为 DataFrameReader 的选项。

备注

Snowpark 不支持 以下选项:

  • columnNameOfCorruptRecord

  • emptyValue

  • enforceSchema

  • header

  • ignoreLeadingWhiteSpace

  • ignoreTrailingWhiteSpace

  • inferSchema

  • locale

  • maxCharsPerColumn

  • maxColumns

  • mode

  • multiLine

  • nanValue

  • negativeInf

  • nullValue

  • positiveInf

  • quoteAll

  • samplingRatio

  • timestampNTZFormat

  • unescapedQuoteHandling

以下是应用上述建议后,可在 Snowpark 中正常运行的完整代码示例:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.csv", f"@{stage}")

df = session.read.schema(my_schema).option("encoding", "UTF-8").option("sep", ",").csv(stage)
Copy

其他建议

SPRKPY1027

警告

Spark Conversion Core 4.5.2 起,此问题代码已 弃用

消息:pyspark.sql.readwriter.DataFrameReader.json 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.json (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.readwriter.DataFrameReader.json 函数生成了此 EWI。在此示例中,json 函数用于读取具有给定架构的多个 .json 文件,并使用一些额外的选项(例如 primitiveAsStringdateFormat)来微调读取文件的行为。

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1027,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

#EWI: SPRKPY1027 => pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info
df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)
Copy

推荐修复方法

本节将说明如何配置 path 参数、schema 参数和一些 options,以确保其在 Snowpark 中的正常运行。

1. path 参数

Snowpark 要求将 path 参数为暂存区位置,因此,作为一种替代方案,可以创建一个临时暂存区,并使用前缀 file:// 将每个 .json 文件添加到该暂存区。

2. schema 参数

Snowpark 不允许将 schema 定义为 json 函数的参数。作为一种替代方案,可以使用 snowflake.snowpark.DataFrameReader.schema 函数。

3. options 参数

Snowpark 不允许将 extra options 定义为 json 函数的参数。作为一种替代方案,对于其中许多参数,可以使用 snowflake.snowpark.DataFrameReader.option 函数将这些参数指定为 DataFrameReader 的选项。

备注

Snowpark 不支持以下选项:

  • allowBackslashEscapingAnyCharacter

  • allowComments

  • allowNonNumericNumbers

  • allowNumericLeadingZero

  • allowSingleQuotes

  • allowUnquotedControlChars

  • allowUnquotedFieldNames

  • columnNameOfCorruptRecord

  • dropFiledIfAllNull

  • encoding

  • ignoreNullFields

  • lineSep

  • locale

  • mode

  • multiline

  • prefersDecimal

  • primitiveAsString

  • samplingRatio

  • timestampNTZFormat

  • timeZone

以下是应用上述建议后,可在 Snowpark 中正常运行的完整代码示例:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.json", f"@{stage}")

df = session.read.schema(my_schema).option("dateFormat", "2023-06-20").json(stage)
Copy

其他建议

SPRKPY1028

消息:pyspark.sql.readwriter.DataFrameReader.orc 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.orc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.orc.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.readwriter.DataFrameReader.orc 函数生成了此 EWI。在此示例中,使用 orc 函数来读取多个 .orc 文件,并使用一些额外的选项(例如 mergeSchemarecursiveFileLookup)来微调读取文件的行为。

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1028,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

#EWI: SPRKPY1028 => pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info
df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)
Copy

推荐修复方法

本节将说明如何配置 path 参数和额外的 options,以使其在 Snowpark 中正常工作。

1. path 参数

Snowpark 要求将 path 参数为暂存区位置,因此,作为一种替代方案,可以创建一个临时暂存区,并使用前缀 file:// 将每个 .orc 文件添加到该暂存区。

2. options 参数

Snowpark 不允许将 extra options 定义为 orc 函数的参数。作为一种替代方案,对于其中许多参数,可以使用 snowflake.snowpark.DataFrameReader.option 函数将这些参数指定为 DataFrameReader 的选项。

备注

Snowpark 不支持以下选项:

  • compression

  • mergeSchema

以下是应用上述建议后,可在 Snowpark 中正常运行的完整代码示例:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.orc", f"@{stage}")

df = session.read.option(recursiveFileLookup, "True").orc(stage)
Copy

其他建议

SPRKPY1029

消息:当该工具检测到使用了 pyspark.sql.readwriter.DataFrameReader.parquet 时,就会出现此问题。支持此功能,但由于 Snowpark 和 Spark API 之间存在一些差异,可能需要手动进行一些更改。

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html) 函数时,就会出现此问题。Snowpark 支持此函数,但存在一些差异,需要手动进行一些更改。

场景

输入

以下示例使用 pyspark.sql.readwriter.DataFrameReader.parquet 函数生成了此 EWI。

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet",
]

df = session.read.parquet(
  *file_paths,
  mergeSchema="true",
  pathGlobFilter="*file*",
  recursiveFileLookup="true",
  modifiedBefore="2024-12-31T00:00:00",
  modifiedAfter="2023-12-31T00:00:00"
)
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1029,以提示 Snowpark 支持此函数,但需要进行一些手动调整。请注意,Snowpark 支持的 options 将转换为 option 函数调用,不支持的 options 将被移除。具体说明详见后续章节。

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet"
]

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.
df = session.read.option("PATTERN", "*file*").parquet(
  *file_paths
)
Copy

推荐修复方法

本节将说明如何配置 pathsoptions 参数以使其在 Snowpark 中正常工作。

1. paths 参数

在 Spark 中,此参数可以是本地位置,也可以是云位置。在 Snowpark 中,只接受使用 Snowflake 暂存区 的云位置。因此,可以创建一个临时暂存区,并使用前缀 file:// 将每个文件添加到其中。

2. options 参数

Snowpark 不允许将不同的 options 定义为 parquet 函数的参数。作为一种替代方案,可以使用 optionoptions 函数将这些参数指定为 DataFrameReader 的额外选项。

请注意,Snowpark options 与 PySpark options 并非完全相同,因此可能需要手动进行一些更改。下面更详细说明了如何在 Snowpark 中配置最常见的 PySpark options。

2.1 mergeSchema 选项

Parquet 支持架构演进,允许用户从简单的架构开始,然后根据需要逐步添加更多列。这可能会导致多个具有不同但兼容架构的 parquet 文件。在 Snowflake 中,由于 infer_schema 功能,您不需要这样做,因此可以移除 mergeSchema 选项。

2.2 pathGlobFilter 选项

如果您只想从暂存区加载一部分文件,则可以使用 pattern 选项来指定与要加载的文件相匹配的正则表达式。如当前场景输出所示,SMA 已自动完成该操作。

2.3 recursiveFileLookupstr 选项

Snowpark 不支持此选项。最好的建议是使用类似 pathGlobFilter 选项的正则表达式来实现类似功能。

2.4 modifiedBefore/modifiedAfter 选项

在 Snowflake 中,可以使用 metadata 列获得相同的结果。

备注

Snowpark 不支持以下选项:

  • compression

  • datetimeRebaseMode

  • int96RebaseMode

  • mergeSchema

以下是应如何转换输入代码以让它在 Snowpark 中可以正常工作的完整示例:

from snowflake.snowpark.column import METADATA_FILE_LAST_MODIFIED, METADATA_FILENAME

temp_stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}')

session.file.put(f"file:///path/to/your/file1.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file2.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file3.parquet", f"@{temp_stage}")

df = session.read \
  .option("PATTERN", ".*file.*") \
  .with_metadata(METADATA_FILENAME, METADATA_FILE_LAST_MODIFIED) \
  .parquet(temp_stage) \
  .where(METADATA_FILE_LAST_MODIFIED < '2024-12-31T00:00:00') \
  .where(METADATA_FILE_LAST_MODIFIED > '2023-12-31T00:00:00')
Copy

其他建议

SPRKPY1030

警告

此问题代码已 弃用

消息:pyspark.sql.session.SparkSession.Builder.appName 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.session.SparkSession.Builder.appName (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.appName.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.session.SparkSession.Builder.appName 函数生成了此 EWI。在此示例中,appName 函数用于将 MyApp 设置为应用程序的名称。

session = SparkSession.builder.appName("MyApp").getOrCreate()
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1030,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

#EWI: SPRKPY1030 => pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info
session = Session.builder.appName("MyApp").getOrCreate()
Copy

推荐修复方法

作为一种替代方案,您可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 appName 函数提供了扩展。

import snowpark_extensions
session = SessionBuilder.appName("MyApp").getOrCreate()
Copy

其他建议

SPRKPY1031

警告

Spark Conversion Core 2.7.0 起,此问题代码已 弃用

消息:pyspark.sql.Column.Column.Contains 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.column.Column.contains (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.contains.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.column.Column.contains 函数生成了此 EWI。在此示例中,contains 函数用于过滤“City”列包含子字符串“New”的行。

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(col("City").contains("New"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1031,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
#EWI: SPRKPY1031 => pyspark.sql.column.Column.contains has a workaround, see documentation for more info
df_filtered = df.filter(col("City").contains("New"))
Copy

推荐修复方法

作为一种替代方案,可以使用 snowflake.snowpark.functions.contains 函数,并将该列作为第一个实参传递,将要搜索的元素作为第二个实参传递。如果要搜索的元素是字面量值,则应使用 lit 函数将其转换为列表达式。

from snowflake.snowpark import functions as f
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(f.contains(col("City"), f.lit("New")))
Copy

其他建议

SPRKPY1032

消息:spark 元素 未定义

类别:转换错误

描述

当 SMA 无法确定给定元素的相应映射状态时,就会出现此问题。这意味着,SMA 还不知道 Snowpark 是否支持该元素。请注意,这是 SMA 对任何未定义元素使用的通用错误代码。

场景

输入

以下是 SMA 无法确定其相应映射状态的函数的示例。在这种情况下,应假设 not_defined_function() 是有效的 PySpark 函数,代码就可以运行了。

sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1032,以提示此元素未定义。

#EWI: SPRKPY1032 => pyspark.rdd.RDD.not_defined_function is not defined
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()
Copy

推荐修复方法

要尝试找出问题,可以执行以下验证:

  • 检查源代码的语法是否正确以及拼写是否正确。

  • 检查是否使用的是 SMA 支持的 PySpark 版本。要了解运行 SMA 时 SMA 支持哪个 PySpark 版本,可以查看 DetailedReport.docx 文件的第一页。

如果这是有效的 PySpark 元素,请使用 SMA 的 报告问题 选项报告您遇到的有关该特定元素的转换错误,并添加您认为可能有帮助的任何其他信息。

请注意,如果某个元素未定义,并不意味着 Snowpark 不支持该元素。您应该查看 Snowpark 文档 以验证是否存在等效元素。

其他建议

SPRKPY1033

警告

此问题代码已 弃用

消息:pyspark.sql.functions.asc 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.asc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asc.html) 函数时,就会出现此问题;此问题有替代方案。

场景

pyspark.sql.functions.asc 函数会将列对象或列名视为字符串并用作其参数。Snowpark 不支持这两种场景,因此生成了此 EWI。

场景 1

输入

以下示例使用 pyspark.sql.functions.asc 函数,它接收列对象作为参数。

df.orderBy(asc(col))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1033,以提示 Snowpark 并非完全支持带有列对象参数的 asc 函数,但它有替代方案。

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc(col))
Copy

推荐修复方法

作为一种替代方案,可以从 column 列参数中调用 snowflake.snowpark.Column.asc 函数。

df.orderBy(col.asc())
Copy
场景 2

输入

以下示例使用 pyspark.sql.functions.asc 函数,它接收列名作为参数。

df.orderBy(asc("colName"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1033,以提示 Snowpark 并非完全支持带有列名参数的 asc 函数,但它有替代方案。

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc("colName"))
Copy

推荐修复方法

作为一种替代方案,可以使用 snowflake.snowpark.functions.col 函数将字符串参数转换为列对象,然后调用 snowflake.snowpark.Column.asc 函数。

df.orderBy(col("colName").asc())
Copy

其他建议

SPRKPY1034

警告

此问题代码已 弃用

消息:pyspark.sql.functions.desc 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.desc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.desc.html) 函数时,就会出现此问题;此问题有替代方案。

场景

pyspark.sql.functions.desc 函数将列对象或列名作为字符串并用作其参数。Snowpark 不支持这两种场景,因此生成了此 EWI。

场景 1

输入

以下示例使用 pyspark.sql.functions.desc 函数,它接收列对象作为参数。

df.orderBy(desc(col))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1034,以提示 Snowpark 并非完全支持带有列对象参数的 desc 函数,但它有替代方案。

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc(col))
Copy

推荐修复方法

作为一种替代方案,可以从 column 参数中调用 snowflake.snowpark.Column.desc 函数。

df.orderBy(col.desc())
Copy
场景 2

输入

以下示例使用 pyspark.sql.functions.desc 函数,它接收列名作为参数。

df.orderBy(desc("colName"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1034,以提示 Snowpark 并非完全支持带有列名参数的 desc 函数,但它有替代方案。

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc("colName"))
Copy

推荐修复方法

作为一种替代方案,可以使用 snowflake.snowpark.functions.col 函数将字符串参数转换为列对象,然后调用 snowflake.snowpark.Column.desc 函数。

df.orderBy(col("colName").desc())
Copy

其他建议

SPRKPY1035

警告

此问题代码已 弃用

消息:pyspark.sql.functions.reverse 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.reverse (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.reverse.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.reverse 函数生成了此 EWI。在此示例中,reverse 函数用于反转 word 列的每个字符串。

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1035,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse("word"))
Copy

推荐修复方法

作为一种替代方案,您可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 reverse 函数提供了扩展。

import snowpark_extensions

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))
Copy

其他建议

SPRKPY1036

警告

此问题代码已 弃用

消息:pyspark.sql.column.Column.getField 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.column.Column.getField (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getField.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.column.Column.getField 函数生成了此 EWI。在此示例中,使用 getField 函数从 info 列中提取 name

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info").getField("name"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1036,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
#EWI: SPRKPY1036 => pyspark.sql.column.Column.getField has a workaround, see documentation for more info
df_with_name = df.withColumn("name", col("info").getField("name"))
Copy

推荐修复方法

作为一种替代方案,可以使用以字段名称作为索引的 Snowpark 列索引器运算符

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info")["name"])
Copy

其他建议

SPRKPY1037

警告

此问题代码已 弃用

消息:pyspark.sql.functions.sort_array 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.sort_array (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sort_array.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.sort_array 函数生成了此 EWI。在此示例中,sort_array 函数用于按升序和降序对 numbers 数组进行排序。

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1037,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))
Copy

推荐修复方法

作为一种替代方案,可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 sort_array 函数提供了扩展。

import snowpark_extensions

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))
Copy

其他建议

SPRKPY1038

消息:spark 元素 尚未识别

类别:转换错误

描述

当您的源代码中有 SMA 无法识别的 PySpark 元素时,就会出现此问题。出现这种情况可能由多种原因导致,例如:

  • PySpark 中不存在的元素。

  • 在 SMA 尚不支持的 PySpark 版本中添加的元素。

  • 处理元素时 SMA 的内部错误。

这是 SMA 对任何无法识别的元素使用的通用错误代码。

场景

输入

以下是一个函数使用示例,由于该函数在 PySpark 中不存在,因此 SMA 无法识别该函数。

from pyspark.sql import functions as F
F.unrecognized_function()
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1038,以提示此元素无法识别。

from snowflake.snowpark import functions as F
#EWI: SPRKPY1038 => pyspark.sql.functions.non_existent_function is not yet recognized
F.unrecognized_function()
Copy

推荐修复方法

要尝试找出问题,可以执行以下验证:

  • 检查 PySpark 中是否存在该元素。

  • 检查元素拼写是否正确。

  • 检查是否使用的是 SMA 支持的 PySpark 版本。要了解运行 SMA 时 SMA 支持哪个 PySpark 版本,可以查看 DetailedReport.docx 文件的第一页。

如果它是有效的 PySpark 元素,请使用 SMA 的 报告问题 选项报告您遇到的有关该特定元素的转换错误,并添加您认为可能有帮助的任何其他信息。

请注意,如果 SMA 无法识别某个元素,这并不意味着 Snowpark 不支持该元素。您应该查看 Snowpark 文档 以验证是否存在等效元素。

其他建议

SPRKPY1039

警告

此问题代码已 弃用

消息:pyspark.sql.column.Column.getItem 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.column.Column.getItem (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getItem.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.column.Column.getItem 函数生成了此 EWI。在此示例中,使用 getItem 函数通过位置索引和键获取项目。

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1039,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))
Copy

推荐修复方法

作为一种替代方案,可以使用 Snowpark 列索引器运算符,并将字段的名称或位置用作索引。

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits")[0])

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities")["apple"])
Copy

其他建议

SPRKPY1040

警告

此问题代码已 弃用

消息:pyspark.sql.functions.explode 有替代方案,请参阅文档了解更多信息

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.functions.explode (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html) 函数时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 pyspark.sql.functions.explode 函数生成了此 EWI。在此示例中,使用了 explode 函数来为 numbers 列的每个数组项生成一行。

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1040,以提示 Snowpark 并非完全支持此函数,但它有替代方案。

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
#EWI: SPRKPY1040 => pyspark.sql.functions.explode has a workaround, see documentation for more info
exploded_df = df.select("name", explode(df.numbers).alias("number"))
Copy

推荐修复方法

作为一种替代方案,可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 explode 函数提供了扩展。

import snowpark_extensions

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))
Copy

其他建议

SPRKPY1041

警告

Spark Conversion Core Version 2.9.0 起,此问题代码已 弃用

消息:pyspark.sql.functions.explode_outer 有替代方案

类别:警告

描述

当该工具检测到使用了 pyspark.sql.functions.explode_outer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode_outer.html#pyspark.sql.functions.explode_outer) 时,就会出现此问题;此问题有替代方案。

场景

输入

该示例显示了在 select 调用中使用 explode_outer 方法的情况。

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()
Copy

输出

该工具添加了 EWI SPRKPY1041,提示可以实施替代方案。

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

#EWI: SPRKPY1041 => pyspark.sql.functions.explode_outer has a workaround, see documentation for more info
df.select("id", "an_array", explode_outer("a_map")).show()
Copy

推荐修复方法

作为一种替代方案,可以导入 snowpark_extensions 包,其包含 explode_outer 辅助函数。

import snowpark_extensions

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()
Copy

其他建议

SPRKPY1042

消息:pyspark.sql.functions.posexplode 有替代方案

类别:警告

描述

当该工具检测到使用了 pyspark.sql.functions.posexplode (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.posexplode.html?highlight=posexplode) 时,就会出现此问题;此问题有替代方案。

场景

根据作为参数传递的列类型,此方法可以处理几种情况:参数可以是 list of valuesmap/directory (keys/values)

场景 1

输入

以下示例使用 posexplode,并传递 list of values 作为参数。

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])

df.select(posexplode(df.intlist)).collect()
Copy

输出

该工具添加了 EWI SPRKPY1042,提示可以实施替代方案。

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[100, 200, 300])])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.intlist)).show()
Copy

推荐修复方法

要获得相同的行为,请使用方法 functions.flatten,删除多余的列,然后重命名索引和值列的名称。

df = spark.createDataFrame(
  [Row(a=1,
       intlist=[1, 2, 3])])

df.select(
    flatten(df.intlist))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
Copy
场景 2

输入

以下是使用 posexplode 的另一个示例,它接受 map/dictionary (keys/values) 作为参数

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])

df.select(posexplode(df.maps)).show()
Copy

输出

该工具添加了 EWI SPRKPY1042,提示可以实施替代方案。

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.maps)).show()
Copy

推荐修复方法

作为一种替代方案,可以使用 functions.row_number 来获取位置,使用 functions.explode 及字段的名称来获取字典的键/值的值。

df = spark.createDataFrame([
    [10, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [11, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
    schema=["idx", "lists", "maps", "strs"])

window = Window.orderBy(col("idx").asc())

df.select(
    row_number().over(window).alias("pos"),
    explode(df.maps).alias("key", "value")).show()
Copy

注意: 使用 row_number 并不完全等效,因为它从 1 开始(而 Spark 方法从零开始)

其他建议

SPRKPY1043

消息:pyspark.sql.functions.posexplode_outer 有替代方案

类别:警告

描述

当该工具检测到使用了 pyspark.sql.functions.posexplode_outer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.posexplode_outer.html) 时,就会出现此问题;此问题有替代方案。

场景

根据作为参数传递的列类型,此方法可以处理几种情况:参数可以是 list of valuesmap/directory (keys/values)

场景 1

输入

以下示例使用 posexplode_outer,它接受 list of values 作为参数。

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select("id", "an_array", posexplode_outer("an_array")).show()
Copy

输出

该工具添加了 EWI SPRKPY1043,提示可以实施替代方案。

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select("id", "an_array", posexplode_outer("an_array")).show()
Copy

推荐修复方法

要获得相同的行为,请使用方法 functions.flatten 并将 outer 参数设置为 True,删除多余的列,然后重命名索引和值列的名称。

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select(
    flatten(df.an_array, outer=True))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
Copy
场景 2

输入

以下是使用 posexplode_outer 的另一个示例,其接收 map/dictionary (keys/values) 作为参数

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))

df.select(posexplode_outer(df.a_map)).show()
Copy

输出

该工具添加了 EWI SPRKPY1043,提示可以实施替代方案。

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select(posexplode_outer(df.a_map)).show()
Copy

推荐修复方法

作为一种替代方案,可以使用 functions.row_number 来获取位置,使用 functions.explode_outer 及字段的名称来获取字典的键/值的值。

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2,  {}),
        (3, None)],
    ("id", "a_map"))

window = Window.orderBy(col("id").asc())

df.select(
    row_number().over(window).alias("pos"),
          explode_outer(df.a_map)).show()
Copy

注意: 使用 row_number 并不完全等效,因为它从 1 开始(而 Spark 方法从零开始)

其他建议

SPRKPY1044

警告

Spark Conversion Core 版本 2.4.0 起,此问题代码已 弃用

消息:pyspark.sql.functions.split 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.split (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html) 时,就会出现此问题;此问题有替代方案。

场景

具体实现方式取决于传递给该方法的参数数量,主要分为几种情况。

场景 1

输入

以下是函数 split 只有 strpattern 参数时的示例

F.split('col', '\\|')
Copy

输出

该工具显示 EWI SPRKPY1044,提示有替代方案。

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|')
Copy

推荐修复方法

作为一种替代方案,可以使用 pattern 参数调用函数 snowflake.snowpark.functions.lit,然后将其发送到 split 中。

F.split('col', lit('\\|'))
## the result of lit will be sent to the split function
Copy

场景 2

输入

以下是函数 split 具有 str、pattern 和 limit 参数的另一个示例。

F.split('col', '\\|', 2)
Copy

输出

该工具显示 EWI SPRKPY1044,提示有替代方案。

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|', 2)
Copy

推荐修复方法

不支持此特定方案。

其他建议

SPRKPY1045

消息:pyspark.sql.functions.map_values 有替代方案

类别:警告。

描述

此函数用于从包含 map/dictionary (keys/values) 的列中提取值列表。

当该工具检测到使用了 pyspark.sql.functions.map_values (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.map_values.html) 时,就会出现问题;该问题有替代方案。

场景

输入

以下是使用方法 map_values 的示例。

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))

df.select(map_values("a_map")).show()
Copy

输出

该工具添加了 EWI SPRKPY1045,提示可以实施替代方案。

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))
#EWI: SPRKPY1045 => pyspark.sql.functions.map_values has a workaround, see documentation for more info

df.select(map_values("a_map")).show()
Copy

推荐修复方法

作为一种替代方案,可以创建 udf 来获取列的值。下面的示例显示如何创建 udf,接着将其分配给 F.map_values,然后使用它。

from snowflake.snowpark import functions as F
from snowflake.snowpark.types import ArrayType, MapType

map_values_udf=None

def map_values(map):
    global map_values_udf
    if not map_values_udf:
        def _map_values(map: dict)->list:
            return list(map.values())
        map_values_udf = F.udf(_map_values,return_type=ArrayType(),input_types=[MapType()],name="map_values",is_permanent=False,replace=True)
    return map_values_udf(map)

F.map_values = map_values

df.select(map_values(colDict))
Copy

其他建议

SPRKPY1046

警告

Spark Conversion Core 版本 2.1.22 起,此问题代码已 弃用

消息:pyspark.sql.functions.monotonically_increasing_id 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.functions.monotonically_increasing_id (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 monotonically_increasing_id 方法。

from pyspark.sql import functions as F

spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()
Copy

输出

该工具添加了 EWI SPRKPY1046,提示可以实施替代方案。

from pyspark.sql import functions as F
#EWI: SPRKPY1046 => pyspark.sql.functions.monotonically_increasing_id has a workaround, see documentation for more info
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()
Copy

推荐修复方法

更新工具版本。

其他建议

SPRKPY1047

警告

Spark Conversion Core 版本 4.6.0 起,此问题代码已 弃用

描述

当该工具检测到使用了 pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html?highlight=pyspark%20context%20sparkcontext%20setloglevel#pyspark.SparkContext.setLogLevel) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下是使用方法 setLogLevel 的示例。

sparkSession.sparkContext.setLogLevel("WARN")
Copy

输出

该工具添加了 EWI SPRKPY1047,提示可以实施替代方案。

#EWI: SPRKPY1047 => pyspark.context.SparkContext.setLogLevel has a workaround, see documentation for more info
sparkSession.sparkContext.setLogLevel("WARN")
Copy

推荐修复方法

将使用的 setLogLevel 函数替换为 logging.basicConfig,后者为简单日志记录提供了一系列便捷函数。为了使用它,需要导入“logging”和“sys”这两个模块,并且应该使用“级别等效选项表”替换级别常量:

import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
Copy
  • 级别等效选项表

级别源参数

级别目标参数

"ALL"

此参数没有等效选项

"DEBUG"

logging.DEBUG

"ERROR"

logging.ERROR

"FATAL"

logging.CRITICAL

"INFO"

logging.INFO

"OFF"

logging.NOTSET

"TRACE"

此参数没有等效选项

"WARN"

logging.WARNING

其他建议

SPRKPY1048

警告

Spark Conversion Core 版本 2.4.0 起,此问题代码已 弃用

消息:pyspark.sql.session.SparkSession.conf 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.session.SparkSession.conf (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.conf.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下是如何将配置设置为属性 conf 的示例。

spark.conf.set("spark.sql.crossJoin.enabled", "true")
Copy

输出

该工具添加了 EWI SPRKPY1048,提示可以实施替代方案。

#EWI: SPRKPY1048 => pyspark.sql.session.SparkSession.conf has a workaround, see documentation for more info
spark.conf.set("spark.sql.crossJoin.enabled", "true")
Copy

推荐修复方法

SparkSession.conf 用于传递仅 Pyspark 使用的特定设置,这些设置不适用于 Snowpark。可以在代码上删除或添加注释

#spark.conf.set("spark.sql.crossJoin.enabled", "true")
Copy

其他建议

SPRKPY1049

警告

Spark Conversion Core 版本 2.1.9 起,此问题代码已 弃用

消息:pyspark.sql.session.SparkSession.sparkContext 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.session.SparkSession.sparkContext (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sparkContext.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下是创建 Spark 会话,然后使用 SparkContext 属性显示 appName 的示例。

print("APP Name :"+spark.sparkContext.appName())
Copy

输出

该工具添加了 EWI SPRKPY1049,提示可以实施替代方案。

#EWI: SPRKPY1049 => pyspark.sql.session.SparkSession.sparkContext has a workaround, see documentation for more info
print("APP Name :"+spark.sparkContext.appName())
Copy

推荐修复方法

SnowPark 不支持 SparkContext,但可以直接从 Session 实例访问 SparkContext 中的方法和属性。

## Pyspark
print("APP Name :"+spark.sparkContext.appName())
can be used in SnowPark removing the sparkContext as:
#Manual adjustment in SnowPark
print("APP Name :"+spark.appName());
Copy

其他建议

SPRKPY1050

消息:pyspark.conf.SparkConf.set 有替代方案。

类别:警告。

描述

当该工具检测到使用了 pyspark.conf.SparkConf.set (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.set.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下是使用 conf.set 设置变量的示例。

conf = SparkConf().setAppName('my_app')

conf.set("spark.storage.memoryFraction", "0.5")
Copy

输出

该工具添加了 EWI SPRKPY1050,提示可以实施替代方案。

conf = SparkConf().setAppName('my_app')

#EWI: SPRKPY1050 => pyspark.conf.SparkConf.set has a workaround, see documentation for more info
conf.set("spark.storage.memoryFraction", "0.5")
Copy

推荐修复方法

SparkConf.set 用于设置仅由 Pyspark 使用的配置设置,不适用于 Snowpark。可以在代码上删除或添加注释

#conf.set("spark.storage.memoryFraction", "0.5")
Copy

其他建议

SPRKPY1051

警告

Spark Conversion Core 版本 2.4.0 起,此问题代码已 弃用

消息:pyspark.sql.session.SparkSession.Builder.master 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.session.SparkSession.Builder.master (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.master.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例展示了如何使用 builder.master 方法,将 Spark Master URL 设置为使用单核模式连接到本地位置。

spark = SparkSession.builder.master("local[1]")
Copy

输出

该工具添加了 EWI SPRKPY1051,提示可以实施替代方案。

#EWI: SPRKPY1051 => pyspark.sql.session.SparkSession.Builder.master has a workaround, see documentation for more info
spark = Session.builder.master("local[1]")
Copy

推荐修复方法

pyspark.sql.session.SparkSession.Builder.master 用于设置 Spark 集群。Snowpark 不使用 Spark 集群,因此可为该代码删除或注释。

## spark = Session.builder.master("local[1]")
Copy

其他建议

SPRKPY1052

警告

Spark Conversion Core 版本 2.8.0 起,此问题代码已 弃用

消息:pyspark.sql.session.SparkSession.Builder.enableHiveSupport 有替代方案

类别:警告。

描述

当该工具检测到使用了 pyspark.sql.session.SparkSession.Builder.enableHiveSupport (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.enableHiveSupport.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

以下示例使用 enableHiveSupport 方法配置 SparkSession 并启用 Hive 支持。

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()
Copy

输出

该工具添加了 EWI SPRKPY1052,提示可以实施替代方案。

#EWI: SPRKPY1052 => pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround, see documentation for more info
spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()
Copy

推荐修复方法

移除 enableHiveSupport 函数的使用,因为在 Snowpark 中不需要该函数。

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .getOrCreate()
Copy

其他建议

SPRKPY1053

消息:提取 dbc 文件时出错。

类别:警告。

描述

当无法提取 dbc 文件时,就会出现此问题。此警告可能是由以下一个或多个原因引起的:文件过大、无法访问、只读等。

其他建议

  • 作为一种替代方案,如果文件太大无法处理,则可以检查文件的大小。此外,分析该工具是否可以访问文件,以免出现任何访问问题。

  • 如需更多支持,可以发送电子邮件至 snowconvert-info@snowflake.com 联系我们。如果您与 Snowflake 签订了支持合同,请联系您的销售工程师,他们可以指导您的支持需求。

SPRKPY1080

消息:SparkContext 的值被替换为“session”变量。

类别:警告

描述

Spark 上下文存储在一个名为“session”的变量中,该变量创建 Snowpark 会话。

场景

输入

以下代码段描述了 SparkContext

## Input Code
from pyspark import SparkContext
from pyspark.sql import SparkSession

def example1():

    sc = SparkContext("local[*]", "TestApp")

    sc.setLogLevel("ALL")
    sc.setLogLevel("DEBUG")
Copy

输出

在此输出代码中,SMA 已将 PySpark.SparkContext 替换为 SparkSession,请注意,SMA 还在“connection.json”文件中添加了一个模板来替换连接,然后在 connection_parameter 变量上加载此配置。

## Output Code
import logging
import sys
import json
from snowflake.snowpark import Session
from snowflake.snowpark import Session

def example1():
    jsonFile = open("connection.json")
    connection_parameter = json.load(jsonFile)
    jsonFile.close()
    #EWI: SPRKPY1080 => The value of SparkContext is replaced with 'session' variable.
    sc = Session.builder.configs(connection_parameter).getOrCreate()
    sc.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
    logging.basicConfig(stream = sys.stdout, level = logging.NOTSET)
    logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)
Copy

推荐修复方法

必须使用所需的连接信息更新配置文件“connection.json”:

{
  "user": "my_user",
  "password": "my_password",
  "account": "my_account",
  "role": "my_role",
  "warehouse": "my_warehouse",
  "database": "my_database",
  "schema": "my_schema"
}
Copy

其他建议

SPRKPY1054

消息:pyspark.sql.readwriter.DataFrameReader.format 不受支持。

类别:警告。

描述

当 pyspark.sql.readwriter.DataFrameReader.format (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.format.html) 具有 Snowpark 不支持的实参时,就会出现此问题。

场景

根据您尝试加载的格式类型,分为几种情况。它可以是 supportednon-supported 格式。

场景 1

输入

该工具会分析尝试加载的格式类型,支持的格式有:

  • Csv

  • JSON

  • Parquet

  • Orc

以下示例显示了该工具在传递 csv 值时如何转换 format 方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.read.format('csv').load('/path/to/file')
Copy

输出

该工具将 format 方法转换为 Csv 方法调用。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df1 = spark.read.csv('/path/to/file')
Copy

推荐修复方法

在此示例中,该工具不显示 EWI,这意味着无需修复。

场景 2

输入

以下示例显示了该工具在传递 Jdbc 值时如何转换 format 方法。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
Copy

输出

该工具显示 EWI SPRKPY1054,提示不支持“jdbc”值。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "jdbc" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
Copy

推荐修复方法

对于 not supported 场景,没有具体的修复方法,因为它取决于尝试读取的文件。

场景 3

输入

以下示例显示了该工具在传递 CSV 时如何转换 format 方法,但改用变量。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
df3 = spark.read.format(myFormat).load('/path/to/file')
Copy

输出

由于该工具无法在运行时确定变量的值,因此会显示 EWI SPRKPY1054,提示不支持该值。

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df3 = spark.read.format(myFormat).load('/path/to/file')
Copy

推荐修复方法

替代方案是,您可以检查变量的值并将其作为字符串添加到 format 调用中。

其他建议

SPRKPY1055

消息:pyspark.sql.readwriter.DataFrameReader.option 键值不受支持。

类别:警告。

描述

当 SnowFlake 不支持 pyspark.sql.readwriter.DataFrameReader.option 键值时,就会出现此问题。

该工具会分析 option 调用参数,并根据方法(CSV、JSON 或 PARQUET)判断键值在 Snowpark 中是否存在等效选项。如果所有参数均有等效选项,则该工具不会添加 EWI,并将键值替换为其等效选项;反之,工具将添加 EWI。

等效选项列表:

  • CSV 的等效选项:

Spark 选项键

Snowpark 等效选项

sep

FIELD_DELIMITER

header

PARSE_HEADER

lineSep

RECORD_DELIMITER

pathGlobFilter

PATTERN

quote

FIELD_OPTIONALLY_ENCLOSED_BY

nullValue

NULL_IF

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

inferSchema

INFER_SCHEMA

delimiter

FIELD_DELIMITER

  • JSON 的等效选项:

Spark 选项键

Snowpark 等效选项

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

pathGlobFilter

PATTERN

  • PARQUET 的等效选项:

Spark 选项键

Snowpark 等效选项

pathGlobFilter

PATTERN

其他未在上述表格中列出的关键选项,Snowpark 既不支持,也不存在等效选项。如果是这样,该工具会添加参数信息的 EWI 并将其从链中移除。

场景

以下场景适用于 CSV、JSON 和 PARQUET。

分为几种情况,具体视在 option 方法中所使用键的值而定。

场景 1

输入

以下是使用 equivalent key 进行 option 调用的示例。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("header", True).csv(csv_file_path)

## Json example:
spark.read.option("dateFormat", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("pathGlobFilter", "*.parquet").parquet(parquet_file_path)
Copy

输出

该工具使用正确的等效选项转换键。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
spark.read.option("PARSE_HEADER", True).csv(csv_file_path)

## Json example:
spark.read.option("DATE_FORMAT", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("PATTERN", "*.parquet").parquet(parquet_file_path)
Copy

推荐修复方法

由于该工具会转换键的值,因此没有必要的修复。

场景 2

输入

以下是使用 non-equivalent key 进行 option 调用的示例。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("anotherKeyValue", "myVal").csv(csv_file_path)

## Json example:
spark.read.option("anotherKeyValue", "myVal").json(json_file_path)

## Parquet example:
spark.read.option("anotherKeyValue", "myVal").parquet(parquet_file_path)
Copy

输出

该工具添加了 EWI SPRKPY1055,提示键不受支持,并移除了 option 调用。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.csv(csv_file_path)

## Json example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.json(json_file_path)

## Parquet example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.parquet(parquet_file_path)
Copy

推荐修复方法

建议在转换后检查行为。

其他建议

  • 当存在非等效参数时,建议检查转换后的行为。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1056

警告

此问题代码已 弃用

消息:pyspark.sql.readwriter.DataFrameReader.option 实参 <argument_name> 不是字面量,无法进行求值

类别:警告

描述

当 pyspark.sql.readwriter.DataFrameReader.option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html) 函数的实参键或值不是字面量值(例如是变量)时,就会出现此问题。SMA 会对源代码进行静态分析,因此无法对实参的内容进行求值。

场景

输入

以下示例使用 pyspark.sql.readwriter.DataFrameReader.option 函数生成了此 EWI。

my_value = ...
my_option = ...

df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1056 ,以提示此函数的实参不是字面量值,因此 SMA 无法对其进行求值。

my_value = ...
my_option = ...

#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument "dateFormat" is not a literal and can't be evaluated
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument key is not a literal and can't be evaluated
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')
Copy

推荐修复方法

尽管 SMA 无法对此实参求值,但这并不意味着 Snowpark 不支持该实参。请查看 文档,确保实参的值在 Snowpark 中有效且等效。

其他建议

SPRKPY1057

警告

Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用

消息:PySpark Dataframe Option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html#pyspark.sql.DataFrameReader.option) 实参包含不是字面量的值,因此无法求值

类别:警告。

描述

此问题代码已弃用。如果您使用的是旧版本,请升级到最新版本。

其他建议

SPRKPY1058

消息:带 < key > 平台特定键的 < method > 不受支持。

类别:ConversionError

描述

pyspark.sql.conf.RuntimeConfig (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.conf.RuntimeConfig.html#pyspark.sql.conf.RuntimeConfig) 中带平台特定键的 getset 方法不受支持。

场景

并非所有 getset 方法的使用都会在输出代码中生成 EWI。当工具检测到使用了带平台特定键的这些方法(不支持此用法)时,就会出现此 EWI。

场景 1

输入

以下是 Snowpark 中具有受支持键的 getset 方法的示例。

session.conf.set("use_constant_subquery_alias", False)
spark.conf.set("sql_simplifier_enabled", True)

session.conf.get("use_constant_subquery_alias")
session.conf.get("use_constant_subquery_alias")
Copy

输出

由于 Snowpark 支持键,因此该工具不会在输出代码中添加 EWI。

session.conf.set("use_constant_subquery_alias", True)
session.conf.set("sql_simplifier_enabled", False)

session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")
Copy

推荐修复方法

对于此场景,没有建议的修复方法。

场景 2

输入

以下是使用不支持的键的示例。

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

session.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.yarn.am.memory", "1g")

session.conf.get("spark.sql.shuffle.partitions")
session = spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
Copy

输出

该工具在输出代码中添加了 EWI SPRKPY1058,以提示带平台特定键的这些方法不受支持。

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.set("spark.sql.shuffle.partitions", "50")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.set("spark.yarn.am.memory", "1g")

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.get("spark.sql.shuffle.partitions")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
Copy

推荐修复方法

建议的修复方法是移除这些方法。

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
Copy

其他建议

SPRKPY1059

警告

Spark Conversion Core 版本 2.45.1 起,此问题代码已 弃用

消息:pyspark.storagelevel.StorageLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) 有替代方案,请参阅文档。

类别:警告

描述

目前,在 Snowpark 中不需要使用 StorageLevel,因为 Snowflake 控制存储。有关更多信息,可以参阅 EWI SPRKPY1072

其他建议

SPRKPY1060

消息:身份验证机制是 connection.json(提供了模板)。

类别:警告。

描述

当该工具检测到使用了 pyspark.conf.SparkConf (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html) 时,就会出现此问题。

场景

输入

由于 Snowpark 中的身份验证机制不同,该工具删除了相关使用,改为创建 连接配置文件 (connection.json)

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)
Copy

输出

该工具添加了 EWI SPRKPY1060,提示身份验证机制不相同。

#EWI: SPRKPY1002 => pyspark.conf.SparkConf is not supported
#EWI: SPRKPY1060 => The authentication mechanism is connection.json (template provided).
#my_conf = Session.builder.configs(connection_parameter).getOrCreate()

my_conf = None
Copy

推荐修复方法

要创建连接,必须在 connection.json 文件中填写信息。

{
  "user": "<USER>",
  "password": "<PASSWORD>",
  "account": "<ACCOUNT>",
  "role": "<ROLE>",
  "warehouse": "<WAREHOUSE>",
  "database": "<DATABASE>",
  "schema": "<SCHEMA>"
}
Copy

其他建议

SPRKPY1061

消息:Snowpark 不支持 unix_timestamp 函数

类别:警告

描述

在 Snowpark 中,第一个参数为必需;当该工具检测到使用了不带参数的 pyspark.sql.functions.unix_timestamp (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.unix_timestamp.html) 时,就会出现问题。

场景

输入

以下是不带参数调用 unix_timestamp 方法的示例。

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
df.select(unix_timestamp()).show()
Copy

输出

以下函数的 Snowpark 签名:unix_timestamp(e: ColumnOrName, fmt:Optional["Column"] = None),您可以注意到第一个参数是必填的。

该工具添加了 EWI SPRKPY1061,以提示不带参数调用函数 unix_timestamp 在 Snowpark 中不受支持。

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
#EWI: SPRKPY1061 => Snowpark does not support unix_timestamp functions with no parameters. See documentation for more info.
df.select(unix_timestamp()).show()
Copy

推荐修复方法

作为一种替代方案,可以至少添加 timestamp 字符串的名称或列。

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ["dt", "val"])
df.select(unix_timestamp("dt")).show()
Copy

其他建议

SPRKPY1062

消息:Snowpark 不支持不带“values”参数的 GroupedData.pivot。

类别:警告

描述

当 SMA 检测到使用了 pyspark.sql.group.GroupedData.pivot (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.pivot.html) 函数但未指定“values”参数 (即需要透视的值列表) 时,就会出现此问题。

目前,Snowpark Python pivot 函数要求明确指定要透视的不同值列表。

场景

场景 1

输入

SMA 检测到与模式 dataFrame.groupBy("columnX").pivot("columnY") 相匹配的表达式,并且该 pivot 函数没有 values 参数。

df.groupBy("date").pivot("category").sum("amount")
Copy

输出

SMA 添加了一条 EWI 消息,提示不支持没有“values”参数的 pivot 函数。

此外,它将在 pivot 函数中添加第二个参数,即列表推导式,用于计算将转换为列的值列表。请记住,此操作对于大型数据集效率不高,建议明确指明值。

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df.groupBy("date").pivot("category", [v[0] for v in df.select("category").distinct().limit(10000).collect()]]).sum("amount")
Copy

推荐修复方法

对于这种情况,SMA 在 pivot 函数中添加第二个参数,即列表推导式,用于计算将转换为列的值列表,但可以根据不同的值列表进行转换,如下所示:

df = spark.createDataFrame([
      Row(category="Client_ID", date=2012, amount=10000),
      Row(category="Client_name",   date=2012, amount=20000)
  ])

df.groupBy("date").pivot("category", ["dotNET", "Java"]).sum("amount")
Copy
场景 2

输入

SMA 未能检测到与模式 dataFrame.groupBy("columnX").pivot("columnY") 相匹配的表达式,并且该 pivot 函数没有 values 参数。

df1.union(df2).groupBy("date").pivot("category").sum("amount")
Copy

输出

SMA 添加了一条 EWI 消息,提示不支持没有“values”参数的 pivot 函数。

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df1.union(df2).groupBy("date").pivot("category").sum("amount")
Copy

推荐修复方法

添加要透视的唯一值列表,具体操作如下:

df = spark.createDataFrame([
      Row(course="dotNET", year=2012, earnings=10000),
      Row(course="Java",   year=2012, earnings=20000)
  ])

df.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()
Copy

其他建议

  • 对于大型数据集,计算要透视的唯一值列表并不是一项有效的操作,可能会成为阻塞性调用。请考虑明确指定要用于透视的唯一值列表。

  • 如果您不想明确指定要透视的唯一值列表(不建议这样做),可添加以下代码作为 pivot 函数的第二个实参,以便在运行时推断这些值*

[v[0] for v in <df>.select(<column>).distinct().limit(<count>).collect()]]
Copy

使用相应的 DataFrame、要透视的列和要选择的行数 *替换 <df>

SPRKPY1063

消息:pyspark.sql.pandas.functions.pandas_udf 有替代方案。

类别:警告

描述

当该工具检测到使用了 pyspark.sql.pandas.functions.pandas_udf (https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.pandas_udf.html) 时,就会出现此问题;此问题有替代方案。

场景

输入

使用 pandas_udf 函数创建可处理大量数据的用户定义的函数。

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)
Copy

输出

SMA 添加了一条 EWI 消息,提示 pandas_udf 有替代方案。

#EWI: SPRKPY1062 => pyspark.sql.pandas.functions.pandas_udf has a workaround, see documentation for more info
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df)
Copy

推荐修复方法

将参数类型明确指定为新参数 input_types 并移除 functionType 参数(如适用)。创建的函数必须在 select 语句中调用。

@pandas_udf(
    return_type = schema,
    input_types = [PandasDataFrameType([IntegerType(), IntegerType()])]
)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df) # You must modify function call to be a select and not an apply
Copy

其他建议

SPRKPY1064

消息:Spark 元素 不适用,因为 Snowflake 改用 Snowpipe 机制。

类别:警告

描述

当该工具检测到使用了 pyspark.streaming 库中的任何元素时,就会出现此问题:

  • pyspark.streaming.DStream (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.DStream.html)

  • pyspark.streaming.StreamingContext (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.html)

  • pyspark.streaming.listener.StreamingListener。

场景

输入

以下是触发此 EWI 的元素之一的示例。

from pyspark.streaming.listener import StreamingListener

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1064,以提示此函数不适用。

#EWI: SPRKPY1064 => The element does not apply since snowflake uses snowpipe mechanism instead.

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()
Copy

推荐修复方法

SMA 移除了 import 语句并将问题添加到 Issues.csv 清单中,移除了使用的所有 Spark 元素。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()
Copy

其他建议

SPRKPY1065

消息:pyspark.context.SparkContext.broadcast 不适用,因为 Snowflake 使用数据聚类机制来计算数据。

类别:警告

描述

当该工具检测到使用了元素 pyspark.context.SparkContext.broadcast (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.broadcast.html) 时,就会出现此问题;因为使用了 Snowflake 的 数据群集,因此没有必要使用该元素。

输入代码

在此示例中,创建了一个广播变量,该变量能够实现数据在所有节点间的高效共享。

sc = SparkContext(conf=conf_spark)

mapping = {1: 10001, 2: 10002}

bc = sc.broadcast(mapping)
Copy

输出代码

SMA 添加了一条 EWI 消息,提示不需要进行广播。

sc = conf_spark

mapping = {1: 10001, 2: 10002}
#EWI: SPRKPY1065 => The element does not apply since snowflake use data-clustering mechanism to compute the data.

bc = sc.broadcast(mapping)
Copy

推荐修复方法

移除所有使用的 pyspark.context.SparkContext.broadcast。

sc = conf_spark

mapping = {1: 10001, 2: 10002}
Copy

其他建议

SPRKPY1066

消息:Spark 元素不适用,因为 Snowflake 使用的微分区机制是自动创建的。

类别:警告

描述

当该工具检测到使用了与分区相关的元素时,就会出现此问题:

  • pyspark.sql.catalog.Catalog.recoverPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.recoverPartitions.html)

  • pyspark.sql.dataframe.DataFrame.foreachPartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html)

  • pyspark.sql.dataframe.DataFrame.sortWithinPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sortWithinPartitions.html)

  • pyspark.sql.functions.spark_partition_id (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.spark_partition_id.html)

由于使用了 Snowflake 的 微分区,这些元素不适用。

输入代码

在此示例 sortWithinPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sortWithinPartitions.html) 中,它用于在 DataFrame 中创建按指定列排序的分区。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.sortWithinPartitions("age", ascending=False)
Copy

输出代码

SMA 添加了一条 EWI 消息,提示 Spark 元素并非必需。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
#EWI: SPRKPY1066 => The element does not apply since snowflake use micro-partitioning mechanism are created automatically.
df.sortWithinPartitions("age", ascending=False)
Copy

推荐修复方法

移除对该元素的使用。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
Copy

其他建议

SPRKPY1067

消息:pyspark.sql.functions.split 包含在 Snowpark 中不受支持的参数。

类别:警告

描述

当该工具检测到使用了 pyspark.sql.functions.split (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html) 并传递了超过两个参数,或以正则表达式模式作为参数时,就会出现此问题;这两种使用方式均不受支持。

场景

场景 1

输入代码

在此示例中,split 函数有两个以上的参数。

df.select(split(columnName, ",", 5))
Copy

输出代码

该工具在输出代码中添加了此 EWI,以提示当此函数具有两个以上的参数时,不支持此函数。

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, ",", 5))
Copy

推荐修复方法

请确保 split 函数仅使用两个参数。

df.select(split(columnName, ","))
Copy
场景 2

输入代码

在此示例中,split 函数使用正则表达式模式作为参数。

df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))
Copy

输出代码

该工具在输出代码中添加了此 EWI,以提示当它使用正则表达式模式作为参数时,不支持此函数。

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))
Copy

推荐修复方法

functions.split(str: ColumnOrName, pattern: str, limit: int = - 1) 方法的 Spark 签名与 Snowpark 中的以下方法:functions.split(str:Union[Column, str], pattern:Union[Column, str]) 不完全相符,因此,使用正则表达式的场景目前没有建议的修复方法。

其他建议

SPRKPY1068

消息:toPandas 包含类型为不受支持的 ArrayType 的列,并且有替代方案。

类别:警告

描述

如果存在 ArrayType 类型的列,pyspark.sql.DataFrame.toPandas (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html) 无法正常工作。这些情况的替代方案是使用 json.loads 方法将这些列转换为 Python 字典。

场景

输入

ToPandas 以 Pandas DataFrame 的形式返回原始 DataFrame 数据。

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])

pandasDF = sparkDF.toPandas()
Copy

输出

如果有类型为 ArrayType 的列,该工具会添加此 EWI,以提示 toPandas 不受支持,但有替代方案。

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
#EWI: SPRKPY1068 => toPandas doesn't work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method. example: df[colName] = json.loads(df[colName]).
pandasDF = sparkDF.toPandas()
Copy

推荐修复方法

pandas_df = sparkDF.toPandas()

## check/convert all resulting fields from calling toPandas when they are of
## type ArrayType,
## they will be reasigned by converting them into a Python Dictionary
## using json.loads method​

for field in pandas_df.schema.fields:
    if isinstance(field.datatype, ArrayType):
        pandas_df[field.name] = pandas_df[field.name].apply(lambda x: json.loads(x) if x is not None else x)
Copy

其他建议

SPRKPY1069

消息:如果 partitionBy 参数是列表,则 Snowpark 会抛出错误。

类别:警告

描述

当使用 pyspark.sql.readwriter.DataFrameWriter.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.parquet.html) 方法且涉及参数 partitionBy 时,该工具会显示 EWI。

这是因为在 Snowpark 中,DataFrameWriter.parquet 仅支持 ColumnOrSqlExpr 作为 partitionBy 参数。

场景

场景 1

输入代码:

在此场景中,partitionBy 参数不是列表。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy="age")
Copy

输出代码:

该工具添加了 EWI SPRKPY1069,以提示如果参数是列表,在 Snowpark 会抛出错误。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = "age", format_type_options = dict(compression = "None"))
Copy

推荐修复方法

此场景没有建议的修复方法,因为该工具总会预先添加此 EWI,以应对 partitionBy 参数是列表的情况。请记住,在 Snowpark 中,只接受使用 Snowflake 暂存区 的云位置。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
Session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()
Session.file.put(f"file:///path/to/data/file.parquet", f"@{stage}")

df.write.parquet(stage, partition_by = "age", format_type_options = dict(compression = "None"))
Copy
场景 2

输入代码:

在此场景中,partitionBy 参数是列表。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy=["age", "name"])
Copy

输出代码:

该工具添加了 EWI SPRKPY1069,以提示如果参数是列表,在 Snowpark 会抛出错误。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = ["age", "name"], format_type_options = dict(compression = "None"))
Copy

推荐修复方法

如果参数的值是 list,则将其替换为 ColumnOrSqlExpr

df.write.parquet(file_path, partition_by = sql_expr("age || name"), format_type_options = dict(compression = "None"))
Copy

其他建议

SPRKPY1070

消息:mode 实参被转换为 overwrite,检查变量值并设置相应的布尔值。

类别:警告

描述

当使用了以下各项时:

  • pyspark.sql.readwriter.DataFrameWriter.csv (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.csv.html)

  • pyspark.sql.readwriter.DataFrameWriter.json (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.json.html)

  • pyspark.sql.readwriter.DataFrameWriter.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.json.html)

该工具会分析参数 mode 以确定该值是否为 overwrite

场景

场景 1

输入代码

对于此场景,该工具检测到 mode 参数可以设置相应的布尔值。

df.write.csv(file_path, mode="overwrite")
Copy

输出代码:

SMA 工具会分析 mode 参数,确定该值为 overwrite,并设置相应的布尔值

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)
Copy

推荐修复方法

对于此场景,没有建议的修复方法,因为该工具执行了相应的转换。

场景 2:

输入代码

在此场景中,该工具无法验证该值是否为 overwrite

df.write.csv(file_path, mode=myVal)
Copy

输出代码:

SMA 添加了一条 EWI 消息,提示 mode 参数已转换为“overwrite”,但它也是为了提示最好检查变量值并设置正确的布尔值。

#EWI: SPRKPY1070 => The 'mode' argument is transformed to 'overwrite', check the variable value and set the corresponding bool value.
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = myVal)
Copy

推荐修复方法

检查参数 mode 的值并为参数 overwrite 添加正确的值。

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)
Copy

其他建议

SPRKPY1071

消息:在 Snowpark 中,pyspark.rdd.RDD.getNumPartitions 函数并非必需。因此,应该移除所有引用。

类别:警告

描述

当该工具发现使用了 pyspark.rdd.RDD.getNumPartitions (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.getNumPartitions.html) 函数时,就会出现此问题。Snowflake 使用微分区机制,因此使用此函数并非必需。

场景

输入

getNumPartitions 返回 RDD 上的分区数量。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

print(df.getNumPartitions())
Copy

输出

该工具添加了此 EWI,以提示 getNumPartitions 并非必需。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
#EWI: SPRKPY1071 => The getNumPartitions are not required in Snowpark. So, you should remove all references.

print(df.getNumPartitions())
Copy

推荐修复方法

移除对此函数的所有使用。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
Copy

其他建议

SPRKPY1072

消息:在 Snowpark 中,并非必须使用 StorageLevel。

类别:警告。

描述

当该工具发现使用了 StorageLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) 类时,就会出现此问题;该类的作用类似于使用“flags”来设置存储级别。由于 Snowflake 控制存储,因此并非必须使用此函数。

其他建议

SPRKPY1073

消息:不支持不带参数或返回类型参数的 pyspark.sql.functions.udf

类别:警告。

描述

此问题会在以下情况下出现:当该工具检测到使用了 pyspark.sql.functions.udf (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html) 作为函数或装饰器,但在它没有参数或返回类型参数这两种特定情况下,不支持这种用法。

场景

场景 1

输入

在 Pyspark 中,可以创建不带输入或返回类型参数的用户定义的函数:

from pyspark.sql import SparkSession, DataFrameStatFunctions
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
Copy

输出

Snowpark 需要 Udf 函数的输入和返回类型。因为未提供这些参数,而且 SMA 不能缺少此参数。

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf function without the return type parameter is not supported. See documentation for more info.
my_udf = udf(lambda s: len(s))

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
Copy

推荐修复方法

要修复这种情况,需要为输入和输出的返回类型添加导入,然后在 udf 函数 _my_udf* 上添加 return*type 和 input_types[] 的参数。

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s), return_type=IntegerType(), input_types=[StringType()])

df.with_column("result", my_udf(df.Value)).show()
Copy
场景 2

在 PySpark 中,可以使用不带参数的 @udf 装饰器

输入

from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf()
def my_udf(str):
    return len(str)


df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
Copy

输出

在 Snowpark 中,udf 装饰器的所有参数都是必需的。

from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

#EWI: SPRKPY1073 => pyspark.sql.functions.udf decorator without parameters is not supported. See documentation for more info.

@udf()
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
Copy

推荐修复方法

要修复这种情况,需要为输入和输出的返回类型添加导入,然后在 udf @udf 装饰器上添加 return_type 和 input_types[] 的参数。

from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf(return_type=IntegerType(), input_types=[StringType()])
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
Copy

其他建议

SPRKPY1074

消息:文件含有混合缩进(空格和制表符)。

类别:解析错误。

描述

当该工具检测到文件含有混合缩进时,就会出现此问题。这意味着,文件混合使用空格和制表符来缩进代码行。

场景

输入

在 Pyspark 中,可以混合使用空格和制表符来表示标识级别。

def foo():
    x = 5 # spaces
    y = 6 # tab
Copy

输出

SMA 无法处理混合使用的缩进标记。当在 Python 代码文件中检测到这种情况时,SMA 会在第一行添加 EWI SPRKPY1074。

## EWI: SPRKPY1074 => File has mixed indentation (spaces and tabs).
## This file was not converted, so it is expected to still have references to the Spark API
def foo():
    x = 5 # spaces
    y = 6 # tabs
Copy

推荐修复方法

解决方案是将所有缩进符号统一为相同格式。

def foo():
  x = 5 # tab
  y = 6 # tab
Copy

其他建议

  • 有用的缩进工具有 PEP-8 (https://peps.python.org/pep-0008/) 和 Reindent (https://pypi.org/project/reindent/)。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1075

类别

警告。

描述

parse_json 不应用架构验证,如果需要根据架构进行筛选/验证,则可能需要引入一些逻辑。

示例

输入

df.select(from_json(df.value, Schema))
df.select(from_json(schema=Schema, col=df.value))
df.select(from_json(df.value, Schema, option))
Copy

输出

#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
Copy

对于 from_json 函数,传递架构实际上并不是用于推断,而是用于验证。看看下面的示例:

data = [
    ('{"name": "John", "age": 30, "city": "New York"}',),
    ('{"name": "Jane", "age": "25", "city": "San Francisco"}',)
]

df = spark.createDataFrame(data, ["json_str"])
Copy

示例 1:强制使用数据类型和更改列名:

## Parse JSON column with schema
parsed_df = df.withColumn("parsed_json", from_json(col("json_str"), schema))

parsed_df.show(truncate=False)

## +------------------------------------------------------+---------------------------+
## |json_str                                              |parsed_json                |
## +------------------------------------------------------+---------------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, 30, New York}       |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, null, San Francisco}|
## +------------------------------------------------------+---------------------------+
## notice that values outside of the schema were dropped and columns not matched are returned as null
Copy

示例 2:选择特定列:

## Define a schema with only the columns we want to use
partial_schema = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True)
])

## Parse JSON column with partial schema
partial_df = df.withColumn("parsed_json", from_json(col("json_str"), partial_schema))

partial_df.show(truncate=False)

## +------------------------------------------------------+---------------------+
## |json_str                                              |parsed_json          |
## +------------------------------------------------------+---------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, New York}     |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, San Francisco}|
## +------------------------------------------------------+---------------------+
## there is also an automatic filtering
Copy

建议

  • 如需更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们。如果您与 Snowflake 签订了支持合同,请联系您的销售工程师,他们可以指导您的支持需求。

  • 有用的工具 PEP-8 (https://peps.python.org/pep-0008/) and Reindent (https://pypi.org/project/reindent/)。

SPRKPY1076

消息:pyspark.sql.readwriter.DataFrameReader (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html) 方法中的参数不受支持。这适用于 CSV、JSON 和 PARQUET 方法。

类别:警告。

描述

对于 pyspark.sql.readwriter.DataFrameReader (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html) 对象上的 CSV、JSON 和 PARQUET 方法,该工具将分析参数并根据每种情况添加转换:

  • 所有参数均与其在 Snowpark 中的等效选项名称匹配:在这种情况下,该工具会将参数转换为 .option() 调用。在这种情况下,参数不会添加此 EWI。

  • 部分参数与其在 Snowpark 中的等效选项不匹配:在这种情况下,工具将添加包含该参数信息的 EWI,并将其从方法调用中移除。

等效选项列表:

  • CSV 的等效选项:

Spark 键

Snowpark 等效选项

sep

FIELD_DELIMITER

header

PARSE_HEADER

lineSep

RECORD_DELIMITER

pathGlobFilter

PATTERN

quote

FIELD_OPTIONALLY_ENCLOSED_BY

nullValue

NULL_IF

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

inferSchema

INFER_SCHEMA

delimiter

FIELD_DELIMITER

  • JSON 的等效选项:

Spark 键

Snowpark 等效选项

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

pathGlobFilter

PATTERN

  • PARQUET 的等效选项:

Spark 键

Snowpark 等效选项

pathGlobFilter

PATTERN

场景

场景 1

输入

对于 CVS,以下是一些示例:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.csv("path3", None,None,None,None,None,None,True).show()
Copy

输出

在转换后的代码中,参数作为单个选项添加到 cvs 函数中

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the csv function, the supported ones will be added into a option method.
spark.read.option("FIELD_DELIMITER", None).option("PARSE_HEADER", True).option("FIELD_OPTIONALLY_ENCLOSED_BY", None).csv("path3").show()
Copy

场景 2

输入

对于 JSON,以下是一些示例:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.json("/myPath/jsonFile/", dateFormat='YYYY/MM/DD').show()
Copy

输出

在转换后的代码中,参数作为单个选项添加到 json 函数中

from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the json function, the supported ones will be added into a option method.

spark.read.option("DATE_FORMAT", 'YYYY/MM/DD').json("/myPath/jsonFile/").show()
Copy
场景 3

输入

对于 PARQUET,以下是一些示例:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.parquet("/path/to/my/file.parquet", pathGlobFilter="*.parquet").show()
Copy

输出

在转换后的代码中,参数作为单个选项添加到 parquet 函数中

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => The parquet function require adjustments, in Snowpark the parquet files needs to be located in an stage. See the documentation for more info.

spark.read.option("PATTERN", "*.parquet").parquet("/path/to/my/file.parquet")
Copy

其他建议

  • 当存在非等效参数时,建议检查转换后的行为。

  • 此外,建议参考相关文档以找到更合适的实现方案:

    • CSV 的选项文档:

      • PySpark CSV Options (https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option)。

      • Snowpark CSV Options

    • JSON 的选项文档:

      • PySpark JSON Options (https://spark.apache.org/docs/latest/sql-data-sources-json.html)。

      • Snowpark JSON Options

    • PARQUET 的选项文档:

      • Pyspark PARQUET options (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option)。

      • SnowPark PARQUET options。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1077

消息:无法处理嵌入式 SQL 代码。

类别:警告。

描述

当该工具检测到无法转换为 Snowpark 的嵌入式 SQL 代码时,就会出现此问题。

有关更多信息,请查看 SQL 嵌入式代码部分。

场景

输入

在此示例中,SQL 代码嵌入到一个名为 query 的变量中,而该变量用作 Pyspark.sql 方法的参数。

query = f"SELECT * from myTable"
spark.sql(query)
Copy

输出

SMA 检测到 PySpark.sql 参数是变量而不是 SQL 代码,因此在 PySpark.sql 所在行中添加了 EWI SPRKPY1077 消息。

query = f"SELECT * myTable"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
spark.sql(query)
Copy

其他建议

  • 在 SQL 转换过程中,此代码必须直接作为方法参数传递,且仅能使用字符串值,禁止使用插值。请检查传递 PySpark.SQL 函数的 SQL 语句,以验证其在 Snowflake 平台上的功能。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1078

消息:pyspark.context.SparkContext.setLogLevel 函数的实参不是字面量值,因此无法求值

类别:警告

描述

当 SMA 检测到带非字面量值(例如实参为变量)使用了 pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) 函数时,就会出现此问题。

SMA 会对您的源代码进行静态分析,因此无法对实参的内容求值并确定在 Snowpark 中是否有等效选项。

场景

输入

在此示例中,在变量 my_log_level 中定义 logLevel,然后通过 setLogLevel 方法将 my_log_level 用作参数。

my_log_level = "WARN"
sparkSession.sparkContext.setLogLevel(my_log_level)
Copy

输出

SMA 无法对日志级别参数的实参求值,因此会在转换后的日志记录代码行上方添加 EWI SPRKPY1078:

my_log_level = "WARN"
#EWI: SPRKPY1078 => my_log_level is not a literal value and therefore could not be evaluated. Make sure the value of my_log_level is a valid level in Snowpark. Valid log levels are: logging.CRITICAL, logging.DEBUG, logging.ERROR, logging.INFO, logging.NOTSET, logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)
Copy

推荐修复方法

尽管 SMA 无法对此实参求值,但它会将 pyspark.context.SparkContext.setLogLevel 函数转换为 Snowpark 等效选项。请根据下表确保生成的输出代码中 level 实参的值是 Snowpark 中有效且等效的日志级别:

PySpark 日志级别

Snowpark 日志级别等效选项

ALL

logging.NOTSET

DEBUG

logging.DEBUG

ERROR

logging.ERROR

FATAL

logging.CRITICAL

INFO

logging.INFO

OFF

logging.WARNING

TRACE

logging.NOTSET

WARN

logging.WARNING

因此,建议的修复方法将如下所示:

my_log_level = logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)
Copy

其他建议

SPRKPY1079

消息:pyspark.context.SparkContext.setLogLevel 函数的参数不是有效的 PySpark 日志级别。

类别:警告

描述

在以下情况下会出现此问题:SMA 检测到带实参使用了 pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) 函数,该实参是在 PySpark 中无效的日志级别,因此无法确定在 Snowpark 中的等效日志级别。

场景

输入

这里的日志级别使用的“INVALID_LOG_LEVEL”不是有效的 Pyspark 日志级别。

sparkSession.sparkContext.setLogLevel("INVALID_LOG_LEVEL")
Copy

输出

尽管 SMA 进行了转换,但 SMA 无法识别日志级别“INVALID_LOG_LEVEL”,添加了 EWI SPRKPY1079 以提示可能存在问题。

#EWI: SPRKPY1079 => INVALID_LOG_LEVEL is not a valid PySpark log level, therefore an equivalent could not be determined in Snowpark. Valid PySpark log levels are: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
logging.basicConfig(stream = sys.stdout, level = logging.INVALID_LOG_LEVEL)
Copy

推荐修复方法

请确保在 pyspark.context.SparkContext.setLogLevel 函数中使用的日志级别是 PySpark (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) 或 Snowpark 中的有效日志级别,然后重试。

logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)
Copy

其他建议

SPRKPY1081

Spark Conversion Core 4.12.0 起,此问题代码已 弃用

消息:pyspark.sql.readwriter.DataFrameWriter.partitionBy 有替代方案。

类别:警告

描述

Pyspark.sql.readwriter.DataFrameWriter.partitionBy (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html) 函数不受支持。替代方案是改用 Snowpark 的 copy_into_location。有关更多信息,请参阅文档。

场景

输入

此代码将为 FIRST_NAME 列中的每个唯一值创建一个单独的目录。数据是相同的,但会根据列存储在不同的目录中。

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")
Copy

此代码将为 FIRST_NAME 列中的每个唯一值创建一个单独的目录。数据是相同的,但会根据列存储在不同的目录中。

输出代码

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1081 => The partitionBy function is not supported, but you can instead use copy_into_location as workaround. See the documentation for more info.
df.write.partitionBy("FIRST_NAME").csv("/home/data", format_type_options = dict(compression = "None"))
Copy

推荐修复方法

在 Snowpark 中,copy_into_location 有一个 partition_by 参数可以用来代替 partitionBy 函数,但需要进行一些手动调整,如下面的示例所示:

Spark 代码:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")
Copy

经过手动调整的 Snowpark 代码:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.copy_into_location(location=temp_stage, partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)
Copy

copy_into_location 具有以下参数

  • location:Snowpark 位置仅接受使用 Snowflake 暂存区 的云位置。

  • partition_by:它可以是列名或 SQL 表达式,因此需要使用 col 或 sql_expr 转换为列或 SQL。

其他建议

SPRKPY1082

消息:pyspark.sql.readwriter.DataFrameReader.load 函数不受支持。一种替代方案是改用 Snowpark DataFrameReader 格式特定的方法(avro csv、json、orc、parquet)。path 参数应该是暂存区位置。

类别:警告

描述

pyspark.sql.readwriter.DataFrameReader.load (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html) 函数不受支持。替代方案是改用 Snowpark DataFrameReader 方法。

场景

Snowpark 中不存在此方法 DataFrameReader.load(path, format, schema, **options) 的 Spark 签名。因此,只要使用 load 函数,输出代码中就会有 EWI。

场景 1

输入

以下示例尝试从 CSV 源加载数据。

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_csv_file, "csv").show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()
Copy

输出

SMA 添加了 EWI SPRKPY1082,以提示 Snowpark 不支持此函数,但它有替代方案。

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_csv_file, "csv").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
#EWI: The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()
Copy

推荐修复方法

作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。

  • 修复 pathformat 参数:

    • csv 方法替换 load 方法。

    • 第一个参数 path 必须在暂存区中才能与 Snowpark 等效。

以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用 CSV 方法。

path_csv_file = "/path/to/file.csv"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.csv(stage_file_path).show()
Copy
  • 修复 schema 参数:

    • 可以使用 schema 函数设置架构,如下所示:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).csv(temp_stage)
Copy
  • 修复 options 参数:

Spark 和 Snowpark 之间的 选项 不一样,在本示例中,lineSepdateFormat 被替换为 RECORD_DELIMITERDATE_FORMAT其他建议 部分中含有包含所有等效值的表格。

以下示例演示了如何使用 RECORD_DELIMITERDATE_FORMAT 创建字典,然后使用该字典调用 options 方法。

optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
df = my_session.read.options(optionsParam).csv(stage)
Copy

场景 2

输入

以下示例尝试从 JSON 源加载数据。

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_json_file, "json").show()
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()
Copy

输出

SMA 添加了 EWI SPRKPY1082,以提示 Snowpark 不支持此函数,但它有替代方案。

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_json_file, "json").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()
Copy

推荐修复方法

作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。

  • 修复 pathformat 参数:

    • json 方法替换 load 方法

    • 第一个参数 path 必须在暂存区中才能与 Snowpark 等效。

以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用 JSON 方法。

path_json_file = "/path/to/file.json"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.json(stage_file_path).show()
Copy
  • 修复 schema 参数:

    • 可以使用 schema 函数设置架构,如下所示:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).json(temp_stage)
Copy
  • 修复 options 参数:

Spark 和 Snowpark 之间的 选项 不一样,在本示例中,dateFormattimestampFormat 被替换为 DATE_FORMATTIMESTAMP_FORMAT其他建议 部分中含有包含所有等效值的表格。

以下示例演示了如何使用 DATE_FORMATTIMESTAMP_FORMAT 创建字典,然后使用该字典调用 options 方法。

optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
df = Session.read.options(optionsParam).json(stage)
Copy

场景 3

输入

以下示例尝试从 PARQUET 源加载数据。

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_parquet_file, "parquet").show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()
Copy

输出

SMA 添加了 EWI SPRKPY1082,以提示 Snowpark 不支持此函数,但它有替代方案。

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_parquet_file, "parquet").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()
Copy

推荐修复方法

作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。

  • 修复 pathformat 参数:

    • parquet 方法替换 load 方法

    • 第一个参数 path 必须在暂存区中才能与 Snowpark 等效。

以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用 PARQUET 方法。

path_parquet_file = "/path/to/file.parquet"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.parquet(stage_file_path).show()
Copy
  • 修复 schema 参数:

    • 可以使用 schema 函数设置架构,如下所示:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).parquet(temp_stage)
Copy
  • 修复 options 参数:

Spark 和 Snowpark 之间的 选项 不一样,在本示例中,pathGlobFilter 被替换为 PATTERN其他建议 部分中含有包含所有等效值的表格。

以下示例演示了如何使用 PATTERN 创建字典,然后使用该字典调用 options 方法。

optionsParam = {"PATTERN": "*.parquet"}
df = Session.read.options(optionsParam).parquet(stage)
Copy

其他建议

  • 请注意,Spark 和 Snowpark 之间的选项不一样,但它们可以映射:

Spark 选项

可能的值

Snowpark 等效选项

描述

header

True 或 False

SKIP_HEADER = 1 / SKIP_HEADER = 0

使用文件的第一行作为列名。

delimiter

任何单字符/多字符字段分隔符

FIELD_DELIMITER

指定单个字符/多个字符作为每个列/字段的分隔符。

sep

任何单字符字段分隔符

FIELD_DELIMITER

为每个列/字段指定一个字符作为分隔符。

encoding

UTF-8、UTF-16 等等...

ENCODING

按给定的编码类型对 CSV 文件进行解码。默认编码为 UTF-8

lineSep

任何单字符行分隔符

RECORD_DELIMITER

定义应用于文件解析的行分隔符。

pathGlobFilter

文件模式

PATTERN

定义一种模式,以仅读取文件名与模式匹配的文件。

recursiveFileLookup

True 或 False

不适用

以递归方式扫描目录以读取文件。此选项的默认值为 False。

quote

要引用的单个字符

FIELD_OPTIONALLY_ENCLOSED_BY

引用字段/列,其包含定界符/分隔符可以是值的一部分的字段。此字符 To 与 quoteAll 选项一起使用时引用所有字段。此选项的默认值为双引号 (")。

nullValue

用于替换 null 的字符串

NULL_IF

在读取和写入 DataFrame 时用字符串替换空值。

dateFormat

有效的日期格式

DATE_FORMAT

定义一个表示日期格式的字符串。默认格式为 yyyy-MM-dd。

timestampFormat

有效的时间戳格式

TIMESTAMP_FORMAT

定义一个表示时间戳格式的字符串。默认格式为 yyyy-MM-dd 'T'HH:mm:ss。

escape

任何单个字符

ESCAPE

将单个字符设置为转义字符以覆盖默认的转义字符 (\)。

inferSchema

True 或 False

INFER_SCHEMA

自动检测文件架构

mergeSchema

True 或 False

不适用

在 Snowflake 中不需要,因为每当 infer_schema 确定 parquet 文件结构时就会发生这种情况

  • 对于 modifiedBefore / modifiedAfter 选项,可以通过使用元数据列然后添加诸如以下筛选器以在 Snowflake 中获得相同的结果:df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’)

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1083

消息:pyspark.sql.readwriter.DataFrameWriter.save 函数不受支持。一种替代方案是改用 Snowpark DataFrameWriter copy_into_location 方法。

类别:警告

描述

pyspark.sql.readwriter.DataFrameWriter.save (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.save.html) 函数不受支持。替代方案是改用 Snowpark DataFrameWriter 方法。

场景

Snowpark 中不存在此方法 DataFrameWriter.save(path, format, mode, partitionBy, **options) 的 Spark 签名。因此,只要使用 load 函数,输出代码中就会有 EWI。

场景 1

输入代码

以下是尝试使用 CSV 格式保存数据的示例。

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")
Copy

输出代码

该工具在输出代码中添加了 EWI SPRKPY1083,以提示 Snowpark 不支持此函数,但它有替代方案。

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")
Copy

推荐修复方法

作为一种替代方案,可以改用 Snowpark DataFrameWriter 方法。

  • 修复 pathformat 参数:

    • csvcopy_into_location 方法替换 load 方法。

    • 如果您使用的是 copy_into_location 方法,则需要使用 file_format_type 参数指定格式。

    • 第一个参数 path 必须在暂存区中才能与 Snowpark 等效。

以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用上述方法之一。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

## Using csv method
df.write.csv(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")
Copy

以下示例展示了如何在调用链中添加带有 overwrite 参数的 mode 方法:

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.mode("overwrite").csv(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="csv")
Copy
  • 修复 partitionBy 参数:

    • 使用 CSV 方法中的 partition_by 参数,如下所示:

以下是使用 CSV 方法中的 partition_by 参数的示例。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.csv(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="csv", partition_by="City")
Copy
  • 修复 options 参数:

spark 和 snowpark 之间的选项不一样,在此示例中,lineSepdateFormat 被替换为 RECORD_DELIMITERDATE_FORMAT其他建议 部分中含有包含所有等效值的表格。

以下示例演示了如何使用 RECORD_DELIMITERDATE_FORMAT 创建字典,然后使用该字典调用 options 方法。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}

## Using csv method
df.write.csv(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.csv(stage, file_format_type="csv", format_type_options=optionsParam)
Copy

场景 2

输入代码

以下是尝试使用 JSON 格式保存数据的示例。

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
Copy

输出代码

该工具在输出代码中添加了 EWI SPRKPY1083,以提示 Snowpark 不支持此函数,但它有替代方案。

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
Copy

推荐修复方法

作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。

  • 修复 pathformat 参数:

    • jsoncopy_into_location 方法替换 load 方法

    • 如果您使用的是 copy_into_location 方法,则需要使用 file_format_type 参数指定格式。

    • 第一个参数 path 必须在暂存区中才能与 Snowpark 等效。

以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用上述方法之一。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

## Using json method
df.write.json(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")
Copy

以下示例展示了如何在调用链中添加带有 overwrite 参数的 mode 方法:

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.mode("overwrite").json(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="json")
Copy
  • 修复 partitionBy 参数:

    • 使用 CSV 方法中的 partition_by 参数,如下所示:

以下是使用 CSV 方法中的 partition_by 参数的示例。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.json(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="json", partition_by="City")
Copy
  • 修复 options 参数:

Spark 和 Snowpark 之间的选项不一样,在此示例中,dateFormattimestampFormat 被替换为 DATE_FORMATTIMESTAMP_FORMAT其他建议 部分中含有包含所有等效值的表格。

以下示例演示了如何使用 DATE_FORMATTIMESTAMP_FORMAT 创建字典,然后使用该字典调用 options 方法。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}

## Using json method
df.write.json(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="json", format_type_options=optionsParam)
Copy

场景 3

输入代码

以下是尝试使用 PARQUET 格式保存数据的示例。

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")
Copy

输出代码

该工具在输出代码中添加了 EWI SPRKPY1083,以提示 Snowpark 不支持此函数,但它有替代方案。

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")
Copy

推荐修复方法

作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。

  • 修复 pathformat 参数:

    • parquetcopy_into_location 方法替换 load 方法。

    • 如果您使用的是 copy_into_location 方法,则需要使用 file_format_type 参数指定格式。

    • 第一个参数 path 必须在暂存区中才能与 Snowpark 等效。

以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用上述方法之一。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

## Using parquet method
df.write.parquet(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet")
Copy

以下示例展示了如何在调用链中添加带有 overwrite 参数的 mode 方法:

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.mode("overwrite").parquet(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(stage, file_format_type="parquet")
Copy
  • 修复 partitionBy 参数:

    • 使用 CSV 方法中的 partition_by 参数,如下所示:

以下是使用 parquet 方法中的 partition_by 参数的示例。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.parquet(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", partition_by="City")
Copy
  • 修复 options 参数:

Spark 和 Snowpark 之间的选项不一样,在此示例中,pathGlobFilter 被替换为 PATTERN其他建议 部分中含有包含所有等效值的表格。

以下示例演示了如何使用 PATTERN 创建字典,然后使用该字典调用 options 方法。

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}

## Using parquet method
df.write.parquet(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", format_type_options=optionsParam)
Copy

其他建议

  • 考虑到 Spark 和 Snowpark 之间的选项不一样,但它们可以映射:

Spark 选项

可能的值

Snowpark 等效选项

描述

header

True 或 False

SKIP_HEADER = 1 / SKIP_HEADER = 0

使用文件的第一行作为列名。

delimiter

任何单字符/多字符字段分隔符

FIELD_DELIMITER

指定单个字符/多个字符作为每个列/字段的分隔符。

sep

任何单字符字段分隔符

FIELD_DELIMITER

为每个列/字段指定一个字符作为分隔符。

encoding

UTF-8、UTF-16 等等...

ENCODING

按给定的编码类型对 CSV 文件进行解码。默认编码为 UTF-8

lineSep

任何单字符行分隔符

RECORD_DELIMITER

定义应用于文件解析的行分隔符。

pathGlobFilter

文件模式

PATTERN

定义一种模式,以仅读取文件名与模式匹配的文件。

recursiveFileLookup

True 或 False

不适用

以递归方式扫描目录以读取文件。此选项的默认值为 False。

quote

要引用的单个字符

FIELD_OPTIONALLY_ENCLOSED_BY

引用字段/列,其包含定界符/分隔符可以是值的一部分的字段。此字符 To 与 quoteAll 选项一起使用时引用所有字段。此选项的默认值为双引号 (")。

nullValue

用于替换 null 的字符串

NULL_IF

在读取和写入 DataFrame 时用字符串替换空值。

dateFormat

有效的日期格式

DATE_FORMAT

定义一个表示日期格式的字符串。默认格式为 yyyy-MM-dd。

timestampFormat

有效的时间戳格式

TIMESTAMP_FORMAT

定义一个表示时间戳格式的字符串。默认格式为 yyyy-MM-dd 'T'HH:mm:ss。

escape

任何单个字符

ESCAPE

将单个字符设置为转义字符以覆盖默认的转义字符 (\)。

inferSchema

True 或 False

INFER_SCHEMA

自动检测文件架构

mergeSchema

True 或 False

不适用

在 Snowflake 中不需要,因为每当 infer_schema 确定 parquet 文件结构时就会发生这种情况

  • 对于 modifiedBefore / modifiedAfter 选项,可以使用元数据列在 Snowflake 中获得相同的结果,然后添加一个筛选器,比如:df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’)

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。

SPRKPY1084

Spark Conversion Core 4.12.0 起,此问题代码已 弃用

消息:pyspark.sql.readwriter.DataFrameWriter.option 不受支持。

类别:警告

描述

pyspark.sql.readwriter.DataFrameWriter.option (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) 函数不受支持。

场景

输入代码

以下示例使用 option 方法,该方法用于在写入 DataFrame 的数据时添加其他配置。

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.option("header", True).csv(csv_file_path)
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)
Copy

输出代码

该工具在输出代码中添加了 EWI SPRKPY1084,以提示 Snowpark 不支持此函数。

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

df.write.option("header", True).csv(csv_file_path)
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)
Copy

推荐修复方法

pyspark.sql.readwriter.DataFrameWriter.option 方法没有建议的修复方法。

其他建议

SPRKPY1085

消息:pyspark.ml.feature.VectorAssembler 不受支持。

类别:警告

描述

pyspark.ml.feature.VectorAssembler (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) 不受支持。

场景

输入代码

VectorAssembler 用于将几列组合成一个向量。

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = SparkSession.createDataFrame(data, schema=["Id", "col1", "col2"])
vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")
Copy

输出代码

该工具在输出代码中添加了此 EWI SPRKPY1085,以提示 Snowpark 不支持此类。

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
#EWI: SPRKPY1085 => The pyspark.ml.feature.VectorAssembler function is not supported.

vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")
Copy

推荐修复方法

pyspark.ml.feature.VectorAssembler 没有建议的修复方法。

其他建议

SPRKPY1086

消息:pyspark.ml.linalg.VectorUDT 不受支持。

类别:警告

描述

pyspark.ml.linalg.VectorUDT (https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/linalg.html) 不受支持。

场景

输入代码

VectorUDT 是一种用于表示 DataFrame 中向量列的数据类型。

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = SparkSession.createDataFrame(data, schema=schema)
Copy

输出代码

该工具在输出代码中添加了 EWI SPRKPY1086,以提示 Snowpark 不支持此函数。

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

#EWI: SPRKPY1086 => The pyspark.ml.linalg.VectorUDT function is not supported.
schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)
Copy

推荐修复方法

pyspark.ml.linalg.VectorUDT 没有建议的修复方法。

其他建议

SPRKPY1087

消息:pyspark.sql.dataframe.DataFrame.writeTo 函数不受支持,但它有替代方案。

类别:警告。

描述

pyspark.sql.dataframe.DataFrame.writeTo (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html) 函数不受支持。替代方案是改为使用 Snowpark DataFrameWriter SaveAsTable 方法。

场景

输入

以下示例使用 pyspark.sql.dataframe.DataFrame.writeTo 函数,DataFrame df 被写入名为 Personal_info 的表中。

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.writeTo("Personal_info")
Copy

输出

SMA 在输出代码中添加了 EWI SPRKPY1087,以提示不支持此函数,但它有替代方案。

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

#EWI: SPRKPY1087 => pyspark.sql.dataframe.DataFrame.writeTo is not supported, but it has a workaround.
df.writeTo("Personal_info")
Copy

推荐修复方法

替代方案是改用 Snowpark DataFrameWriter SaveAsTable 方法。

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.write.saveAsTable("Personal_info")
Copy

其他建议

SPRKPY1088

消息:Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.option 值可能不同,因此可能需要进行验证。

类别:警告

描述

Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) 值可能不同,因此可能需要进行验证以确保行为正确无误。

场景

有一些场景,具体取决于是否支持该选项,或者写入文件时使用的格式。

场景 1

输入

下面的示例展示了如何使用方法选项,添加了 sep 选项,其当前为 supported

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").csv("some_path")
Copy

输出

该工具添加了 EWI SPRKPY1088,提示需要进行验证。

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
Copy

推荐修复方法

Snowpark API 支持此参数,因此唯一的操作是在迁移后检查行为。请参阅 等效选项表 以查看支持的参数。

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
Copy
场景 2

输入

下面的场景展示了如何使用选项,但添加了一个 header 选项,其为 not supported

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("header", True).csv("some_path")
Copy

输出

该工具添加了 EWI SPRKPY1088,提示需要进行验证。

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("header", True).csv("some_path")
Copy

推荐修复方法

对于此场景,建议对 Snowpark 格式类型选项 求值,看看是否可以根据需要进行更改。另外,检查更改后的行为。

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Copy
场景 3

输入

此场景添加一个 sep 选项,其为 supported 并使用 JSON 方法。

  • 注意:此场景也适用于 PARQUET

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").json("some_path")
Copy

输出

该工具添加了 EWI SPRKPY1088,提示需要进行验证。

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").json("some_path")
Copy

推荐修复方法

文件格式 JSON 不支持参数 sep,因此建议对 Snowpark 格式类型选项 进行求值,看看是否可以根据需要对其进行更改。另外,检查更改后的行为。

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")
Copy

其他建议

  • 由于有一些 not supported 参数,建议查看 table of equivalences 并检查转换后的行为。

  • 等效选项表:

PySpark 选项

SnowFlake 选项

支持的文件格式

描述

SEP

FIELD_DELIMITER

CSV

一个或多个单字节或多字节字符,用于分隔输入文件中的字段。

LINESEP

RECORD_DELIMITER

CSV

用于分隔输入文件中记录的一个或多个字符。

QUOTE

FIELD_OPTIONALLY_ENCLOSED_BY

CSV

用于包围字符串的字符。

NULLVALUE

NULL_IF

CSV

用于与 SQL NULL 相互转换的字符串。

DATEFORMAT

DATE_FORMAT

CSV

定义要加载的数据文件中日期值格式的字符串。

TIMESTAMPFORMAT

TIMESTAMP_FORMAT

CSV

定义要加载的数据文件中时间戳值格式的字符串。

如果使用的参数不在列表中,则 API 会抛出错误。

SPRKPY1089

消息:Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.options 值可能不同,因此可能需要进行验证。

类别:警告

描述

Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.options (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html) 值可能不同,因此可能需要进行验证以确保行为正确无误。

场景

有一些场景,具体取决于是否支持这些选项,或者写入文件时使用的格式。

场景 1

输入

下面的示例展示了如何使用方法选项,添加了 sepnullValue 选项,当前均为 supported

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").csv("some_path")
Copy

输出

该工具添加了 EWI SPRKPY1089,提示需要进行验证。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
Copy

推荐修复方法

Snowpark API 支持这些参数,因此唯一的操作是在迁移后检查行为。请参阅 等效选项表 以查看支持的参数。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
Copy
场景 2

输入

下面的场景展示使用了 options,但添加了一个 not supportedheader 选项。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(header=True, sep=",").csv("some_path")
Copy

输出

该工具添加了 EWI SPRKPY1089,提示需要进行验证。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(header=True, sep=",").csv("some_path")
Copy

推荐修复方法

对于此场景,建议对 Snowpark 格式类型选项 求值,看看是否可以根据需要进行更改。另外,检查更改后的行为。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Copy
场景 3

输入

此场景添加一个 sep 选项,其为 supported 并使用 JSON 方法。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").json("some_path")
Copy

输出

该工具添加了 EWI SPRKPY1089,提示需要进行验证。

  • 注意:此场景也适用于 PARQUET

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").json("some_path")
Copy

推荐修复方法

文件格式 JSON 不支持参数 sep,因此建议对 Snowpark 格式类型选项 进行求值,看看是否可以根据需要对其进行更改。另外,检查更改后的行为。

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")
Copy

其他建议

  • 由于有一些 not supported 参数,建议查看 table of equivalences 并检查转换后的行为。

  • 等效选项表:

Snowpark 可以支持某些参数的 等效选项 列表:

PySpark 选项

SnowFlake 选项

支持的文件格式

描述

SEP

FIELD_DELIMITER

CSV

一个或多个单字节或多字节字符,用于分隔输入文件中的字段。

LINESEP

RECORD_DELIMITER

CSV

用于分隔输入文件中记录的一个或多个字符。

QUOTE

FIELD_OPTIONALLY_ENCLOSED_BY

CSV

用于包围字符串的字符。

NULLVALUE

NULL_IF

CSV

用于与 SQL NULL 相互转换的字符串。

DATEFORMAT

DATE_FORMAT

CSV

定义要加载的数据文件中日期值格式的字符串。

TIMESTAMPFORMAT

TIMESTAMP_FORMAT

CSV

定义要加载的数据文件中时间戳值格式的字符串。

如果使用的参数不在列表中,则 API 会抛出错误。

SPRKPY1101

类别

解析错误。

描述

当该工具识别出解析错误时,它会尝试从错误中恢复并在下一行继续该过程。在这种情况下,它会在该行中显示错误和注释。

下面的示例说明如何处理空格和制表符之间的不匹配错误。

输入代码

def foo():
    x = 5 # Spaces
	y = 6 # Tab

def foo2():
    x=6
    y=7
Copy

输出代码

def foo():
    x = 5 # Spaces
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(3, 2). Last valid token was '5' @(2, 9), failed token 'y' @(3, 2)
## y = 6 # Tab

def foo2():
    x=6
    y=7
Copy

建议

  • 尝试修复带注释的行。

  • 如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们。如果您与 Snowflake 签订了支持合同,请联系您的销售工程师,他们可以指导您的支持需求。

语言: 中文