Snowpark Migration Accelerator:Python 的问题代码¶
SPRKPY1000¶
消息:源项目 spark-core 版本为 xxx.xx:xx.x.x,Snowpark 支持的 Snowpark 核心版本为 2.12:3.1.2,因此现有映射之间可能存在功能差异。
类别:警告。
描述¶
当源代码的 Pyspark 版本不受支持时,就会出现此问题。这意味着现有映射之间可能存在功能差异。
其他建议¶
SMA 扫描的 pyspark 兼容 Snowpark 的版本范围为 2.12 至 3.1.2。如果您使用的版本不在此范围内,该工具可能产生不一致的分析结果。您可以修改正在扫描的源代码的版本。
要获得更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。
SPRKPY1001¶
消息: 此代码段存在解析错误
类别: 解析错误。
描述¶
当 Snowpark Migration Accelerator (SMA) 无法正确读取或理解文件中的代码(它无法正确“解析”文件)时,就会报告解析错误。当文件出现一个或多个解析错误时,就会出现此问题代码。
场景 ¶
输入: 当代码的语法无效时,会出现 EWI 消息,例如:
def foo():
x = %%%%%%1###1
输出: SMA 发现解析错误并注释解析错误,添加了相应的 EWI 消息:
def foo():
x
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(2, 7). Last valid token was 'x' @(2, 5), failed token '=' @(2, 7)
## = %%%%%%1###1
其他建议 ¶
检查该文件是否包含有效的 Python 代码。(您可以使用 issues.csv 文件查找带有此 EWI 代码的所有文件,以确定哪些文件因解析错误而未得到该工具处理。)许多解析错误的发生愿因是只有一部分代码输入到工具中,因此最好确保代码将在源代码中运行。如果有效,请使用 SMA 中的“Report an Issue”选项报告您遇到的解析错误。在提交此问题时,请在描述中包括导致解析错误的代码行。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1002¶
消息:不支持 < element >,不支持 Spark 元素。
类别: 转换错误。
描述¶
当该工具检测到使用了 Snowpark 不支持的元素,就会出现此问题,并且它没有自己的关联错误代码。这是 SMA 对不受支持的元素使用的通用错误代码。
其他建议¶
尽管不支持消息中的选项或元素,但这并不意味着找不到解决方案。这仅意味着该工具本身无法找到解决方案。
如果您遇到了 pyspark.ml 库中不受支持的元素,请考虑备选方法。还有其他指南可以解决与机器学习相关的问题,例如 Snowflake 的这篇指南。
检查源代码的语法是否正确。(您可以使用 issues.csv 文件来确定转换错误发生的位置。)如果语法正确,请使用 SMA 中的“报告问题”选项报告您在特定元素上遇到的转换错误。在提交此问题时,请在描述中包括导致错误的代码行。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1003¶
消息: 加载符号表时出错。
类别: 转换错误。
描述¶
当处理符号表中的符号期间出错时,就会出现此问题。符号表是 SMA 基础架构的一部分,允许进行更复杂的转换。此错误可能是源代码中的意外语句造成的。
其他建议 ¶
这不太可能是源代码本身存在错误,而更可能是该工具处理源代码的方式出现了错误。最好的解决方案是在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。
要获得更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。
SPRKPY1004¶
消息: 无法加载符号表。
类别: 解析错误。
描述¶
当工具执行过程中出现意外错误时,就会出现此问题。由于无法加载符号表,该工具无法启动评估或转换过程。
其他建议 ¶
这不太可能是源代码本身存在错误,而更可能是该工具处理源代码的方式出现了错误。最好的解决方案是联系 SMA 支持团队。您可以发送电子邮件至 sma-support@snowflake.com 联系我们。
要获得更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue) 中发布问题。
SPRKPY1005¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
消息: pyspark.conf.SparkConf 并非必需
类别: 警告。
描述¶
当该工具检测到使用了并非必需的 pyspark.conf.SparkConf (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html) 时,就会出现此问题。
场景¶
输入
SparkConf 可以在不带参数调用,也可以带 loadDefaults 调用。
from pyspark import SparkConf
my_conf = SparkConf(loadDefaults=True)
输出
对于带或不带参数两种情况,SMA 都会创建一个 Snowpark Session.builder 对象:
#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
#from pyspark import SparkConf
pass
#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
my_conf = Session.builder.configs({"user" : "my_user", "password" : "my_password", "account" : "my_account", "role" : "my_role", "warehouse" : "my_warehouse", "database" : "my_database", "schema" : "my_schema"}).create()
其他建议¶
此参数并非必要参数,移除它会插入警告注释。用户不应执行任何其他操作。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1006¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
消息: pyspark.context.SparkContext 并非必需
类别: 警告。
描述¶
当该工具检测到使用了 pyspark.context.SparkContext (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html) 时,就会出现此问题;在 Snowflake 环境中使用它并非必需。
场景¶
输入
在此示例中,有两个上下文用于创建与 Spark 集群的连接
from pyspark import SparkContext
sql_context1 = SparkContext(my_sc1)
sql_context2 = SparkContext(sparkContext=my_sc2)
输出
由于 Snowflake 上没有集群,因此不需要上下文,因此请注意,包含 Spark 属性的变量 my_sc1 和 my_sc2 可能并非必需,或者需要对其进行调整以修复代码。
from snowflake.snowpark import Session
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context2 = my_sc2
其他建议¶
此参数并非必要参数,移除它会插入警告注释。用户不应执行任何操作。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1007¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
消息: pyspark.sql.context.SQLContext 并非必需
类别: 警告。
描述¶
当该工具检测到使用了非必需的 pyspark.sql.context.SQLContext (https://downloads.apache.org/spark/docs/1.6.1/api/python/pyspark.sql.html) 时,就会出现此问题。
场景¶
输入
下面的示例展示了不同 SparkContext 过载。
from pyspark import SQLContext
my_sc1 = SQLContext(myMaster, myAppName, mySparkHome, myPyFiles, myEnvironment, myBatctSize, mySerializer, my_conf1)
my_sc2 = SQLContext(conf=my_conf2)
my_sc3 = SQLContext()
输出
输出代码为 pyspark.SQLContext 的相应行添加了注释,并将这些场景替换为对配置的引用。请注意,包含 Spark 属性的变量 my_sc1 和 my_sc2 可能并非必需,或者需要对其进行调整以修复代码。
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
#from pyspark import SQLContext
pass
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context2 = my_sc2
其他建议¶
此参数为非必要参数,工具已将其移除并在源代码中插入警告注释。用户不应执行任何操作。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1008¶
消息:pyspark.sql.context.HiveContext 并非必需
类别:警告。
描述¶
当该工具检测到使用了非必需的 pyspark.sql.context.HiveContext (https://downloads.apache.org/spark/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext) 时,就会出现此问题。
场景¶
输入
在此示例中,将演示如何创建与 Hive 数据仓库的连接。
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.table("myTable")
df.show()
输出
在 Snowflake 中没有 Hive 数据仓库,因此无需配置 Hive 上下文。仍可在 Snowflake 中使用 parquet 文件,具体操作方法请参阅此 教程。
#EWI: SPRKPY1008 => pyspark.sql.context.HiveContext is not required
hive_context = sc
df = hive_context.table("myTable")
df.show()
sc 变量指的是 Snowpark 会话对象
推荐修复方法
对于示例中的输出代码,应该添加与以下代码类似的 Snowpark 会话对象:
## Here manually we can add the Snowpark Session object via a json config file called connection.json
import json
from snowflake.snowpark import Session
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
sc = Session.builder.configs(connection_parameter).getOrCreate()
hive_context = sc
df = hive_context.table("myTable")
df.show()
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1009¶
消息: pyspark.sql.dataframe.DataFrame.approxQuantile 有替代方案
类别: 警告。
描述¶
当该工具检测到使用了 pyspark.sql.dataframe.DataFrame.approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
需要重点了解的是,Pyspark 使用两种不同的 approxQuantile 函数,在这里使用的是 DataFrame approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html) 版本
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Sun', 10],
['Mon', 64],
['Thr', 12],
['Wen', 15],
['Thu', 68],
['Fri', 14],
['Sat', 13]]
columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)
输出
SMA 会在使用 approxQuantile 的代码行上方返回 EWI SPRKPY1009,以便您可以用来识别需要修改的位置。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Sun', 10],
['Mon', 64],
['Thr', 12],
['Wen', 15],
['Thu', 68],
['Fri', 14],
['Sat', 13]]
columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1009 => pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround, see documentation for more info
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)
推荐修复方法
使用 Snowpark approxQuantile 方法。部分参数不匹配,因此需要进行一些手动调整。对于输出代码的示例,建议的修复方法可能是:
from snowflake.snowpark import Session
...
df = spark.createDataFrame(data, columns)
df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])
pyspark.sql.dataframe.DataFrame.approxQuantile 的 relativeError 参数在 SnowPark 中不存在。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1010¶
消息:pyspark.sql.dataframe.DataFrame.checkpoint 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.dataframe.DataFrame.checkpoint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.checkpoint.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在 PySpark 中,检查点用于截断 DataFrame 的逻辑计划,这是为了避免逻辑计划的增长。
import tempfile
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
['Q2', 60000],
['Q3', 500002],
['Q4', 130000]]
columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
spark.sparkContext.setCheckpointDir("/tmp/bb")
df.checkpoint(False)
输出
SMA 会在使用 approxQuantile 的代码行上方返回 EWI SPRKPY1010,以便您可以用来识别需要修改的位置。请注意,这也将 setCheckpointDir (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setCheckpointDir.html) 标记为不受支持,但修复不需要使用已设置检查点的目录。
import tempfile
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
['Q2', 60000],
['Q3', 500002],
['Q4', 130000]]
columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
#EWI: SPRKPY1002 => pyspark.context.SparkContext.setCheckpointDir is not supported
spark.setCheckpointDir("/tmp/bb")
#EWI: SPRKPY1010 => pyspark.sql.dataframe.DataFrame.checkpoint has a workaround, see documentation for more info
df.checkpoint(False)
推荐修复方法
无需显式设置检查点,这是因为 Snowpark 使用基于 SQL 的操作,这些操作经过 Snowflake 查询优化引擎优化,从而避免了不必要的计算或逻辑计划失控增长的问题。
但是,在某些情况下,可能需要在 DataFrame 上持久保存计算结果。在这种情况下,您可以通过将 DataFrame 写入 Snowflake 表或 Snowflake 临时表 来保存物化结果。
通过使用永久表,即使在会话结束之后,也可以随时访问计算结果。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
['Q2', 60000],
['Q3', 500002],
['Q4', 130000]]
columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_table", table_type="temporary") # Save the dataframe into Snowflake table "my_table".
df2 = Session.table("my_table") # Now I can access the stored result quering the table "my_table"
对于另一种修复方法,使用临时表的优点是在会话结束后删除该表:
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
['Q2', 60000],
['Q3', 500002],
['Q4', 130000]]
columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_temp_table", table_type="temporary") # Save the dataframe into Snowflake table "my_temp_table".
df2 = Session.table("my_temp_table") # Now I can access the stored result quering the table "my_temp_table"
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1011¶
消息:pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameStatFunctions.approxQuantile.html#pyspark.sql.DataFrameStatFunctions.approxQuantile) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
需要重点了解的是,Pyspark 使用两种不同的 approxQuantile 函数,在这里使用的是 DataFrameStatFunctions approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameStatFunctions.approxQuantile.html#pyspark.sql.DataFrameStatFunctions.approxQuantile) 版本。
import tempfile
from pyspark.sql import SparkSession, DataFrameStatFunctions
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
['Q2', 60000],
['Q3', 500002],
['Q4', 130000]]
columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
print(aprox_quantille)
输出
SMA 会在使用 approxQuantile 的代码行上方返回 EWI SPRKPY1011,以便您可以用来识别需要修改的位置。
import tempfile
from snowflake.snowpark import Session, DataFrameStatFunctions
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
['Q2', 60000],
['Q3', 500002],
['Q4', 130000]]
columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1011 => pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround, see documentation for more info
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
推荐修复方法
可以使用 Snowpark approxQuantile 方法。部分参数不匹配,因此需要进行一些手动调整。对于输出代码的示例,建议的修复方法可能是:
from snowflake.snowpark import Session # remove DataFrameStatFunctions because is not required
...
df = spark.createDataFrame(data, columns)
aprox_quantille = df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])
pyspark.sql.dataframe.DataFrame.approxQuantile 的 relativeError 参数在 SnowPark 中不存在。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1012¶
警告
此问题代码已 弃用
消息:pyspark.sql.dataframe.DataFrameStatFunctions.writeTo 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.dataframe.DataFrameStatFunctions.writeTo (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,将 DataFrame df 写入 Spark 表“table”。
writer = df.writeTo("table")
输出
SMA 会在使用 DataFrameStatFunctions.writeTo 的代码行上方返回 EWI SPRKPY1012,以便您可以用来识别需要修改的位置。
#EWI: SPRKPY1012 => pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround, see documentation for more info
writer = df.writeTo("table")
建议的修复方法
改用 df.write.SaveAsTable()。
import df.write as wt
writer = df.write.save_as_table(table)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1013¶
消息:pyspark.sql.functions.acosh 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.acosh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.acosh.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,pyspark 使用 pyspark.sql.functions.acosh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.acosh.html) 计算 DataFrame 的 acosh
from pyspark.sql import SparkSession
from pyspark.sql.functions import acosh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 30],
['V2', 60],
['V3', 50],
['V4', 13]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))
输出
SMA 会在使用 acosh 的代码行上方返回 EWI SPRKPY1013,以便您可以用来识别需要修改的位置。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
['V2', 60],
['V3', 50],
['V4', 13]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1013 => pyspark.sql.functions.acosh has a workaround, see documentation for more info
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))
推荐修复方法
没有直接的“acosh”实现,但可以改用“call_function”,并使用“acosh”作为第一个参数,使用 colName 作为第二个参数。
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
['V2', 60],
['V3', 50],
['V4', 13]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.select(call_function('ACOSH', col('value')))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1014¶
消息:pyspark.sql.functions.asinh 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.asinh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asinh.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,pyspark 使用 pyspark.sql.functions.asinh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asinh.html) 计算 DataFrame 的 asinh。
from pyspark.sql import SparkSession
from pyspark.sql.functions import asinh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 3.0],
['V2', 60.0],
['V3', 14.0],
['V4', 3.1]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("asinh_value", asinh(df["value"]))
输出
SMA 会在使用 asinh 的代码行上方返回 EWI SPRKPY1014,以便您可以用来识别需要修改的位置。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
['V2', 60.0],
['V3', 14.0],
['V4', 3.1]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1014 => pyspark.sql.functions.asinh has a workaround, see documentation for more info
df_result = df.withColumn("asinh_value", asinh(df["value"]))
建议的修复方法
没有直接的“asinh”实现,但可以改用“call_function”,并使用“asinh”作为第一个参数,使用 colName 作为第二个参数。
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
['V2', 60.0],
['V3', 14.0],
['V4', 3.1]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('asinh', col('value')))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1015¶
消息:pyspark.sql.functions.atanh 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.atanh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.atanh.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,pyspark 使用 pyspark.sql.functions.atanh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.atanh.html) 计算 DataFrame 的 atanh。
from pyspark.sql import SparkSession
from pyspark.sql.functions import atanh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 0.14],
['V2', 0.32],
['V3', 0.4],
['V4', -0.36]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("atanh_value", atanh(df["value"]))
输出
SMA 会在使用 atanh 的代码行上方返回 EWI SPRKPY1015,以便您可以用来识别需要修改的位置。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
['V2', 0.32],
['V3', 0.4],
['V4', -0.36]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1015 => pyspark.sql.functions.atanh has a workaround, see documentation for more info
df_result = df.withColumn("atanh_value", atanh(df["value"]))
推荐修复方法
没有直接的“atanh”实现,但可以改用“call_function”,并使用“atanh”作为第一个参数,使用 colName 作为第二个参数。
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
['V2', 0.32],
['V3', 0.4],
['V4', -0.36]]
columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('atanh', col('value')))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1016¶
警告
自 Spark Conversion Core 版本 0.11.7 起,此问题代码已 弃用
消息:pyspark.sql.functions.collect_set 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.collect_set (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_set.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
使用 collectset 来获取 _colname 的元素(不含重复项):
col = collect_set(colName)
输出
SMA 会在使用 collect_set 的代码行上方返回 EWI SPRKPY1016,以便您可以用来识别需要修改的位置。
#EWI: SPRKPY1016 => pyspark.sql.functions.collect_set has a workaround, see documentation for more info
col = collect_set(colName)
推荐修复方法
使用函数 array_agg,然后添加第二个值为 True 的实参。
col = array_agg(col, True)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1017¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
pyspark.sql.functions.date_add 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.date_add (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_add.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,使用 date_add 来为 DataFrame df 计算当前日期后 5 天的日期。
col = df.select(date_add(df.colName, 5))
输出
SMA 会在使用 date_add 的代码行上方返回 EWI SPRKPY1017 警告,以便您可以用来识别需要修改的位置。
#EWI: SPRKPY1017 => pyspark.sql.functions.date_add has a workaround, see documentation for more info
col = df.select(date_add(df.colName, 5))
推荐修复方法
导入 snowflake.snowpark.functions,其包含 date_add(别名 dateAdd)函数的实现。
from snowflake.snowpark.functions import date_add
col = df.select(date_add(df.dt, 1))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1018¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
消息:pyspark.sql.functions.date_sub 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.date_sub (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_sub.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,使用 date_add 来为 DataFrame df 计算当前日期前 5 天的日期。
col = df.select(date_sub(df.colName, 5))
输出
SMA 会在使用 date_sub 的代码行上方返回 EWI SPRKPY1018 警告,以便您可以用来识别需要修改的位置。
#EWI: SPRKPY1018 => pyspark.sql.functions.date_sub has a workaround, see documentation for more info
col = df.select(date_sub(df.colName, 5))
推荐修复方法
导入 snowflake.snowpark.functions,其包含 date_sub 函数的实现。
from pyspark.sql.functions import date_sub
df.withColumn("date", date_sub(df.colName, 5))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1019¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
消息:pyspark.sql.functions.datediff 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.datediff (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.datediff.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
在此示例中,我们使用 datediff 来计算与 'today' 和其他日期的天数差。
contacts = (contacts
#days since last event
.withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
#days since deployment
.withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
#days since online training
.withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
#days since last RC login
.withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
#days since last EMS login
.withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
)
输出
SMA 会在使用 datediff 的代码行上方返回 EWI SPRKPY1019,以便您可以用来识别需要修改的位置。
from pyspark.sql.functions import datediff
#EWI: SPRKPY1019 => pyspark.sql.functions.datediff has a workaround, see documentation for more info
contacts = (contacts
#days since last event
.withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
#days since deployment
.withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
#days since online training
.withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
#days since last RC login
.withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
#days since last EMS login
.withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
)
SMA 将 pyspark.sql.functions.datediff 转换为 snowflake.snowpark.functions.daydiff,后者同样用于计算两个日期之间的天数差。
推荐修复方法
datediff(part: string ,end: ColumnOrName, start: ColumnOrName)
操作: 导入 snowflake.snowpark.functions,其包含 datediff 函数的实现,该函数需要 日期时间部分 的额外参数,并允许在计算日期之间的天数差时具有更大的灵活性。
from snowflake.snowpark import Session
from snowflake.snowpark.functions import datediff
contacts = (contacts
#days since last event
.withColumn('daysSinceLastEvent', datediff('day', lit(today),'lastEvent'))
#days since deployment
.withColumn('daysSinceLastDeployment', datediff('day',lit(today),'lastDeploymentEnd'))
#days since online training
.withColumn('daysSinceLastTraining', datediff('day', lit(today),'lastTraining'))
#days since last RC login
.withColumn('daysSinceLastRollCallLogin', datediff('day', lit(today),'adx_identity_lastsuccessfullogin'))
#days since last EMS login
.withColumn('daysSinceLastEMSLogin', datediff('day', lit(today),'vms_lastuserlogin'))
)
建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1020¶
消息:pyspark.sql.functions.instr 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.instr (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.instr.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下是使用 pyspark instr 的基本示例:
from pyspark.sql import SparkSession
from pyspark.sql.functions import instr
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(instr(df.test, 'cd').alias('result')).collect()
输出:
SMA 会在使用 instr 的代码行上方返回 EWI SPRKPY1020,以便您可以用来识别需要修改的位置。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
#EWI: SPRKPY1020 => pyspark.sql.functions.instr has a workaround, see documentation for more info
df.select(instr(df.test, 'cd').alias('result')).collect()
推荐修复方法
需要手动进行以下更改:使用函数 charindex 并更改前两个参数的顺序。
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import charindex, lit
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(charindex(lit('cd'), df.test).as_('result')).show()
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1021¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.last 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.last (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.last.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.last 函数生成了此 EWI。在此示例中,last 函数用于获取每个名称的最后一个 value。
df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1021,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
#EWI: SPRKPY1021 => pyspark.sql.functions.last has a workaround, see documentation for more info
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))
推荐修复方法
作为一种替代方案,可以使用 Snowflake LAST_VALUE 函数。要从 Snowpark 调用此函数,请使用 snowflake.snowpark.functions.call_builtin 函数,并传递字符串 last_value 作为第一个实参,传递相应的列作为第二个实参。如果您在 last 函数中使用该列的名称,则应在调用 call_builtin 函数时将其转换为列。
df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
描述:>- CSV、JSON 和 PARQUET 方法中的 mode 参数被转换为 overwrite
SPRKPY1022¶
消息:pyspark.sql.functions.log10 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.log10 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log10.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.log10 函数生成了此 EWI。在此示例中,log10 函数用于计算 value 列以 10 为底的对数值。
df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))
输出
SMA 在输出代码中添加了 EWI SPRKPY1022,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
#EWI: SPRKPY1022 => pyspark.sql.functions.log10 has a workaround, see documentation for more info
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))
推荐修复方法
作为一种替代方案,可以使用snowflake.snowpark.functions.log 函数并传递字面量值 10 作为对数的底。
df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1023¶
消息:pyspark.sql.functions.log1p 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.log1p (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log1p.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.log1p 函数生成了此 EWI。在此示例中,log1p 函数用于计算 value 列的自然对数。
df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))
输出
SMA 在输出代码中添加了 EWI SPRKPY1023,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
#EWI: SPRKPY1023 => pyspark.sql.functions.log1p has a workaround, see documentation for more info
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))
推荐修复方法
作为一种替代方案,可以使用 call_function 函数,并传递字符串 ln 作为第一个实参,然后在第二个实参添加 1。
df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1024¶
消息:pyspark.sql.functions.log2 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.log2 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log2.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.log2 函数生成了此 EWI。在此示例中,log2 函数用于计算 value 列以 2 为底的对数值。
df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))
输出
SMA 在输出代码中添加了 EWI SPRKPY1024,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
#EWI: SPRKPY1024 => pyspark.sql.functions.log2 has a workaround, see documentation for more info
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))
推荐修复方法
作为一种替代方案,可以使用 snowflake.snowpark.functions.log 函数并传递字面量值 2 作为对数的底。
df = session.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log(2, df["value"]))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1025¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.ntile 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.ntile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.ntile.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.ntile 函数生成了此 EWI。在此示例中,使用 ntile 函数将行分成 3 个桶。
df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))
输出
SMA 在输出代码中添加了 EWI SPRKPY1025,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
#EWI: SPRKPY1025 => pyspark.sql.functions.ntile has a workaround, see documentation for more info
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))
推荐修复方法
Snowpark 有一个等效的 ntile 函数,但传递给它的实参应该是列。作为一种替代方案,可以使用 snowflake.snowpark.functions.lit 函数将字面量实参转换为列。
df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1026¶
警告
自 Spark Conversion Core 4.3.2 起,此问题代码已 弃用
消息:pyspark.sql.readwriter.DataFrameReader.csv 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.csv (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.readwriter.DataFrameReader.csv 函数生成了此 EWI。在此示例中,csv 函数用于读取具有给定架构的多个 .csv 文件,并使用一些额外的选项(例如 encoding、header 和 sep)来微调读取文件的行为。
file_paths = [
"path/to/your/file1.csv",
"path/to/your/file2.csv",
"path/to/your/file3.csv",
]
df = session.read.csv(
file_paths,
schema=my_schema,
encoding="UTF-8",
header=True,
sep=","
)
输出
SMA 在输出代码中添加了 EWI SPRKPY1026,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
file_paths = [
"path/to/your/file1.csv",
"path/to/your/file2.csv",
"path/to/your/file3.csv",
]
#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
df = session.read.csv(
file_paths,
schema=my_schema,
encoding="UTF-8",
header=True,
sep=","
)
推荐修复方法
本节将说明如何配置 path 参数、schema 参数和一些 options,以确保其在 Snowpark 中的正常运行。
1. path 参数
Snowpark 要求将 path 参数为暂存区位置,因此,作为一种替代方案,可以创建一个临时暂存区,并使用前缀 file:// 将每个 .csv 文件添加到该暂存区。
2. schema 参数
Snowpark 不允许将 schema 定义为 csv 函数的参数。作为一种替代方案,可以使用 snowflake.snowpark.DataFrameReader.schema 函数。
3. options 参数
Snowpark 不允许将 extra options 定义为 csv 函数的参数。作为一种替代方案,对于其中许多参数,可以使用 snowflake.snowpark.DataFrameReader.option 函数将这些参数指定为 DataFrameReader 的选项。
备注
Snowpark 不支持 以下选项:
columnNameOfCorruptRecord
emptyValue
enforceSchema
header
ignoreLeadingWhiteSpace
ignoreTrailingWhiteSpace
inferSchema
locale
maxCharsPerColumn
maxColumns
mode
multiLine
nanValue
negativeInf
nullValue
positiveInf
quoteAll
samplingRatio
timestampNTZFormat
unescapedQuoteHandling
以下是应用上述建议后,可在 Snowpark 中正常运行的完整代码示例:
stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')
session.file.put(f"file:///path/to/your/file1.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.csv", f"@{stage}")
df = session.read.schema(my_schema).option("encoding", "UTF-8").option("sep", ",").csv(stage)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1027¶
警告
自 Spark Conversion Core 4.5.2 起,此问题代码已 弃用
消息:pyspark.sql.readwriter.DataFrameReader.json 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.json (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.readwriter.DataFrameReader.json 函数生成了此 EWI。在此示例中,json 函数用于读取具有给定架构的多个 .json 文件,并使用一些额外的选项(例如 primitiveAsString 和 dateFormat)来微调读取文件的行为。
file_paths = [
"path/to/your/file1.json",
"path/to/your/file2.json",
"path/to/your/file3.json",
]
df = session.read.json(
file_paths,
schema=my_schema,
primitiveAsString=True,
dateFormat="2023-06-20"
)
输出
SMA 在输出代码中添加了 EWI SPRKPY1027,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
file_paths = [
"path/to/your/file1.json",
"path/to/your/file2.json",
"path/to/your/file3.json",
]
#EWI: SPRKPY1027 => pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info
df = session.read.json(
file_paths,
schema=my_schema,
primitiveAsString=True,
dateFormat="2023-06-20"
)
推荐修复方法
本节将说明如何配置 path 参数、schema 参数和一些 options,以确保其在 Snowpark 中的正常运行。
1. path 参数
Snowpark 要求将 path 参数为暂存区位置,因此,作为一种替代方案,可以创建一个临时暂存区,并使用前缀 file:// 将每个 .json 文件添加到该暂存区。
2. schema 参数
Snowpark 不允许将 schema 定义为 json 函数的参数。作为一种替代方案,可以使用 snowflake.snowpark.DataFrameReader.schema 函数。
3. options 参数
Snowpark 不允许将 extra options 定义为 json 函数的参数。作为一种替代方案,对于其中许多参数,可以使用 snowflake.snowpark.DataFrameReader.option 函数将这些参数指定为 DataFrameReader 的选项。
备注
Snowpark 不支持以下选项:
allowBackslashEscapingAnyCharacter
allowComments
allowNonNumericNumbers
allowNumericLeadingZero
allowSingleQuotes
allowUnquotedControlChars
allowUnquotedFieldNames
columnNameOfCorruptRecord
dropFiledIfAllNull
encoding
ignoreNullFields
lineSep
locale
mode
multiline
prefersDecimal
primitiveAsString
samplingRatio
timestampNTZFormat
timeZone
以下是应用上述建议后,可在 Snowpark 中正常运行的完整代码示例:
stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')
session.file.put(f"file:///path/to/your/file1.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.json", f"@{stage}")
df = session.read.schema(my_schema).option("dateFormat", "2023-06-20").json(stage)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1028¶
消息:pyspark.sql.readwriter.DataFrameReader.orc 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.orc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.orc.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.readwriter.DataFrameReader.orc 函数生成了此 EWI。在此示例中,使用 orc 函数来读取多个 .orc 文件,并使用一些额外的选项(例如 mergeSchema 和 recursiveFileLookup)来微调读取文件的行为。
file_paths = [
"path/to/your/file1.orc",
"path/to/your/file2.orc",
"path/to/your/file3.orc",
]
df = session.read.orc(
file_paths,
mergeSchema="True",
recursiveFileLookup="True"
)
输出
SMA 在输出代码中添加了 EWI SPRKPY1028,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
file_paths = [
"path/to/your/file1.orc",
"path/to/your/file2.orc",
"path/to/your/file3.orc",
]
#EWI: SPRKPY1028 => pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info
df = session.read.orc(
file_paths,
mergeSchema="True",
recursiveFileLookup="True"
)
推荐修复方法
本节将说明如何配置 path 参数和额外的 options,以使其在 Snowpark 中正常工作。
1. path 参数
Snowpark 要求将 path 参数为暂存区位置,因此,作为一种替代方案,可以创建一个临时暂存区,并使用前缀 file:// 将每个 .orc 文件添加到该暂存区。
2. options 参数
Snowpark 不允许将 extra options 定义为 orc 函数的参数。作为一种替代方案,对于其中许多参数,可以使用 snowflake.snowpark.DataFrameReader.option 函数将这些参数指定为 DataFrameReader 的选项。
备注
Snowpark 不支持以下选项:
compression
mergeSchema
以下是应用上述建议后,可在 Snowpark 中正常运行的完整代码示例:
stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')
session.file.put(f"file:///path/to/your/file1.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.orc", f"@{stage}")
df = session.read.option(recursiveFileLookup, "True").orc(stage)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1029¶
消息:当该工具检测到使用了 pyspark.sql.readwriter.DataFrameReader.parquet 时,就会出现此问题。支持此功能,但由于 Snowpark 和 Spark API 之间存在一些差异,可能需要手动进行一些更改。
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.readwriter.DataFrameReader.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html) 函数时,就会出现此问题。Snowpark 支持此函数,但存在一些差异,需要手动进行一些更改。
场景¶
输入
以下示例使用 pyspark.sql.readwriter.DataFrameReader.parquet 函数生成了此 EWI。
file_paths = [
"path/to/your/file1.parquet",
"path/to/your/file2.parquet",
"path/to/your/file3.parquet",
]
df = session.read.parquet(
*file_paths,
mergeSchema="true",
pathGlobFilter="*file*",
recursiveFileLookup="true",
modifiedBefore="2024-12-31T00:00:00",
modifiedAfter="2023-12-31T00:00:00"
)
输出
SMA 在输出代码中添加了 EWI SPRKPY1029,以提示 Snowpark 支持此函数,但需要进行一些手动调整。请注意,Snowpark 支持的 options 将转换为 option 函数调用,不支持的 options 将被移除。具体说明详见后续章节。
file_paths = [
"path/to/your/file1.parquet",
"path/to/your/file2.parquet",
"path/to/your/file3.parquet"
]
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.
df = session.read.option("PATTERN", "*file*").parquet(
*file_paths
)
推荐修复方法
本节将说明如何配置 paths 和 options 参数以使其在 Snowpark 中正常工作。
1. paths 参数
在 Spark 中,此参数可以是本地位置,也可以是云位置。在 Snowpark 中,只接受使用 Snowflake 暂存区 的云位置。因此,可以创建一个临时暂存区,并使用前缀 file:// 将每个文件添加到其中。
2. options 参数
Snowpark 不允许将不同的 options 定义为 parquet 函数的参数。作为一种替代方案,可以使用 option 或 options 函数将这些参数指定为 DataFrameReader 的额外选项。
请注意,Snowpark options 与 PySpark options 并非完全相同,因此可能需要手动进行一些更改。下面更详细说明了如何在 Snowpark 中配置最常见的 PySpark options。
2.1 mergeSchema 选项
Parquet 支持架构演进,允许用户从简单的架构开始,然后根据需要逐步添加更多列。这可能会导致多个具有不同但兼容架构的 parquet 文件。在 Snowflake 中,由于 infer_schema 功能,您不需要这样做,因此可以移除 mergeSchema 选项。
2.2 pathGlobFilter 选项
如果您只想从暂存区加载一部分文件,则可以使用 pattern 选项来指定与要加载的文件相匹配的正则表达式。如当前场景输出所示,SMA 已自动完成该操作。
2.3 recursiveFileLookupstr 选项
Snowpark 不支持此选项。最好的建议是使用类似 pathGlobFilter 选项的正则表达式来实现类似功能。
2.4 modifiedBefore/modifiedAfter 选项
在 Snowflake 中,可以使用 metadata 列获得相同的结果。
备注
Snowpark 不支持以下选项:
compression
datetimeRebaseMode
int96RebaseMode
mergeSchema
以下是应如何转换输入代码以让它在 Snowpark 中可以正常工作的完整示例:
from snowflake.snowpark.column import METADATA_FILE_LAST_MODIFIED, METADATA_FILENAME
temp_stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}')
session.file.put(f"file:///path/to/your/file1.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file2.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file3.parquet", f"@{temp_stage}")
df = session.read \
.option("PATTERN", ".*file.*") \
.with_metadata(METADATA_FILENAME, METADATA_FILE_LAST_MODIFIED) \
.parquet(temp_stage) \
.where(METADATA_FILE_LAST_MODIFIED < '2024-12-31T00:00:00') \
.where(METADATA_FILE_LAST_MODIFIED > '2023-12-31T00:00:00')
其他建议¶
在 Snowflake 中,可以利用其他方法来进行 parquet 数据引入,例如:
利用 原生 parquet 引入功能。也可以考虑 使用 snowpipe 自动引入。
Parquet 外部表,其可以直接指向云文件位置。
使用 Iceberg 表。
在进行迁移时,最佳实践是利用 SMA 报告构建文件清单,并确定现代化改造后数据将映射到哪些暂存区/表。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1030¶
警告
此问题代码已 弃用
消息:pyspark.sql.session.SparkSession.Builder.appName 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.session.SparkSession.Builder.appName (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.appName.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.session.SparkSession.Builder.appName 函数生成了此 EWI。在此示例中,appName 函数用于将 MyApp 设置为应用程序的名称。
session = SparkSession.builder.appName("MyApp").getOrCreate()
输出
SMA 在输出代码中添加了 EWI SPRKPY1030,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
#EWI: SPRKPY1030 => pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info
session = Session.builder.appName("MyApp").getOrCreate()
推荐修复方法
作为一种替代方案,您可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 appName 函数提供了扩展。
import snowpark_extensions
session = SessionBuilder.appName("MyApp").getOrCreate()
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1031¶
警告
自 Spark Conversion Core 2.7.0 起,此问题代码已 弃用
消息:pyspark.sql.Column.Column.Contains 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.column.Column.contains (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.contains.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.column.Column.contains 函数生成了此 EWI。在此示例中,contains 函数用于过滤“City”列包含子字符串“New”的行。
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(col("City").contains("New"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1031,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
#EWI: SPRKPY1031 => pyspark.sql.column.Column.contains has a workaround, see documentation for more info
df_filtered = df.filter(col("City").contains("New"))
推荐修复方法
作为一种替代方案,可以使用 snowflake.snowpark.functions.contains 函数,并将该列作为第一个实参传递,将要搜索的元素作为第二个实参传递。如果要搜索的元素是字面量值,则应使用 lit 函数将其转换为列表达式。
from snowflake.snowpark import functions as f
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(f.contains(col("City"), f.lit("New")))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1032¶
消息:spark 元素 未定义
类别:转换错误
描述¶
当 SMA 无法确定给定元素的相应映射状态时,就会出现此问题。这意味着,SMA 还不知道 Snowpark 是否支持该元素。请注意,这是 SMA 对任何未定义元素使用的通用错误代码。
场景¶
输入
以下是 SMA 无法确定其相应映射状态的函数的示例。在这种情况下,应假设 not_defined_function() 是有效的 PySpark 函数,代码就可以运行了。
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()
输出
SMA 在输出代码中添加了 EWI SPRKPY1032,以提示此元素未定义。
#EWI: SPRKPY1032 => pyspark.rdd.RDD.not_defined_function is not defined
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()
推荐修复方法
要尝试找出问题,可以执行以下验证:
检查源代码的语法是否正确以及拼写是否正确。
检查是否使用的是 SMA 支持的 PySpark 版本。要了解运行 SMA 时 SMA 支持哪个 PySpark 版本,可以查看
DetailedReport.docx文件的第一页。
如果这是有效的 PySpark 元素,请使用 SMA 的 报告问题 选项报告您遇到的有关该特定元素的转换错误,并添加您认为可能有帮助的任何其他信息。
请注意,如果某个元素未定义,并不意味着 Snowpark 不支持该元素。您应该查看 Snowpark 文档 以验证是否存在等效元素。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1033¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.asc 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.asc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asc.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
pyspark.sql.functions.asc 函数会将列对象或列名视为字符串并用作其参数。Snowpark 不支持这两种场景,因此生成了此 EWI。
场景 1¶
输入
以下示例使用 pyspark.sql.functions.asc 函数,它接收列对象作为参数。
df.orderBy(asc(col))
输出
SMA 在输出代码中添加了 EWI SPRKPY1033,以提示 Snowpark 并非完全支持带有列对象参数的 asc 函数,但它有替代方案。
#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc(col))
推荐修复方法
作为一种替代方案,可以从 column 列参数中调用 snowflake.snowpark.Column.asc 函数。
df.orderBy(col.asc())
场景 2¶
输入
以下示例使用 pyspark.sql.functions.asc 函数,它接收列名作为参数。
df.orderBy(asc("colName"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1033,以提示 Snowpark 并非完全支持带有列名参数的 asc 函数,但它有替代方案。
#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc("colName"))
推荐修复方法
作为一种替代方案,可以使用 snowflake.snowpark.functions.col 函数将字符串参数转换为列对象,然后调用 snowflake.snowpark.Column.asc 函数。
df.orderBy(col("colName").asc())
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1034¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.desc 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.desc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.desc.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
pyspark.sql.functions.desc 函数将列对象或列名作为字符串并用作其参数。Snowpark 不支持这两种场景,因此生成了此 EWI。
场景 1¶
输入
以下示例使用 pyspark.sql.functions.desc 函数,它接收列对象作为参数。
df.orderBy(desc(col))
输出
SMA 在输出代码中添加了 EWI SPRKPY1034,以提示 Snowpark 并非完全支持带有列对象参数的 desc 函数,但它有替代方案。
#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc(col))
推荐修复方法
作为一种替代方案,可以从 column 参数中调用 snowflake.snowpark.Column.desc 函数。
df.orderBy(col.desc())
场景 2¶
输入
以下示例使用 pyspark.sql.functions.desc 函数,它接收列名作为参数。
df.orderBy(desc("colName"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1034,以提示 Snowpark 并非完全支持带有列名参数的 desc 函数,但它有替代方案。
#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc("colName"))
推荐修复方法
作为一种替代方案,可以使用 snowflake.snowpark.functions.col 函数将字符串参数转换为列对象,然后调用 snowflake.snowpark.Column.desc 函数。
df.orderBy(col("colName").desc())
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1035¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.reverse 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.reverse (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.reverse.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.reverse 函数生成了此 EWI。在此示例中,reverse 函数用于反转 word 列的每个字符串。
df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1035,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([("hello",), ("world",)], ["word"])
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse("word"))
推荐修复方法
作为一种替代方案,您可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 reverse 函数提供了扩展。
import snowpark_extensions
df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1036¶
警告
此问题代码已 弃用
消息:pyspark.sql.column.Column.getField 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.column.Column.getField (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getField.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.column.Column.getField 函数生成了此 EWI。在此示例中,使用 getField 函数从 info 列中提取 name。
df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info").getField("name"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1036,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
#EWI: SPRKPY1036 => pyspark.sql.column.Column.getField has a workaround, see documentation for more info
df_with_name = df.withColumn("name", col("info").getField("name"))
推荐修复方法
作为一种替代方案,可以使用以字段名称作为索引的 Snowpark 列索引器运算符。
df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info")["name"])
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1037¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.sort_array 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.sort_array (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sort_array.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.sort_array 函数生成了此 EWI。在此示例中,sort_array 函数用于按升序和降序对 numbers 数组进行排序。
df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))
输出
SMA 在输出代码中添加了 EWI SPRKPY1037,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))
推荐修复方法
作为一种替代方案,可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 sort_array 函数提供了扩展。
import snowpark_extensions
df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1038¶
消息:spark 元素 尚未识别
类别:转换错误
描述¶
当您的源代码中有 SMA 无法识别的 PySpark 元素时,就会出现此问题。出现这种情况可能由多种原因导致,例如:
PySpark 中不存在的元素。
在 SMA 尚不支持的 PySpark 版本中添加的元素。
处理元素时 SMA 的内部错误。
这是 SMA 对任何无法识别的元素使用的通用错误代码。
场景¶
输入
以下是一个函数使用示例,由于该函数在 PySpark 中不存在,因此 SMA 无法识别该函数。
from pyspark.sql import functions as F
F.unrecognized_function()
输出
SMA 在输出代码中添加了 EWI SPRKPY1038,以提示此元素无法识别。
from snowflake.snowpark import functions as F
#EWI: SPRKPY1038 => pyspark.sql.functions.non_existent_function is not yet recognized
F.unrecognized_function()
推荐修复方法
要尝试找出问题,可以执行以下验证:
检查 PySpark 中是否存在该元素。
检查元素拼写是否正确。
检查是否使用的是 SMA 支持的 PySpark 版本。要了解运行 SMA 时 SMA 支持哪个 PySpark 版本,可以查看
DetailedReport.docx文件的第一页。
如果它是有效的 PySpark 元素,请使用 SMA 的 报告问题 选项报告您遇到的有关该特定元素的转换错误,并添加您认为可能有帮助的任何其他信息。
请注意,如果 SMA 无法识别某个元素,这并不意味着 Snowpark 不支持该元素。您应该查看 Snowpark 文档 以验证是否存在等效元素。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1039¶
警告
此问题代码已 弃用
消息:pyspark.sql.column.Column.getItem 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.column.Column.getItem (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getItem.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.column.Column.getItem 函数生成了此 EWI。在此示例中,使用 getItem 函数通过位置索引和键获取项目。
df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits").getItem(0))
df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1039,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("first_fruit", col("fruits").getItem(0))
df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))
推荐修复方法
作为一种替代方案,可以使用 Snowpark 列索引器运算符,并将字段的名称或位置用作索引。
df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits")[0])
df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities")["apple"])
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1040¶
警告
此问题代码已 弃用
消息:pyspark.sql.functions.explode 有替代方案,请参阅文档了解更多信息
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.functions.explode (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html) 函数时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 pyspark.sql.functions.explode 函数生成了此 EWI。在此示例中,使用了 explode 函数来为 numbers 列的每个数组项生成一行。
df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))
输出
SMA 在输出代码中添加了 EWI SPRKPY1040,以提示 Snowpark 并非完全支持此函数,但它有替代方案。
df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
#EWI: SPRKPY1040 => pyspark.sql.functions.explode has a workaround, see documentation for more info
exploded_df = df.select("name", explode(df.numbers).alias("number"))
推荐修复方法
作为一种替代方案,可以导入 snowpark_extensions (https://pypi.org/project/snowpark-extensions/) 包,它为 explode 函数提供了扩展。
import snowpark_extensions
df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1041¶
警告
自 Spark Conversion Core Version 2.9.0 起,此问题代码已 弃用
消息:pyspark.sql.functions.explode_outer 有替代方案
类别:警告
描述¶
当该工具检测到使用了 pyspark.sql.functions.explode_outer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode_outer.html#pyspark.sql.functions.explode_outer) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
该示例显示了在 select 调用中使用 explode_outer 方法的情况。
df = spark.createDataFrame(
[(1, ["foo", "bar"], {"x": 1.0}),
(2, [], {}),
(3, None, None)],
("id", "an_array", "a_map")
)
df.select("id", "an_array", explode_outer("a_map")).show()
输出
该工具添加了 EWI SPRKPY1041,提示可以实施替代方案。
df = spark.createDataFrame(
[(1, ["foo", "bar"], {"x": 1.0}),
(2, [], {}),
(3, None, None)],
("id", "an_array", "a_map")
)
#EWI: SPRKPY1041 => pyspark.sql.functions.explode_outer has a workaround, see documentation for more info
df.select("id", "an_array", explode_outer("a_map")).show()
推荐修复方法
作为一种替代方案,可以导入 snowpark_extensions 包,其包含 explode_outer 辅助函数。
import snowpark_extensions
df = spark.createDataFrame(
[(1, ["foo", "bar"], {"x": 1.0}),
(2, [], {}),
(3, None, None)],
("id", "an_array", "a_map")
)
df.select("id", "an_array", explode_outer("a_map")).show()
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1042¶
消息:pyspark.sql.functions.posexplode 有替代方案
类别:警告
描述¶
当该工具检测到使用了 pyspark.sql.functions.posexplode (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.posexplode.html?highlight=posexplode) 时,就会出现此问题;此问题有替代方案。
场景¶
根据作为参数传递的列类型,此方法可以处理几种情况:参数可以是 list of values 或 map/directory (keys/values)。
场景 1¶
输入
以下示例使用 posexplode,并传递 list of values 作为参数。
df = spark.createDataFrame(
[Row(a=1,
intlist=[1, 2, 3])])
df.select(posexplode(df.intlist)).collect()
输出
该工具添加了 EWI SPRKPY1042,提示可以实施替代方案。
df = spark.createDataFrame(
[Row(a=1,
intlist=[100, 200, 300])])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info
df.select(posexplode(df.intlist)).show()
推荐修复方法
要获得相同的行为,请使用方法 functions.flatten,删除多余的列,然后重命名索引和值列的名称。
df = spark.createDataFrame(
[Row(a=1,
intlist=[1, 2, 3])])
df.select(
flatten(df.intlist))\
.drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
.rename({"INDEX": "pos", "VALUE": "col"}).show()
场景 2¶
输入
以下是使用 posexplode 的另一个示例,它接受 map/dictionary (keys/values) 作为参数
df = spark.createDataFrame([
[1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
[2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
df.select(posexplode(df.maps)).show()
输出
该工具添加了 EWI SPRKPY1042,提示可以实施替代方案。
df = spark.createDataFrame([
[1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
[2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info
df.select(posexplode(df.maps)).show()
推荐修复方法
作为一种替代方案,可以使用 functions.row_number 来获取位置,使用 functions.explode 及字段的名称来获取字典的键/值的值。
df = spark.createDataFrame([
[10, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
[11, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
window = Window.orderBy(col("idx").asc())
df.select(
row_number().over(window).alias("pos"),
explode(df.maps).alias("key", "value")).show()
注意: 使用 row_number 并不完全等效,因为它从 1 开始(而 Spark 方法从零开始)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1043¶
消息:pyspark.sql.functions.posexplode_outer 有替代方案
类别:警告
描述¶
当该工具检测到使用了 pyspark.sql.functions.posexplode_outer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.posexplode_outer.html) 时,就会出现此问题;此问题有替代方案。
场景¶
根据作为参数传递的列类型,此方法可以处理几种情况:参数可以是 list of values 或 map/directory (keys/values)。
场景 1¶
输入
以下示例使用 posexplode_outer,它接受 list of values 作为参数。
df = spark.createDataFrame(
[
(1, ["foo", "bar"]),
(2, []),
(3, None)],
("id", "an_array"))
df.select("id", "an_array", posexplode_outer("an_array")).show()
输出
该工具添加了 EWI SPRKPY1043,提示可以实施替代方案。
df = spark.createDataFrame(
[
(1, ["foo", "bar"]),
(2, []),
(3, None)],
("id", "an_array"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info
df.select("id", "an_array", posexplode_outer("an_array")).show()
推荐修复方法
要获得相同的行为,请使用方法 functions.flatten 并将 outer 参数设置为 True,删除多余的列,然后重命名索引和值列的名称。
df = spark.createDataFrame(
[
(1, ["foo", "bar"]),
(2, []),
(3, None)],
("id", "an_array"))
df.select(
flatten(df.an_array, outer=True))\
.drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
.rename({"INDEX": "pos", "VALUE": "col"}).show()
场景 2¶
输入
以下是使用 posexplode_outer 的另一个示例,其接收 map/dictionary (keys/values) 作为参数
df = spark.createDataFrame(
[
(1, {"x": 1.0}),
(2, {}),
(3, None)],
("id", "a_map"))
df.select(posexplode_outer(df.a_map)).show()
输出
该工具添加了 EWI SPRKPY1043,提示可以实施替代方案。
df = spark.createDataFrame(
[
(1, {"x": "Ashi Garami"}),
(2, {}),
(3, None)],
("id", "a_map"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info
df.select(posexplode_outer(df.a_map)).show()
推荐修复方法
作为一种替代方案,可以使用 functions.row_number 来获取位置,使用 functions.explode_outer 及字段的名称来获取字典的键/值的值。
df = spark.createDataFrame(
[
(1, {"x": "Ashi Garami"}),
(2, {}),
(3, None)],
("id", "a_map"))
window = Window.orderBy(col("id").asc())
df.select(
row_number().over(window).alias("pos"),
explode_outer(df.a_map)).show()
注意: 使用 row_number 并不完全等效,因为它从 1 开始(而 Spark 方法从零开始)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1044¶
警告
自 Spark Conversion Core 版本 2.4.0 起,此问题代码已 弃用
消息:pyspark.sql.functions.split 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.split (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html) 时,就会出现此问题;此问题有替代方案。
场景¶
具体实现方式取决于传递给该方法的参数数量,主要分为几种情况。
场景 1¶
输入
以下是函数 split 只有 str 和 pattern 参数时的示例
F.split('col', '\\|')
输出
该工具显示 EWI SPRKPY1044,提示有替代方案。
#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|')
推荐修复方法
作为一种替代方案,可以使用 pattern 参数调用函数 snowflake.snowpark.functions.lit,然后将其发送到 split 中。
F.split('col', lit('\\|'))
## the result of lit will be sent to the split function
场景 2¶
输入
以下是函数 split 具有 str、pattern 和 limit 参数的另一个示例。
F.split('col', '\\|', 2)
输出
该工具显示 EWI SPRKPY1044,提示有替代方案。
#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|', 2)
推荐修复方法
不支持此特定方案。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1045¶
消息:pyspark.sql.functions.map_values 有替代方案
类别:警告。
描述¶
此函数用于从包含 map/dictionary (keys/values) 的列中提取值列表。
当该工具检测到使用了 pyspark.sql.functions.map_values (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.map_values.html) 时,就会出现问题;该问题有替代方案。
场景¶
输入
以下是使用方法 map_values 的示例。
df = spark.createDataFrame(
[(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
("id", "a_map"))
df.select(map_values("a_map")).show()
输出
该工具添加了 EWI SPRKPY1045,提示可以实施替代方案。
df = spark.createDataFrame(
[(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
("id", "a_map"))
#EWI: SPRKPY1045 => pyspark.sql.functions.map_values has a workaround, see documentation for more info
df.select(map_values("a_map")).show()
推荐修复方法
作为一种替代方案,可以创建 udf 来获取列的值。下面的示例显示如何创建 udf,接着将其分配给 F.map_values,然后使用它。
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import ArrayType, MapType
map_values_udf=None
def map_values(map):
global map_values_udf
if not map_values_udf:
def _map_values(map: dict)->list:
return list(map.values())
map_values_udf = F.udf(_map_values,return_type=ArrayType(),input_types=[MapType()],name="map_values",is_permanent=False,replace=True)
return map_values_udf(map)
F.map_values = map_values
df.select(map_values(colDict))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1046¶
警告
自 Spark Conversion Core 版本 2.1.22 起,此问题代码已 弃用
消息:pyspark.sql.functions.monotonically_increasing_id 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.functions.monotonically_increasing_id (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 monotonically_increasing_id 方法。
from pyspark.sql import functions as F
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()
输出
该工具添加了 EWI SPRKPY1046,提示可以实施替代方案。
from pyspark.sql import functions as F
#EWI: SPRKPY1046 => pyspark.sql.functions.monotonically_increasing_id has a workaround, see documentation for more info
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()
推荐修复方法
更新工具版本。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1047¶
警告
自 Spark Conversion Core 版本 4.6.0 起,此问题代码已 弃用
描述¶
当该工具检测到使用了 pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html?highlight=pyspark%20context%20sparkcontext%20setloglevel#pyspark.SparkContext.setLogLevel) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下是使用方法 setLogLevel 的示例。
sparkSession.sparkContext.setLogLevel("WARN")
输出
该工具添加了 EWI SPRKPY1047,提示可以实施替代方案。
#EWI: SPRKPY1047 => pyspark.context.SparkContext.setLogLevel has a workaround, see documentation for more info
sparkSession.sparkContext.setLogLevel("WARN")
推荐修复方法
将使用的 setLogLevel 函数替换为 logging.basicConfig,后者为简单日志记录提供了一系列便捷函数。为了使用它,需要导入“logging”和“sys”这两个模块,并且应该使用“级别等效选项表”替换级别常量:
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
级别等效选项表
级别源参数 |
级别目标参数 |
|---|---|
"ALL" |
此参数没有等效选项 |
"DEBUG" |
logging.DEBUG |
"ERROR" |
logging.ERROR |
"FATAL" |
logging.CRITICAL |
"INFO" |
logging.INFO |
"OFF" |
logging.NOTSET |
"TRACE" |
此参数没有等效选项 |
"WARN" |
logging.WARNING |
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1048¶
警告
自 Spark Conversion Core 版本 2.4.0 起,此问题代码已 弃用
消息:pyspark.sql.session.SparkSession.conf 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.session.SparkSession.conf (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.conf.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下是如何将配置设置为属性 conf 的示例。
spark.conf.set("spark.sql.crossJoin.enabled", "true")
输出
该工具添加了 EWI SPRKPY1048,提示可以实施替代方案。
#EWI: SPRKPY1048 => pyspark.sql.session.SparkSession.conf has a workaround, see documentation for more info
spark.conf.set("spark.sql.crossJoin.enabled", "true")
推荐修复方法
SparkSession.conf 用于传递仅 Pyspark 使用的特定设置,这些设置不适用于 Snowpark。可以在代码上删除或添加注释
#spark.conf.set("spark.sql.crossJoin.enabled", "true")
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1049¶
警告
自 Spark Conversion Core 版本 2.1.9 起,此问题代码已 弃用
消息:pyspark.sql.session.SparkSession.sparkContext 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.session.SparkSession.sparkContext (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sparkContext.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下是创建 Spark 会话,然后使用 SparkContext 属性显示 appName 的示例。
print("APP Name :"+spark.sparkContext.appName())
输出
该工具添加了 EWI SPRKPY1049,提示可以实施替代方案。
#EWI: SPRKPY1049 => pyspark.sql.session.SparkSession.sparkContext has a workaround, see documentation for more info
print("APP Name :"+spark.sparkContext.appName())
推荐修复方法
SnowPark 不支持 SparkContext,但可以直接从 Session 实例访问 SparkContext 中的方法和属性。
## Pyspark
print("APP Name :"+spark.sparkContext.appName())
can be used in SnowPark removing the sparkContext as:
#Manual adjustment in SnowPark
print("APP Name :"+spark.appName());
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1050¶
消息:pyspark.conf.SparkConf.set 有替代方案。
类别:警告。
描述¶
当该工具检测到使用了 pyspark.conf.SparkConf.set (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.set.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下是使用 conf.set 设置变量的示例。
conf = SparkConf().setAppName('my_app')
conf.set("spark.storage.memoryFraction", "0.5")
输出
该工具添加了 EWI SPRKPY1050,提示可以实施替代方案。
conf = SparkConf().setAppName('my_app')
#EWI: SPRKPY1050 => pyspark.conf.SparkConf.set has a workaround, see documentation for more info
conf.set("spark.storage.memoryFraction", "0.5")
推荐修复方法
SparkConf.set 用于设置仅由 Pyspark 使用的配置设置,不适用于 Snowpark。可以在代码上删除或添加注释
#conf.set("spark.storage.memoryFraction", "0.5")
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1051¶
警告
自 Spark Conversion Core 版本 2.4.0 起,此问题代码已 弃用
消息:pyspark.sql.session.SparkSession.Builder.master 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.session.SparkSession.Builder.master (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.master.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例展示了如何使用 builder.master 方法,将 Spark Master URL 设置为使用单核模式连接到本地位置。
spark = SparkSession.builder.master("local[1]")
输出
该工具添加了 EWI SPRKPY1051,提示可以实施替代方案。
#EWI: SPRKPY1051 => pyspark.sql.session.SparkSession.Builder.master has a workaround, see documentation for more info
spark = Session.builder.master("local[1]")
推荐修复方法
pyspark.sql.session.SparkSession.Builder.master 用于设置 Spark 集群。Snowpark 不使用 Spark 集群,因此可为该代码删除或注释。
## spark = Session.builder.master("local[1]")
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1052¶
警告
自 Spark Conversion Core 版本 2.8.0 起,此问题代码已 弃用
消息:pyspark.sql.session.SparkSession.Builder.enableHiveSupport 有替代方案
类别:警告。
描述¶
当该工具检测到使用了 pyspark.sql.session.SparkSession.Builder.enableHiveSupport (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.enableHiveSupport.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
以下示例使用 enableHiveSupport 方法配置 SparkSession 并启用 Hive 支持。
spark = Session.builder.appName("Merge_target_table")\
.config("spark.port.maxRetries","100") \
.enableHiveSupport().getOrCreate()
输出
该工具添加了 EWI SPRKPY1052,提示可以实施替代方案。
#EWI: SPRKPY1052 => pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround, see documentation for more info
spark = Session.builder.appName("Merge_target_table")\
.config("spark.port.maxRetries","100") \
.enableHiveSupport().getOrCreate()
推荐修复方法
移除 enableHiveSupport 函数的使用,因为在 Snowpark 中不需要该函数。
spark = Session.builder.appName("Merge_target_table")\
.config("spark.port.maxRetries","100") \
.getOrCreate()
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1053¶
消息:提取 dbc 文件时出错。
类别:警告。
描述¶
当无法提取 dbc 文件时,就会出现此问题。此警告可能是由以下一个或多个原因引起的:文件过大、无法访问、只读等。
其他建议¶
作为一种替代方案,如果文件太大无法处理,则可以检查文件的大小。此外,分析该工具是否可以访问文件,以免出现任何访问问题。
如需更多支持,可以发送电子邮件至 snowconvert-info@snowflake.com 联系我们。如果您与 Snowflake 签订了支持合同,请联系您的销售工程师,他们可以指导您的支持需求。
SPRKPY1080¶
消息:SparkContext 的值被替换为“session”变量。
类别:警告
描述¶
Spark 上下文存储在一个名为“session”的变量中,该变量创建 Snowpark 会话。
场景¶
输入
以下代码段描述了 SparkContext
## Input Code
from pyspark import SparkContext
from pyspark.sql import SparkSession
def example1():
sc = SparkContext("local[*]", "TestApp")
sc.setLogLevel("ALL")
sc.setLogLevel("DEBUG")
输出
在此输出代码中,SMA 已将 PySpark.SparkContext 替换为 SparkSession,请注意,SMA 还在“connection.json”文件中添加了一个模板来替换连接,然后在 connection_parameter 变量上加载此配置。
## Output Code
import logging
import sys
import json
from snowflake.snowpark import Session
from snowflake.snowpark import Session
def example1():
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
#EWI: SPRKPY1080 => The value of SparkContext is replaced with 'session' variable.
sc = Session.builder.configs(connection_parameter).getOrCreate()
sc.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
logging.basicConfig(stream = sys.stdout, level = logging.NOTSET)
logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)
推荐修复方法
必须使用所需的连接信息更新配置文件“connection.json”:
{
"user": "my_user",
"password": "my_password",
"account": "my_account",
"role": "my_role",
"warehouse": "my_warehouse",
"database": "my_database",
"schema": "my_schema"
}
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1054¶
消息:pyspark.sql.readwriter.DataFrameReader.format 不受支持。
类别:警告。
描述¶
当 pyspark.sql.readwriter.DataFrameReader.format (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.format.html) 具有 Snowpark 不支持的实参时,就会出现此问题。
场景¶
根据您尝试加载的格式类型,分为几种情况。它可以是 supported 或 non-supported 格式。
场景 1¶
输入
该工具会分析尝试加载的格式类型,支持的格式有:
Csv
JSON
Parquet
Orc
以下示例显示了该工具在传递 csv 值时如何转换 format 方法。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.read.format('csv').load('/path/to/file')
输出
该工具将 format 方法转换为 Csv 方法调用。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
df1 = spark.read.csv('/path/to/file')
推荐修复方法
在此示例中,该工具不显示 EWI,这意味着无需修复。
场景 2¶
输入
以下示例显示了该工具在传递 Jdbc 值时如何转换 format 方法。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
df2 = spark.read.format('jdbc') \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("dbtable", "employee") \
.option("user", "root") \
.option("password", "root") \
.load()
输出
该工具显示 EWI SPRKPY1054,提示不支持“jdbc”值。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "jdbc" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df2 = spark.read.format('jdbc') \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("dbtable", "employee") \
.option("user", "root") \
.option("password", "root") \
.load()
推荐修复方法
对于 not supported 场景,没有具体的修复方法,因为它取决于尝试读取的文件。
场景 3¶
输入
以下示例显示了该工具在传递 CSV 时如何转换 format 方法,但改用变量。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
myFormat = 'csv'
df3 = spark.read.format(myFormat).load('/path/to/file')
输出
由于该工具无法在运行时确定变量的值,因此会显示 EWI SPRKPY1054,提示不支持该值。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
myFormat = 'csv'
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df3 = spark.read.format(myFormat).load('/path/to/file')
推荐修复方法
替代方案是,您可以检查变量的值并将其作为字符串添加到 format 调用中。
其他建议¶
Snowpark 位置仅接受使用 Snowflake 暂存区 的云位置。
Snowpark 支持的方法的文档可以在 文档 中找到
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1055¶
消息:pyspark.sql.readwriter.DataFrameReader.option 键值不受支持。
类别:警告。
描述¶
当 SnowFlake 不支持 pyspark.sql.readwriter.DataFrameReader.option 键值时,就会出现此问题。
该工具会分析 option 调用参数,并根据方法(CSV、JSON 或 PARQUET)判断键值在 Snowpark 中是否存在等效选项。如果所有参数均有等效选项,则该工具不会添加 EWI,并将键值替换为其等效选项;反之,工具将添加 EWI。
等效选项列表:
CSV 的等效选项:
Spark 选项键 |
Snowpark 等效选项 |
|---|---|
sep |
FIELD_DELIMITER |
header |
PARSE_HEADER |
lineSep |
RECORD_DELIMITER |
pathGlobFilter |
PATTERN |
quote |
FIELD_OPTIONALLY_ENCLOSED_BY |
nullValue |
NULL_IF |
dateFormat |
DATE_FORMAT |
timestampFormat |
TIMESTAMP_FORMAT |
inferSchema |
INFER_SCHEMA |
delimiter |
FIELD_DELIMITER |
JSON 的等效选项:
Spark 选项键 |
Snowpark 等效选项 |
|---|---|
dateFormat |
DATE_FORMAT |
timestampFormat |
TIMESTAMP_FORMAT |
pathGlobFilter |
PATTERN |
PARQUET 的等效选项:
Spark 选项键 |
Snowpark 等效选项 |
|---|---|
pathGlobFilter |
PATTERN |
其他未在上述表格中列出的关键选项,Snowpark 既不支持,也不存在等效选项。如果是这样,该工具会添加参数信息的 EWI 并将其从链中移除。
场景¶
以下场景适用于 CSV、JSON 和 PARQUET。
分为几种情况,具体视在 option 方法中所使用键的值而定。
场景 1¶
输入
以下是使用 equivalent key 进行 option 调用的示例。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
## CSV example:
spark.read.option("header", True).csv(csv_file_path)
## Json example:
spark.read.option("dateFormat", "dd-MM-yyyy").json(json_file_path)
## Parquet example:
spark.read.option("pathGlobFilter", "*.parquet").parquet(parquet_file_path)
输出
该工具使用正确的等效选项转换键。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
## CSV example:
spark.read.option("PARSE_HEADER", True).csv(csv_file_path)
## Json example:
spark.read.option("DATE_FORMAT", "dd-MM-yyyy").json(json_file_path)
## Parquet example:
spark.read.option("PATTERN", "*.parquet").parquet(parquet_file_path)
推荐修复方法
由于该工具会转换键的值,因此没有必要的修复。
场景 2¶
输入
以下是使用 non-equivalent key 进行 option 调用的示例。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
## CSV example:
spark.read.option("anotherKeyValue", "myVal").csv(csv_file_path)
## Json example:
spark.read.option("anotherKeyValue", "myVal").json(json_file_path)
## Parquet example:
spark.read.option("anotherKeyValue", "myVal").parquet(parquet_file_path)
输出
该工具添加了 EWI SPRKPY1055,提示键不受支持,并移除了 option 调用。
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
## CSV example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.csv(csv_file_path)
## Json example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.json(json_file_path)
## Parquet example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.parquet(parquet_file_path)
推荐修复方法
建议在转换后检查行为。
其他建议¶
当存在非等效参数时,建议检查转换后的行为。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1056¶
警告
此问题代码已 弃用
消息:pyspark.sql.readwriter.DataFrameReader.option 实参 <argument_name> 不是字面量,无法进行求值
类别:警告
描述¶
当 pyspark.sql.readwriter.DataFrameReader.option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html) 函数的实参键或值不是字面量值(例如是变量)时,就会出现此问题。SMA 会对源代码进行静态分析,因此无法对实参的内容进行求值。
场景¶
输入
以下示例使用 pyspark.sql.readwriter.DataFrameReader.option 函数生成了此 EWI。
my_value = ...
my_option = ...
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')
输出
SMA 在输出代码中添加了 EWI SPRKPY1056 ,以提示此函数的实参不是字面量值,因此 SMA 无法对其进行求值。
my_value = ...
my_option = ...
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument "dateFormat" is not a literal and can't be evaluated
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument key is not a literal and can't be evaluated
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')
推荐修复方法
尽管 SMA 无法对此实参求值,但这并不意味着 Snowpark 不支持该实参。请查看 文档,确保实参的值在 Snowpark 中有效且等效。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1057¶
警告
自 Spark Conversion Core 版本 4.8.0 起,此问题代码已 弃用
消息:PySpark Dataframe Option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html#pyspark.sql.DataFrameReader.option) 实参包含不是字面量的值,因此无法求值
类别:警告。
描述¶
此问题代码已弃用。如果您使用的是旧版本,请升级到最新版本。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1058¶
消息:带 < key > 平台特定键的 < method > 不受支持。
类别:ConversionError
描述¶
pyspark.sql.conf.RuntimeConfig (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.conf.RuntimeConfig.html#pyspark.sql.conf.RuntimeConfig) 中带平台特定键的 get 和 set 方法不受支持。
场景¶
并非所有 get 或 set 方法的使用都会在输出代码中生成 EWI。当工具检测到使用了带平台特定键的这些方法(不支持此用法)时,就会出现此 EWI。
场景 1¶
输入
以下是 Snowpark 中具有受支持键的 get 或 set 方法的示例。
session.conf.set("use_constant_subquery_alias", False)
spark.conf.set("sql_simplifier_enabled", True)
session.conf.get("use_constant_subquery_alias")
session.conf.get("use_constant_subquery_alias")
输出
由于 Snowpark 支持键,因此该工具不会在输出代码中添加 EWI。
session.conf.set("use_constant_subquery_alias", True)
session.conf.set("sql_simplifier_enabled", False)
session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")
推荐修复方法
对于此场景,没有建议的修复方法。
场景 2¶
输入
以下是使用不支持的键的示例。
data =
[
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
session.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.yarn.am.memory", "1g")
session.conf.get("spark.sql.shuffle.partitions")
session = spark.conf.get("spark.yarn.am.memory")
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
输出
该工具在输出代码中添加了 EWI SPRKPY1058,以提示带平台特定键的这些方法不受支持。
data =
[
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.set("spark.sql.shuffle.partitions", "50")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.set("spark.yarn.am.memory", "1g")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.get("spark.sql.shuffle.partitions")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.get("spark.yarn.am.memory")
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
推荐修复方法
建议的修复方法是移除这些方法。
data =
[
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1059¶
警告
自 Spark Conversion Core 版本 2.45.1 起,此问题代码已 弃用
消息:pyspark.storagelevel.StorageLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) 有替代方案,请参阅文档。
类别:警告
描述¶
目前,在 Snowpark 中不需要使用 StorageLevel,因为 Snowflake 控制存储。有关更多信息,可以参阅 EWI SPRKPY1072
其他建议¶
将您的应用程序升级到最新版本。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1060¶
消息:身份验证机制是 connection.json(提供了模板)。
类别:警告。
描述¶
当该工具检测到使用了 pyspark.conf.SparkConf (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html) 时,就会出现此问题。
场景¶
输入
由于 Snowpark 中的身份验证机制不同,该工具删除了相关使用,改为创建 连接配置文件 (connection.json)。
from pyspark import SparkConf
my_conf = SparkConf(loadDefaults=True)
输出
该工具添加了 EWI SPRKPY1060,提示身份验证机制不相同。
#EWI: SPRKPY1002 => pyspark.conf.SparkConf is not supported
#EWI: SPRKPY1060 => The authentication mechanism is connection.json (template provided).
#my_conf = Session.builder.configs(connection_parameter).getOrCreate()
my_conf = None
推荐修复方法
要创建连接,必须在 connection.json 文件中填写信息。
{
"user": "<USER>",
"password": "<PASSWORD>",
"account": "<ACCOUNT>",
"role": "<ROLE>",
"warehouse": "<WAREHOUSE>",
"database": "<DATABASE>",
"schema": "<SCHEMA>"
}
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1061¶
消息:Snowpark 不支持 unix_timestamp 函数
类别:警告
描述¶
在 Snowpark 中,第一个参数为必需;当该工具检测到使用了不带参数的 pyspark.sql.functions.unix_timestamp (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.unix_timestamp.html) 时,就会出现问题。
场景¶
输入
以下是不带参数调用 unix_timestamp 方法的示例。
data = [["2015-04-08", "10"],["2015-04-10", "15"]]
df = spark.createDataFrame(data, ['dt', 'val'])
df.select(unix_timestamp()).show()
输出
以下函数的 Snowpark 签名:unix_timestamp(e: ColumnOrName, fmt:Optional["Column"] = None),您可以注意到第一个参数是必填的。
该工具添加了 EWI SPRKPY1061,以提示不带参数调用函数 unix_timestamp 在 Snowpark 中不受支持。
data = [["2015-04-08", "10"],["2015-04-10", "15"]]
df = spark.createDataFrame(data, ['dt', 'val'])
#EWI: SPRKPY1061 => Snowpark does not support unix_timestamp functions with no parameters. See documentation for more info.
df.select(unix_timestamp()).show()
推荐修复方法
作为一种替代方案,可以至少添加 timestamp 字符串的名称或列。
data = [["2015-04-08", "10"],["2015-04-10", "15"]]
df = spark.createDataFrame(data, ["dt", "val"])
df.select(unix_timestamp("dt")).show()
其他建议¶
还可以添加 current_timestamp() 作为第一个参数。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1062¶
消息:Snowpark 不支持不带“values”参数的 GroupedData.pivot。
类别:警告
描述¶
当 SMA 检测到使用了 pyspark.sql.group.GroupedData.pivot (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.pivot.html) 函数但未指定“values”参数 (即需要透视的值列表) 时,就会出现此问题。
目前,Snowpark Python pivot 函数要求明确指定要透视的不同值列表。
场景¶
场景 1¶
输入
SMA 检测到与模式 dataFrame.groupBy("columnX").pivot("columnY") 相匹配的表达式,并且该 pivot 函数没有 values 参数。
df.groupBy("date").pivot("category").sum("amount")
输出
SMA 添加了一条 EWI 消息,提示不支持没有“values”参数的 pivot 函数。
此外,它将在 pivot 函数中添加第二个参数,即列表推导式,用于计算将转换为列的值列表。请记住,此操作对于大型数据集效率不高,建议明确指明值。
#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df.groupBy("date").pivot("category", [v[0] for v in df.select("category").distinct().limit(10000).collect()]]).sum("amount")
推荐修复方法
对于这种情况,SMA 在 pivot 函数中添加第二个参数,即列表推导式,用于计算将转换为列的值列表,但可以根据不同的值列表进行转换,如下所示:
df = spark.createDataFrame([
Row(category="Client_ID", date=2012, amount=10000),
Row(category="Client_name", date=2012, amount=20000)
])
df.groupBy("date").pivot("category", ["dotNET", "Java"]).sum("amount")
场景 2¶
输入
SMA 未能检测到与模式 dataFrame.groupBy("columnX").pivot("columnY") 相匹配的表达式,并且该 pivot 函数没有 values 参数。
df1.union(df2).groupBy("date").pivot("category").sum("amount")
输出
SMA 添加了一条 EWI 消息,提示不支持没有“values”参数的 pivot 函数。
#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df1.union(df2).groupBy("date").pivot("category").sum("amount")
推荐修复方法
添加要透视的唯一值列表,具体操作如下:
df = spark.createDataFrame([
Row(course="dotNET", year=2012, earnings=10000),
Row(course="Java", year=2012, earnings=20000)
])
df.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()
其他建议¶
对于大型数据集,计算要透视的唯一值列表并不是一项有效的操作,可能会成为阻塞性调用。请考虑明确指定要用于透视的唯一值列表。
如果您不想明确指定要透视的唯一值列表(不建议这样做),可添加以下代码作为 pivot 函数的第二个实参,以便在运行时推断这些值*
[v[0] for v in <df>.select(<column>).distinct().limit(<count>).collect()]]
使用相应的 DataFrame、要透视的列和要选择的行数 *替换 <df>。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1063¶
消息:pyspark.sql.pandas.functions.pandas_udf 有替代方案。
类别:警告
描述¶
当该工具检测到使用了 pyspark.sql.pandas.functions.pandas_udf (https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.pandas_udf.html) 时,就会出现此问题;此问题有替代方案。
场景¶
输入
使用 pandas_udf 函数创建可处理大量数据的用户定义的函数。
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)
输出
SMA 添加了一条 EWI 消息,提示 pandas_udf 有替代方案。
#EWI: SPRKPY1062 => pyspark.sql.pandas.functions.pandas_udf has a workaround, see documentation for more info
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)
推荐修复方法
将参数类型明确指定为新参数 input_types 并移除 functionType 参数(如适用)。创建的函数必须在 select 语句中调用。
@pandas_udf(
return_type = schema,
input_types = [PandasDataFrameType([IntegerType(), IntegerType()])]
)
def modify_df(pdf):
return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df) # You must modify function call to be a select and not an apply
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1064¶
消息:Spark 元素 不适用,因为 Snowflake 改用 Snowpipe 机制。
类别:警告
描述¶
当该工具检测到使用了 pyspark.streaming 库中的任何元素时,就会出现此问题:
pyspark.streaming.DStream (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.DStream.html)
pyspark.streaming.StreamingContext (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.html)
pyspark.streaming.listener.StreamingListener。
场景¶
输入
以下是触发此 EWI 的元素之一的示例。
from pyspark.streaming.listener import StreamingListener
var = StreamingListener.Java
var.mro()
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()
输出
SMA 在输出代码中添加了 EWI SPRKPY1064,以提示此函数不适用。
#EWI: SPRKPY1064 => The element does not apply since snowflake uses snowpipe mechanism instead.
var = StreamingListener.Java
var.mro()
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()
推荐修复方法
SMA 移除了 import 语句并将问题添加到 Issues.csv 清单中,移除了使用的所有 Spark 元素。
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()
其他建议¶
查看 Snowpipe 的文档,看看它如何适合当前场景。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1065¶
消息:pyspark.context.SparkContext.broadcast 不适用,因为 Snowflake 使用数据聚类机制来计算数据。
类别:警告
描述¶
当该工具检测到使用了元素 pyspark.context.SparkContext.broadcast (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.broadcast.html) 时,就会出现此问题;因为使用了 Snowflake 的 数据群集,因此没有必要使用该元素。
输入代码
在此示例中,创建了一个广播变量,该变量能够实现数据在所有节点间的高效共享。
sc = SparkContext(conf=conf_spark)
mapping = {1: 10001, 2: 10002}
bc = sc.broadcast(mapping)
输出代码
SMA 添加了一条 EWI 消息,提示不需要进行广播。
sc = conf_spark
mapping = {1: 10001, 2: 10002}
#EWI: SPRKPY1065 => The element does not apply since snowflake use data-clustering mechanism to compute the data.
bc = sc.broadcast(mapping)
推荐修复方法
移除所有使用的 pyspark.context.SparkContext.broadcast。
sc = conf_spark
mapping = {1: 10001, 2: 10002}
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1066¶
消息:Spark 元素不适用,因为 Snowflake 使用的微分区机制是自动创建的。
类别:警告
描述¶
当该工具检测到使用了与分区相关的元素时,就会出现此问题:
pyspark.sql.catalog.Catalog.recoverPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.recoverPartitions.html)
pyspark.sql.dataframe.DataFrame.foreachPartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html)
pyspark.sql.dataframe.DataFrame.sortWithinPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sortWithinPartitions.html)
pyspark.sql.functions.spark_partition_id (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.spark_partition_id.html)
由于使用了 Snowflake 的 微分区,这些元素不适用。
输入代码
在此示例 sortWithinPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sortWithinPartitions.html) 中,它用于在 DataFrame 中创建按指定列排序的分区。
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.sortWithinPartitions("age", ascending=False)
输出代码
SMA 添加了一条 EWI 消息,提示 Spark 元素并非必需。
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
#EWI: SPRKPY1066 => The element does not apply since snowflake use micro-partitioning mechanism are created automatically.
df.sortWithinPartitions("age", ascending=False)
推荐修复方法
移除对该元素的使用。
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1067¶
消息:pyspark.sql.functions.split 包含在 Snowpark 中不受支持的参数。
类别:警告
描述¶
当该工具检测到使用了 pyspark.sql.functions.split (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html) 并传递了超过两个参数,或以正则表达式模式作为参数时,就会出现此问题;这两种使用方式均不受支持。
场景¶
场景 1¶
输入代码
在此示例中,split 函数有两个以上的参数。
df.select(split(columnName, ",", 5))
输出代码
该工具在输出代码中添加了此 EWI,以提示当此函数具有两个以上的参数时,不支持此函数。
#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, ",", 5))
推荐修复方法
请确保 split 函数仅使用两个参数。
df.select(split(columnName, ","))
场景 2¶
输入代码
在此示例中,split 函数使用正则表达式模式作为参数。
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))
输出代码
该工具在输出代码中添加了此 EWI,以提示当它使用正则表达式模式作为参数时,不支持此函数。
#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))
推荐修复方法
此 functions.split(str: ColumnOrName, pattern: str, limit: int = - 1) 方法的 Spark 签名与 Snowpark 中的以下方法:functions.split(str:Union[Column, str], pattern:Union[Column, str]) 不完全相符,因此,使用正则表达式的场景目前没有建议的修复方法。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1068¶
消息:toPandas 包含类型为不受支持的 ArrayType 的列,并且有替代方案。
类别:警告
描述¶
如果存在 ArrayType 类型的列,pyspark.sql.DataFrame.toPandas (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html) 无法正常工作。这些情况的替代方案是使用 json.loads 方法将这些列转换为 Python 字典。
场景¶
输入
ToPandas 以 Pandas DataFrame 的形式返回原始 DataFrame 数据。
sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
pandasDF = sparkDF.toPandas()
输出
如果有类型为 ArrayType 的列,该工具会添加此 EWI,以提示 toPandas 不受支持,但有替代方案。
sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
#EWI: SPRKPY1068 => toPandas doesn't work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method. example: df[colName] = json.loads(df[colName]).
pandasDF = sparkDF.toPandas()
推荐修复方法
pandas_df = sparkDF.toPandas()
## check/convert all resulting fields from calling toPandas when they are of
## type ArrayType,
## they will be reasigned by converting them into a Python Dictionary
## using json.loads method
for field in pandas_df.schema.fields:
if isinstance(field.datatype, ArrayType):
pandas_df[field.name] = pandas_df[field.name].apply(lambda x: json.loads(x) if x is not None else x)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1069¶
消息:如果 partitionBy 参数是列表,则 Snowpark 会抛出错误。
类别:警告
描述¶
当使用 pyspark.sql.readwriter.DataFrameWriter.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.parquet.html) 方法且涉及参数 partitionBy 时,该工具会显示 EWI。
这是因为在 Snowpark 中,DataFrameWriter.parquet 仅支持 ColumnOrSqlExpr 作为 partitionBy 参数。
场景¶
场景 1¶
输入代码:
在此场景中,partitionBy 参数不是列表。
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.write.parquet(file_path, partitionBy="age")
输出代码:
该工具添加了 EWI SPRKPY1069,以提示如果参数是列表,在 Snowpark 会抛出错误。
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = "age", format_type_options = dict(compression = "None"))
推荐修复方法
此场景没有建议的修复方法,因为该工具总会预先添加此 EWI,以应对 partitionBy 参数是列表的情况。请记住,在 Snowpark 中,只接受使用 Snowflake 暂存区 的云位置。
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
Session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()
Session.file.put(f"file:///path/to/data/file.parquet", f"@{stage}")
df.write.parquet(stage, partition_by = "age", format_type_options = dict(compression = "None"))
场景 2¶
输入代码:
在此场景中,partitionBy 参数是列表。
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.write.parquet(file_path, partitionBy=["age", "name"])
输出代码:
该工具添加了 EWI SPRKPY1069,以提示如果参数是列表,在 Snowpark 会抛出错误。
df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = ["age", "name"], format_type_options = dict(compression = "None"))
推荐修复方法
如果参数的值是 list,则将其替换为 ColumnOrSqlExpr。
df.write.parquet(file_path, partition_by = sql_expr("age || name"), format_type_options = dict(compression = "None"))
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1070¶
消息:mode 实参被转换为 overwrite,检查变量值并设置相应的布尔值。
类别:警告
描述¶
当使用了以下各项时:
pyspark.sql.readwriter.DataFrameWriter.csv (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.csv.html)
pyspark.sql.readwriter.DataFrameWriter.json (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.json.html)
pyspark.sql.readwriter.DataFrameWriter.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.json.html)
该工具会分析参数 mode 以确定该值是否为 overwrite。
场景¶
场景 1¶
输入代码
对于此场景,该工具检测到 mode 参数可以设置相应的布尔值。
df.write.csv(file_path, mode="overwrite")
输出代码:
SMA 工具会分析 mode 参数,确定该值为 overwrite,并设置相应的布尔值
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)
推荐修复方法
对于此场景,没有建议的修复方法,因为该工具执行了相应的转换。
场景 2:
输入代码
在此场景中,该工具无法验证该值是否为 overwrite。
df.write.csv(file_path, mode=myVal)
输出代码:
SMA 添加了一条 EWI 消息,提示 mode 参数已转换为“overwrite”,但它也是为了提示最好检查变量值并设置正确的布尔值。
#EWI: SPRKPY1070 => The 'mode' argument is transformed to 'overwrite', check the variable value and set the corresponding bool value.
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = myVal)
推荐修复方法
检查参数 mode 的值并为参数 overwrite 添加正确的值。
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1071¶
消息:在 Snowpark 中,pyspark.rdd.RDD.getNumPartitions 函数并非必需。因此,应该移除所有引用。
类别:警告
描述¶
当该工具发现使用了 pyspark.rdd.RDD.getNumPartitions (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.getNumPartitions.html) 函数时,就会出现此问题。Snowflake 使用微分区机制,因此使用此函数并非必需。
场景¶
输入
getNumPartitions 返回 RDD 上的分区数量。
df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
print(df.getNumPartitions())
输出
该工具添加了此 EWI,以提示 getNumPartitions 并非必需。
df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
#EWI: SPRKPY1071 => The getNumPartitions are not required in Snowpark. So, you should remove all references.
print(df.getNumPartitions())
推荐修复方法
移除对此函数的所有使用。
df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者在 SMA 中提交问题。`
SPRKPY1072¶
消息:在 Snowpark 中,并非必须使用 StorageLevel。
类别:警告。
描述¶
当该工具发现使用了 StorageLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) 类时,就会出现此问题;该类的作用类似于使用“flags”来设置存储级别。由于 Snowflake 控制存储,因此并非必须使用此函数。
其他建议¶
移除对此函数的所有使用。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1073¶
消息:不支持不带参数或返回类型参数的 pyspark.sql.functions.udf
类别:警告。
描述¶
此问题会在以下情况下出现:当该工具检测到使用了 pyspark.sql.functions.udf (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html) 作为函数或装饰器,但在它没有参数或返回类型参数这两种特定情况下,不支持这种用法。
场景¶
场景 1¶
输入
在 Pyspark 中,可以创建不带输入或返回类型参数的用户定义的函数:
from pyspark.sql import SparkSession, DataFrameStatFunctions
from pyspark.sql.functions import col, udf
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
['Q2', 'Test 2'],
['Q3', 'Test 1'],
['Q4', 'Test 1']]
columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
输出
Snowpark 需要 Udf 函数的输入和返回类型。因为未提供这些参数,而且 SMA 不能缺少此参数。
from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
['Q2', 'Test 2'],
['Q3', 'Test 1'],
['Q4', 'Test 1']]
columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf function without the return type parameter is not supported. See documentation for more info.
my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
推荐修复方法
要修复这种情况,需要为输入和输出的返回类型添加导入,然后在 udf 函数 _my_udf* 上添加 return*type 和 input_types[] 的参数。
from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
['Q2', 'Test 2'],
['Q3', 'Test 1'],
['Q4', 'Test 1']]
columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
my_udf = udf(lambda s: len(s), return_type=IntegerType(), input_types=[StringType()])
df.with_column("result", my_udf(df.Value)).show()
场景 2¶
在 PySpark 中,可以使用不带参数的 @udf 装饰器
输入
from pyspark.sql.functions import col, udf
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
['Q2', 'Test 2'],
['Q3', 'Test 1'],
['Q4', 'Test 1']]
columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
@udf()
def my_udf(str):
return len(str)
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
输出
在 Snowpark 中,udf 装饰器的所有参数都是必需的。
from snowflake.snowpark.functions import col, udf
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
['Q2', 'Test 2'],
['Q3', 'Test 1'],
['Q4', 'Test 1']]
columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf decorator without parameters is not supported. See documentation for more info.
@udf()
def my_udf(str):
return len(str)
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
推荐修复方法
要修复这种情况,需要为输入和输出的返回类型添加导入,然后在 udf @udf 装饰器上添加 return_type 和 input_types[] 的参数。
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
['Q2', 'Test 2'],
['Q3', 'Test 1'],
['Q4', 'Test 1']]
columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
@udf(return_type=IntegerType(), input_types=[StringType()])
def my_udf(str):
return len(str)
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1074¶
消息:文件含有混合缩进(空格和制表符)。
类别:解析错误。
描述¶
当该工具检测到文件含有混合缩进时,就会出现此问题。这意味着,文件混合使用空格和制表符来缩进代码行。
场景¶
输入
在 Pyspark 中,可以混合使用空格和制表符来表示标识级别。
def foo():
x = 5 # spaces
y = 6 # tab
输出
SMA 无法处理混合使用的缩进标记。当在 Python 代码文件中检测到这种情况时,SMA 会在第一行添加 EWI SPRKPY1074。
## EWI: SPRKPY1074 => File has mixed indentation (spaces and tabs).
## This file was not converted, so it is expected to still have references to the Spark API
def foo():
x = 5 # spaces
y = 6 # tabs
推荐修复方法
解决方案是将所有缩进符号统一为相同格式。
def foo():
x = 5 # tab
y = 6 # tab
其他建议¶
有用的缩进工具有 PEP-8 (https://peps.python.org/pep-0008/) 和 Reindent (https://pypi.org/project/reindent/)。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1075¶
类别
警告。
描述¶
parse_json 不应用架构验证,如果需要根据架构进行筛选/验证,则可能需要引入一些逻辑。
示例¶
输入
df.select(from_json(df.value, Schema))
df.select(from_json(schema=Schema, col=df.value))
df.select(from_json(df.value, Schema, option))
输出
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
对于 from_json 函数,传递架构实际上并不是用于推断,而是用于验证。看看下面的示例:
data = [
('{"name": "John", "age": 30, "city": "New York"}',),
('{"name": "Jane", "age": "25", "city": "San Francisco"}',)
]
df = spark.createDataFrame(data, ["json_str"])
示例 1:强制使用数据类型和更改列名:
## Parse JSON column with schema
parsed_df = df.withColumn("parsed_json", from_json(col("json_str"), schema))
parsed_df.show(truncate=False)
## +------------------------------------------------------+---------------------------+
## |json_str |parsed_json |
## +------------------------------------------------------+---------------------------+
## |{"name": "John", "age": 30, "city": "New York"} |{John, 30, New York} |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, null, San Francisco}|
## +------------------------------------------------------+---------------------------+
## notice that values outside of the schema were dropped and columns not matched are returned as null
示例 2:选择特定列:
## Define a schema with only the columns we want to use
partial_schema = StructType([
StructField("name", StringType(), True),
StructField("city", StringType(), True)
])
## Parse JSON column with partial schema
partial_df = df.withColumn("parsed_json", from_json(col("json_str"), partial_schema))
partial_df.show(truncate=False)
## +------------------------------------------------------+---------------------+
## |json_str |parsed_json |
## +------------------------------------------------------+---------------------+
## |{"name": "John", "age": 30, "city": "New York"} |{John, New York} |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, San Francisco}|
## +------------------------------------------------------+---------------------+
## there is also an automatic filtering
建议¶
如需更多支持,您可以发送电子邮件至 sma-support@snowflake.com 联系我们。如果您与 Snowflake 签订了支持合同,请联系您的销售工程师,他们可以指导您的支持需求。
有用的工具 PEP-8 (https://peps.python.org/pep-0008/) and Reindent (https://pypi.org/project/reindent/)。
SPRKPY1076¶
消息:pyspark.sql.readwriter.DataFrameReader (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html) 方法中的参数不受支持。这适用于 CSV、JSON 和 PARQUET 方法。
类别:警告。
描述¶
对于 pyspark.sql.readwriter.DataFrameReader (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html) 对象上的 CSV、JSON 和 PARQUET 方法,该工具将分析参数并根据每种情况添加转换:
所有参数均与其在 Snowpark 中的等效选项名称匹配:在这种情况下,该工具会将参数转换为 .option() 调用。在这种情况下,参数不会添加此 EWI。
部分参数与其在 Snowpark 中的等效选项不匹配:在这种情况下,工具将添加包含该参数信息的 EWI,并将其从方法调用中移除。
等效选项列表:
CSV 的等效选项:
Spark 键 |
Snowpark 等效选项 |
|---|---|
sep |
FIELD_DELIMITER |
header |
PARSE_HEADER |
lineSep |
RECORD_DELIMITER |
pathGlobFilter |
PATTERN |
quote |
FIELD_OPTIONALLY_ENCLOSED_BY |
nullValue |
NULL_IF |
dateFormat |
DATE_FORMAT |
timestampFormat |
TIMESTAMP_FORMAT |
inferSchema |
INFER_SCHEMA |
delimiter |
FIELD_DELIMITER |
JSON 的等效选项:
Spark 键 |
Snowpark 等效选项 |
|---|---|
dateFormat |
DATE_FORMAT |
timestampFormat |
TIMESTAMP_FORMAT |
pathGlobFilter |
PATTERN |
PARQUET 的等效选项:
Spark 键 |
Snowpark 等效选项 |
|---|---|
pathGlobFilter |
PATTERN |
场景¶
场景 1¶
输入
对于 CVS,以下是一些示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.csv("path3", None,None,None,None,None,None,True).show()
输出
在转换后的代码中,参数作为单个选项添加到 cvs 函数中
from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the csv function, the supported ones will be added into a option method.
spark.read.option("FIELD_DELIMITER", None).option("PARSE_HEADER", True).option("FIELD_OPTIONALLY_ENCLOSED_BY", None).csv("path3").show()
场景 2¶
输入
对于 JSON,以下是一些示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.json("/myPath/jsonFile/", dateFormat='YYYY/MM/DD').show()
输出
在转换后的代码中,参数作为单个选项添加到 json 函数中
from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the json function, the supported ones will be added into a option method.
spark.read.option("DATE_FORMAT", 'YYYY/MM/DD').json("/myPath/jsonFile/").show()
场景 3¶
输入
对于 PARQUET,以下是一些示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.parquet("/path/to/my/file.parquet", pathGlobFilter="*.parquet").show()
输出
在转换后的代码中,参数作为单个选项添加到 parquet 函数中
from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => The parquet function require adjustments, in Snowpark the parquet files needs to be located in an stage. See the documentation for more info.
spark.read.option("PATTERN", "*.parquet").parquet("/path/to/my/file.parquet")
其他建议¶
当存在非等效参数时,建议检查转换后的行为。
此外,建议参考相关文档以找到更合适的实现方案:
CSV 的选项文档:
PySpark CSV Options (https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option)。
JSON 的选项文档:
PySpark JSON Options (https://spark.apache.org/docs/latest/sql-data-sources-json.html)。
PARQUET 的选项文档:
Pyspark PARQUET options (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option)。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1077¶
消息:无法处理嵌入式 SQL 代码。
类别:警告。
描述¶
当该工具检测到无法转换为 Snowpark 的嵌入式 SQL 代码时,就会出现此问题。
有关更多信息,请查看 SQL 嵌入式代码部分。
场景¶
输入
在此示例中,SQL 代码嵌入到一个名为 query 的变量中,而该变量用作 Pyspark.sql 方法的参数。
query = f"SELECT * from myTable"
spark.sql(query)
输出
SMA 检测到 PySpark.sql 参数是变量而不是 SQL 代码,因此在 PySpark.sql 所在行中添加了 EWI SPRKPY1077 消息。
query = f"SELECT * myTable"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
spark.sql(query)
其他建议¶
在 SQL 转换过程中,此代码必须直接作为方法参数传递,且仅能使用字符串值,禁止使用插值。请检查传递 PySpark.SQL 函数的 SQL 语句,以验证其在 Snowflake 平台上的功能。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1078¶
消息:pyspark.context.SparkContext.setLogLevel 函数的实参不是字面量值,因此无法求值
类别:警告
描述¶
当 SMA 检测到带非字面量值(例如实参为变量)使用了 pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) 函数时,就会出现此问题。
SMA 会对您的源代码进行静态分析,因此无法对实参的内容求值并确定在 Snowpark 中是否有等效选项。
场景¶
输入
在此示例中,在变量 my_log_level 中定义 logLevel,然后通过 setLogLevel 方法将 my_log_level 用作参数。
my_log_level = "WARN"
sparkSession.sparkContext.setLogLevel(my_log_level)
输出
SMA 无法对日志级别参数的实参求值,因此会在转换后的日志记录代码行上方添加 EWI SPRKPY1078:
my_log_level = "WARN"
#EWI: SPRKPY1078 => my_log_level is not a literal value and therefore could not be evaluated. Make sure the value of my_log_level is a valid level in Snowpark. Valid log levels are: logging.CRITICAL, logging.DEBUG, logging.ERROR, logging.INFO, logging.NOTSET, logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)
推荐修复方法
尽管 SMA 无法对此实参求值,但它会将 pyspark.context.SparkContext.setLogLevel 函数转换为 Snowpark 等效选项。请根据下表确保生成的输出代码中 level 实参的值是 Snowpark 中有效且等效的日志级别:
PySpark 日志级别 |
Snowpark 日志级别等效选项 |
|---|---|
ALL |
logging.NOTSET |
DEBUG |
logging.DEBUG |
ERROR |
logging.ERROR |
FATAL |
logging.CRITICAL |
INFO |
logging.INFO |
OFF |
logging.WARNING |
TRACE |
logging.NOTSET |
WARN |
logging.WARNING |
因此,建议的修复方法将如下所示:
my_log_level = logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1079¶
消息:pyspark.context.SparkContext.setLogLevel 函数的参数不是有效的 PySpark 日志级别。
类别:警告
描述¶
在以下情况下会出现此问题:SMA 检测到带实参使用了 pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) 函数,该实参是在 PySpark 中无效的日志级别,因此无法确定在 Snowpark 中的等效日志级别。
场景¶
输入
这里的日志级别使用的“INVALID_LOG_LEVEL”不是有效的 Pyspark 日志级别。
sparkSession.sparkContext.setLogLevel("INVALID_LOG_LEVEL")
输出
尽管 SMA 进行了转换,但 SMA 无法识别日志级别“INVALID_LOG_LEVEL”,添加了 EWI SPRKPY1079 以提示可能存在问题。
#EWI: SPRKPY1079 => INVALID_LOG_LEVEL is not a valid PySpark log level, therefore an equivalent could not be determined in Snowpark. Valid PySpark log levels are: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
logging.basicConfig(stream = sys.stdout, level = logging.INVALID_LOG_LEVEL)
推荐修复方法
请确保在 pyspark.context.SparkContext.setLogLevel 函数中使用的日志级别是 PySpark (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) 或 Snowpark 中的有效日志级别,然后重试。
logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1081¶
自 Spark Conversion Core 4.12.0 起,此问题代码已 弃用
消息:pyspark.sql.readwriter.DataFrameWriter.partitionBy 有替代方案。
类别:警告
描述¶
Pyspark.sql.readwriter.DataFrameWriter.partitionBy (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html) 函数不受支持。替代方案是改用 Snowpark 的 copy_into_location。有关更多信息,请参阅文档。
场景¶
输入
此代码将为 FIRST_NAME 列中的每个唯一值创建一个单独的目录。数据是相同的,但会根据列存储在不同的目录中。
df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")
此代码将为 FIRST_NAME 列中的每个唯一值创建一个单独的目录。数据是相同的,但会根据列存储在不同的目录中。
输出代码
df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1081 => The partitionBy function is not supported, but you can instead use copy_into_location as workaround. See the documentation for more info.
df.write.partitionBy("FIRST_NAME").csv("/home/data", format_type_options = dict(compression = "None"))
推荐修复方法
在 Snowpark 中,copy_into_location 有一个 partition_by 参数可以用来代替 partitionBy 函数,但需要进行一些手动调整,如下面的示例所示:
Spark 代码:
df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")
经过手动调整的 Snowpark 代码:
df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.copy_into_location(location=temp_stage, partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)
copy_into_location 具有以下参数
location:Snowpark 位置仅接受使用 Snowflake 暂存区 的云位置。
partition_by:它可以是列名或 SQL 表达式,因此需要使用 col 或 sql_expr 转换为列或 SQL。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1082¶
消息:pyspark.sql.readwriter.DataFrameReader.load 函数不受支持。一种替代方案是改用 Snowpark DataFrameReader 格式特定的方法(avro csv、json、orc、parquet)。path 参数应该是暂存区位置。
类别:警告
描述¶
pyspark.sql.readwriter.DataFrameReader.load (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html) 函数不受支持。替代方案是改用 Snowpark DataFrameReader 方法。
场景¶
Snowpark 中不存在此方法 DataFrameReader.load(path, format, schema, **options) 的 Spark 签名。因此,只要使用 load 函数,输出代码中就会有 EWI。
场景 1¶
输入
以下示例尝试从 CSV 源加载数据。
path_csv_file = "/path/to/file.csv"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
my_session.read.load(path_csv_file, "csv").show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()
输出
SMA 添加了 EWI SPRKPY1082,以提示 Snowpark 不支持此函数,但它有替代方案。
path_csv_file = "/path/to/file.csv"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
#EWI: The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()
推荐修复方法
作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。
修复
path和format参数:用
csv方法替换load方法。第一个参数
path必须在暂存区中才能与 Snowpark 等效。
以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用 CSV 方法。
path_csv_file = "/path/to/file.csv"
## Stage creation
temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
my_session.read.csv(stage_file_path).show()
修复
schema参数:可以使用 schema 函数设置架构,如下所示:
schemaParam = StructType([
StructField("name", StringType(), True),
StructField("city", StringType(), True)
])
df = my_session.read.schema(schemaParam).csv(temp_stage)
修复
options参数:
Spark 和 Snowpark 之间的 选项 不一样,在本示例中,lineSep 和 dateFormat 被替换为 RECORD_DELIMITER 和 DATE_FORMAT,其他建议 部分中含有包含所有等效值的表格。
以下示例演示了如何使用 RECORD_DELIMITER 和 DATE_FORMAT 创建字典,然后使用该字典调用 options 方法。
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
df = my_session.read.options(optionsParam).csv(stage)
场景 2¶
输入
以下示例尝试从 JSON 源加载数据。
path_json_file = "/path/to/file.json"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
my_session.read.load(path_json_file, "json").show()
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()
输出
SMA 添加了 EWI SPRKPY1082,以提示 Snowpark 不支持此函数,但它有替代方案。
path_json_file = "/path/to/file.json"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()
推荐修复方法
作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。
修复
path和format参数:用
json方法替换load方法第一个参数
path必须在暂存区中才能与 Snowpark 等效。
以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用 JSON 方法。
path_json_file = "/path/to/file.json"
## Stage creation
temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
my_session.read.json(stage_file_path).show()
修复
schema参数:可以使用 schema 函数设置架构,如下所示:
schemaParam = StructType([
StructField("name", StringType(), True),
StructField("city", StringType(), True)
])
df = my_session.read.schema(schemaParam).json(temp_stage)
修复
options参数:
Spark 和 Snowpark 之间的 选项 不一样,在本示例中,dateFormat 和 timestampFormat 被替换为 DATE_FORMAT 和 TIMESTAMP_FORMAT,其他建议 部分中含有包含所有等效值的表格。
以下示例演示了如何使用 DATE_FORMAT 和 TIMESTAMP_FORMAT 创建字典,然后使用该字典调用 options 方法。
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
df = Session.read.options(optionsParam).json(stage)
场景 3¶
输入
以下示例尝试从 PARQUET 源加载数据。
path_parquet_file = "/path/to/file.parquet"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
my_session.read.load(path_parquet_file, "parquet").show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()
输出
SMA 添加了 EWI SPRKPY1082,以提示 Snowpark 不支持此函数,但它有替代方案。
path_parquet_file = "/path/to/file.parquet"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()
推荐修复方法
作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。
修复
path和format参数:用
parquet方法替换load方法第一个参数
path必须在暂存区中才能与 Snowpark 等效。
以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用 PARQUET 方法。
path_parquet_file = "/path/to/file.parquet"
## Stage creation
temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"
schemaParam = StructType([
StructField("Name", StringType(), True),
StructField("Superhero", StringType(), True)
])
my_session.read.parquet(stage_file_path).show()
修复
schema参数:可以使用 schema 函数设置架构,如下所示:
schemaParam = StructType([
StructField("name", StringType(), True),
StructField("city", StringType(), True)
])
df = my_session.read.schema(schemaParam).parquet(temp_stage)
修复
options参数:
Spark 和 Snowpark 之间的 选项 不一样,在本示例中,pathGlobFilter 被替换为 PATTERN,其他建议 部分中含有包含所有等效值的表格。
以下示例演示了如何使用 PATTERN 创建字典,然后使用该字典调用 options 方法。
optionsParam = {"PATTERN": "*.parquet"}
df = Session.read.options(optionsParam).parquet(stage)
其他建议¶
请注意,Spark 和 Snowpark 之间的选项不一样,但它们可以映射:
Spark 选项 |
可能的值 |
Snowpark 等效选项 |
描述 |
|---|---|---|---|
header |
True 或 False |
SKIP_HEADER = 1 / SKIP_HEADER = 0 |
使用文件的第一行作为列名。 |
delimiter |
任何单字符/多字符字段分隔符 |
FIELD_DELIMITER |
指定单个字符/多个字符作为每个列/字段的分隔符。 |
sep |
任何单字符字段分隔符 |
FIELD_DELIMITER |
为每个列/字段指定一个字符作为分隔符。 |
encoding |
UTF-8、UTF-16 等等... |
ENCODING |
按给定的编码类型对 CSV 文件进行解码。默认编码为 UTF-8 |
lineSep |
任何单字符行分隔符 |
RECORD_DELIMITER |
定义应用于文件解析的行分隔符。 |
pathGlobFilter |
文件模式 |
PATTERN |
定义一种模式,以仅读取文件名与模式匹配的文件。 |
recursiveFileLookup |
True 或 False |
不适用 |
以递归方式扫描目录以读取文件。此选项的默认值为 False。 |
quote |
要引用的单个字符 |
FIELD_OPTIONALLY_ENCLOSED_BY |
引用字段/列,其包含定界符/分隔符可以是值的一部分的字段。此字符 To 与 quoteAll 选项一起使用时引用所有字段。此选项的默认值为双引号 (")。 |
nullValue |
用于替换 null 的字符串 |
NULL_IF |
在读取和写入 DataFrame 时用字符串替换空值。 |
dateFormat |
有效的日期格式 |
DATE_FORMAT |
定义一个表示日期格式的字符串。默认格式为 yyyy-MM-dd。 |
timestampFormat |
有效的时间戳格式 |
TIMESTAMP_FORMAT |
定义一个表示时间戳格式的字符串。默认格式为 yyyy-MM-dd 'T'HH:mm:ss。 |
escape |
任何单个字符 |
ESCAPE |
将单个字符设置为转义字符以覆盖默认的转义字符 (\)。 |
inferSchema |
True 或 False |
INFER_SCHEMA |
自动检测文件架构 |
mergeSchema |
True 或 False |
不适用 |
在 Snowflake 中不需要,因为每当 infer_schema 确定 parquet 文件结构时就会发生这种情况 |
对于 modifiedBefore / modifiedAfter 选项,可以通过使用元数据列然后添加诸如以下筛选器以在 Snowflake 中获得相同的结果:
df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’)。如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1083¶
消息:pyspark.sql.readwriter.DataFrameWriter.save 函数不受支持。一种替代方案是改用 Snowpark DataFrameWriter copy_into_location 方法。
类别:警告
描述¶
pyspark.sql.readwriter.DataFrameWriter.save (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.save.html) 函数不受支持。替代方案是改用 Snowpark DataFrameWriter 方法。
场景¶
Snowpark 中不存在此方法 DataFrameWriter.save(path, format, mode, partitionBy, **options) 的 Spark 签名。因此,只要使用 load 函数,输出代码中就会有 EWI。
场景 1¶
输入代码
以下是尝试使用 CSV 格式保存数据的示例。
path_csv_file = "/path/to/file.csv"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])
df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")
输出代码
该工具在输出代码中添加了 EWI SPRKPY1083,以提示 Snowpark 不支持此函数,但它有替代方案。
path_csv_file = "/path/to/file.csv"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")
推荐修复方法
作为一种替代方案,可以改用 Snowpark DataFrameWriter 方法。
修复
path和format参数:用 csv 或 copy_into_location 方法替换
load方法。如果您使用的是
copy_into_location方法,则需要使用file_format_type参数指定格式。第一个参数
path必须在暂存区中才能与 Snowpark 等效。
以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用上述方法之一。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Stage creation
temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"
## Using csv method
df.write.csv(stage_file_path)
## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")
修复
mode参数:使用 Snowpark DataFrameWriter 中的 mode 函数,如下所示:
以下示例展示了如何在调用链中添加带有 overwrite 参数的 mode 方法:
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Using csv method
df.write.mode("overwrite").csv(temp_stage)
## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="csv")
修复
partitionBy参数:使用
CSV方法中的 partition_by 参数,如下所示:
以下是使用 CSV 方法中的 partition_by 参数的示例。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Using csv method
df.write.csv(temp_stage, partition_by="City")
## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="csv", partition_by="City")
修复
options参数:使用
CSV方法中的 format_type_options 参数,如下所示:
spark 和 snowpark 之间的选项不一样,在此示例中,lineSep 和 dateFormat 被替换为 RECORD_DELIMITER 和 DATE_FORMAT,其他建议 部分中含有包含所有等效值的表格。
以下示例演示了如何使用 RECORD_DELIMITER 和 DATE_FORMAT 创建字典,然后使用该字典调用 options 方法。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
## Using csv method
df.write.csv(stage, format_type_options=optionsParam)
## Using copy_into_location method
df.write.csv(stage, file_format_type="csv", format_type_options=optionsParam)
场景 2¶
输入代码
以下是尝试使用 JSON 格式保存数据的示例。
path_json_file = "/path/to/file.json"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
输出代码
该工具在输出代码中添加了 EWI SPRKPY1083,以提示 Snowpark 不支持此函数,但它有替代方案。
path_json_file = "/path/to/file.json"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
推荐修复方法
作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。
修复
path和format参数:用 json 或 copy_into_location 方法替换
load方法如果您使用的是
copy_into_location方法,则需要使用file_format_type参数指定格式。第一个参数
path必须在暂存区中才能与 Snowpark 等效。
以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用上述方法之一。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Stage creation
temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"
## Using json method
df.write.json(stage_file_path)
## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")
修复
mode参数:使用 Snowpark DataFrameWriter 中的 mode 函数,如下所示:
以下示例展示了如何在调用链中添加带有 overwrite 参数的 mode 方法:
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Using json method
df.write.mode("overwrite").json(temp_stage)
## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="json")
修复
partitionBy参数:使用
CSV方法中的 partition_by 参数,如下所示:
以下是使用 CSV 方法中的 partition_by 参数的示例。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Using json method
df.write.json(temp_stage, partition_by="City")
## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="json", partition_by="City")
修复
options参数:使用
CSV方法中的 format_type_options 参数,如下所示:
Spark 和 Snowpark 之间的选项不一样,在此示例中,dateFormat 和 timestampFormat 被替换为 DATE_FORMAT 和 TIMESTAMP_FORMAT,其他建议 部分中含有包含所有等效值的表格。
以下示例演示了如何使用 DATE_FORMAT 和 TIMESTAMP_FORMAT 创建字典,然后使用该字典调用 options 方法。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
## Using json method
df.write.json(stage, format_type_options=optionsParam)
## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="json", format_type_options=optionsParam)
场景 3¶
输入代码
以下是尝试使用 PARQUET 格式保存数据的示例。
path_parquet_file = "/path/to/file.parquet"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")
输出代码
该工具在输出代码中添加了 EWI SPRKPY1083,以提示 Snowpark 不支持此函数,但它有替代方案。
path_parquet_file = "/path/to/file.parquet"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")
推荐修复方法
作为一种替代方案,可以改用 Snowpark DataFrameReader 方法。
修复
path和format参数:用 parquet 或 copy_into_location 方法替换
load方法。如果您使用的是
copy_into_location方法,则需要使用file_format_type参数指定格式。第一个参数
path必须在暂存区中才能与 Snowpark 等效。
以下示例演示了如何创建临时暂存区并将文件存入其中,然后调用上述方法之一。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Stage creation
temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"
## Using parquet method
df.write.parquet(stage_file_path)
## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet")
修复
mode参数:使用 Snowpark DataFrameWriter 中的 mode 函数,如下所示:
以下示例展示了如何在调用链中添加带有 overwrite 参数的 mode 方法:
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Using parquet method
df.write.mode("overwrite").parquet(temp_stage)
## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(stage, file_format_type="parquet")
修复
partitionBy参数:使用
CSV方法中的 partition_by 参数,如下所示:
以下是使用 parquet 方法中的 partition_by 参数的示例。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
## Using parquet method
df.write.parquet(temp_stage, partition_by="City")
## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", partition_by="City")
修复
options参数:使用
CSV方法中的 format_type_options 参数,如下所示:
Spark 和 Snowpark 之间的选项不一样,在此示例中,pathGlobFilter 被替换为 PATTERN,其他建议 部分中含有包含所有等效值的表格。
以下示例演示了如何使用 PATTERN 创建字典,然后使用该字典调用 options 方法。
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}
## Using parquet method
df.write.parquet(stage, format_type_options=optionsParam)
## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", format_type_options=optionsParam)
其他建议¶
考虑到 Spark 和 Snowpark 之间的选项不一样,但它们可以映射:
Spark 选项 |
可能的值 |
Snowpark 等效选项 |
描述 |
|---|---|---|---|
header |
True 或 False |
SKIP_HEADER = 1 / SKIP_HEADER = 0 |
使用文件的第一行作为列名。 |
delimiter |
任何单字符/多字符字段分隔符 |
FIELD_DELIMITER |
指定单个字符/多个字符作为每个列/字段的分隔符。 |
sep |
任何单字符字段分隔符 |
FIELD_DELIMITER |
为每个列/字段指定一个字符作为分隔符。 |
encoding |
UTF-8、UTF-16 等等... |
ENCODING |
按给定的编码类型对 CSV 文件进行解码。默认编码为 UTF-8 |
lineSep |
任何单字符行分隔符 |
RECORD_DELIMITER |
定义应用于文件解析的行分隔符。 |
pathGlobFilter |
文件模式 |
PATTERN |
定义一种模式,以仅读取文件名与模式匹配的文件。 |
recursiveFileLookup |
True 或 False |
不适用 |
以递归方式扫描目录以读取文件。此选项的默认值为 False。 |
quote |
要引用的单个字符 |
FIELD_OPTIONALLY_ENCLOSED_BY |
引用字段/列,其包含定界符/分隔符可以是值的一部分的字段。此字符 To 与 quoteAll 选项一起使用时引用所有字段。此选项的默认值为双引号 (")。 |
nullValue |
用于替换 null 的字符串 |
NULL_IF |
在读取和写入 DataFrame 时用字符串替换空值。 |
dateFormat |
有效的日期格式 |
DATE_FORMAT |
定义一个表示日期格式的字符串。默认格式为 yyyy-MM-dd。 |
timestampFormat |
有效的时间戳格式 |
TIMESTAMP_FORMAT |
定义一个表示时间戳格式的字符串。默认格式为 yyyy-MM-dd 'T'HH:mm:ss。 |
escape |
任何单个字符 |
ESCAPE |
将单个字符设置为转义字符以覆盖默认的转义字符 (\)。 |
inferSchema |
True 或 False |
INFER_SCHEMA |
自动检测文件架构 |
mergeSchema |
True 或 False |
不适用 |
在 Snowflake 中不需要,因为每当 infer_schema 确定 parquet 文件结构时就会发生这种情况 |
对于 modifiedBefore / modifiedAfter 选项,可以使用元数据列在 Snowflake 中获得相同的结果,然后添加一个筛选器,比如:
df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’)。如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1084¶
自 Spark Conversion Core 4.12.0 起,此问题代码已 弃用
消息:pyspark.sql.readwriter.DataFrameWriter.option 不受支持。
类别:警告
描述¶
pyspark.sql.readwriter.DataFrameWriter.option (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) 函数不受支持。
场景¶
输入代码
以下示例使用 option 方法,该方法用于在写入 DataFrame 的数据时添加其他配置。
path_csv_file = "/path/to/file.csv"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
df.write.option("header", True).csv(csv_file_path)
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)
输出代码
该工具在输出代码中添加了 EWI SPRKPY1084,以提示 Snowpark 不支持此函数。
path_csv_file = "/path/to/file.csv"
data = [
("John", 30, "New York"),
("Jane", 25, "San Francisco")
]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("header", True).csv(csv_file_path)
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)
推荐修复方法
pyspark.sql.readwriter.DataFrameWriter.option 方法没有建议的修复方法。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1085¶
消息:pyspark.ml.feature.VectorAssembler 不受支持。
类别:警告
描述¶
pyspark.ml.feature.VectorAssembler (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) 不受支持。
场景¶
输入代码
VectorAssembler 用于将几列组合成一个向量。
data = [
(1, 10.0, 20.0),
(2, 25.0, 30.0),
(3, 50.0, 60.0)
]
df = SparkSession.createDataFrame(data, schema=["Id", "col1", "col2"])
vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")
输出代码
该工具在输出代码中添加了此 EWI SPRKPY1085,以提示 Snowpark 不支持此类。
data = [
(1, 10.0, 20.0),
(2, 25.0, 30.0),
(3, 50.0, 60.0)
]
df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
#EWI: SPRKPY1085 => The pyspark.ml.feature.VectorAssembler function is not supported.
vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")
推荐修复方法
pyspark.ml.feature.VectorAssembler 没有建议的修复方法。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1086¶
消息:pyspark.ml.linalg.VectorUDT 不受支持。
类别:警告
描述¶
pyspark.ml.linalg.VectorUDT (https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/linalg.html) 不受支持。
场景¶
输入代码
VectorUDT 是一种用于表示 DataFrame 中向量列的数据类型。
data = [
(1, Vectors.dense([10.0, 20.0])),
(2, Vectors.dense([25.0, 30.0])),
(3, Vectors.dense([50.0, 60.0]))
]
schema = StructType([
StructField("Id", IntegerType(), True),
StructField("VectorCol", VectorUDT(), True),
])
df = SparkSession.createDataFrame(data, schema=schema)
输出代码
该工具在输出代码中添加了 EWI SPRKPY1086,以提示 Snowpark 不支持此函数。
data = [
(1, Vectors.dense([10.0, 20.0])),
(2, Vectors.dense([25.0, 30.0])),
(3, Vectors.dense([50.0, 60.0]))
]
#EWI: SPRKPY1086 => The pyspark.ml.linalg.VectorUDT function is not supported.
schema = StructType([
StructField("Id", IntegerType(), True),
StructField("VectorCol", VectorUDT(), True),
])
df = spark.createDataFrame(data, schema=schema)
推荐修复方法
pyspark.ml.linalg.VectorUDT 没有建议的修复方法。
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1087¶
消息:pyspark.sql.dataframe.DataFrame.writeTo 函数不受支持,但它有替代方案。
类别:警告。
描述¶
pyspark.sql.dataframe.DataFrame.writeTo (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html) 函数不受支持。替代方案是改为使用 Snowpark DataFrameWriter SaveAsTable 方法。
场景¶
输入
以下示例使用 pyspark.sql.dataframe.DataFrame.writeTo 函数,DataFrame df 被写入名为 Personal_info 的表中。
df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
schema=["FIRST_NAME", "LAST_NAME"])
df.writeTo("Personal_info")
输出
SMA 在输出代码中添加了 EWI SPRKPY1087,以提示不支持此函数,但它有替代方案。
df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
schema=["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1087 => pyspark.sql.dataframe.DataFrame.writeTo is not supported, but it has a workaround.
df.writeTo("Personal_info")
推荐修复方法
替代方案是改用 Snowpark DataFrameWriter SaveAsTable 方法。
df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
schema=["FIRST_NAME", "LAST_NAME"])
df.write.saveAsTable("Personal_info")
其他建议¶
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1088¶
消息:Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.option 值可能不同,因此可能需要进行验证。
类别:警告
描述¶
Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) 值可能不同,因此可能需要进行验证以确保行为正确无误。
场景¶
有一些场景,具体取决于是否支持该选项,或者写入文件时使用的格式。
场景 1¶
输入
下面的示例展示了如何使用方法选项,添加了 sep 选项,其当前为 supported。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
df.write.option("sep", ",").csv("some_path")
输出
该工具添加了 EWI SPRKPY1088,提示需要进行验证。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
推荐修复方法
Snowpark API 支持此参数,因此唯一的操作是在迁移后检查行为。请参阅 等效选项表 以查看支持的参数。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
场景 2¶
输入
下面的场景展示了如何使用选项,但添加了一个 header 选项,其为 not supported。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
df.write.option("header", True).csv("some_path")
输出
该工具添加了 EWI SPRKPY1088,提示需要进行验证。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("header", True).csv("some_path")
推荐修复方法
对于此场景,建议对 Snowpark 格式类型选项 求值,看看是否可以根据需要进行更改。另外,检查更改后的行为。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
场景 3¶
输入
此场景添加一个 sep 选项,其为 supported 并使用 JSON 方法。
注意:此场景也适用于
PARQUET。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
df.write.option("sep", ",").json("some_path")
输出
该工具添加了 EWI SPRKPY1088,提示需要进行验证。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").json("some_path")
推荐修复方法
文件格式 JSON 不支持参数 sep,因此建议对 Snowpark 格式类型选项 进行求值,看看是否可以根据需要对其进行更改。另外,检查更改后的行为。
df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")
其他建议¶
由于有一些
not supported参数,建议查看table of equivalences并检查转换后的行为。等效选项表:
PySpark 选项 |
SnowFlake 选项 |
支持的文件格式 |
描述 |
|---|---|---|---|
SEP |
FIELD_DELIMITER |
CSV |
一个或多个单字节或多字节字符,用于分隔输入文件中的字段。 |
LINESEP |
RECORD_DELIMITER |
CSV |
用于分隔输入文件中记录的一个或多个字符。 |
QUOTE |
FIELD_OPTIONALLY_ENCLOSED_BY |
CSV |
用于包围字符串的字符。 |
NULLVALUE |
NULL_IF |
CSV |
用于与 SQL NULL 相互转换的字符串。 |
DATEFORMAT |
DATE_FORMAT |
CSV |
定义要加载的数据文件中日期值格式的字符串。 |
TIMESTAMPFORMAT |
TIMESTAMP_FORMAT |
CSV |
定义要加载的数据文件中时间戳值格式的字符串。 |
如果使用的参数不在列表中,则 API 会抛出错误。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1089¶
消息:Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.options 值可能不同,因此可能需要进行验证。
类别:警告
描述¶
Snowpark 中的 pyspark.sql.readwriter.DataFrameWriter.options (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html) 值可能不同,因此可能需要进行验证以确保行为正确无误。
场景¶
有一些场景,具体取决于是否支持这些选项,或者写入文件时使用的格式。
场景 1¶
输入
下面的示例展示了如何使用方法选项,添加了 sep 和 nullValue 选项,当前均为 supported。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
df.write.options(nullValue="myVal", sep=",").csv("some_path")
输出
该工具添加了 EWI SPRKPY1089,提示需要进行验证。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
推荐修复方法
Snowpark API 支持这些参数,因此唯一的操作是在迁移后检查行为。请参阅 等效选项表 以查看支持的参数。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
场景 2¶
输入
下面的场景展示使用了 options,但添加了一个 not supported 的 header 选项。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
df.write.options(header=True, sep=",").csv("some_path")
输出
该工具添加了 EWI SPRKPY1089,提示需要进行验证。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(header=True, sep=",").csv("some_path")
推荐修复方法
对于此场景,建议对 Snowpark 格式类型选项 求值,看看是否可以根据需要进行更改。另外,检查更改后的行为。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
场景 3¶
输入
此场景添加一个 sep 选项,其为 supported 并使用 JSON 方法。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
df.write.options(nullValue="myVal", sep=",").json("some_path")
输出
该工具添加了 EWI SPRKPY1089,提示需要进行验证。
注意:此场景也适用于
PARQUET。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").json("some_path")
推荐修复方法
文件格式 JSON 不支持参数 sep,因此建议对 Snowpark 格式类型选项 进行求值,看看是否可以根据需要对其进行更改。另外,检查更改后的行为。
df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")
其他建议¶
由于有一些
not supported参数,建议查看table of equivalences并检查转换后的行为。等效选项表:
Snowpark 可以支持某些参数的 等效选项 列表:
PySpark 选项 |
SnowFlake 选项 |
支持的文件格式 |
描述 |
|---|---|---|---|
SEP |
FIELD_DELIMITER |
CSV |
一个或多个单字节或多字节字符,用于分隔输入文件中的字段。 |
LINESEP |
RECORD_DELIMITER |
CSV |
用于分隔输入文件中记录的一个或多个字符。 |
QUOTE |
FIELD_OPTIONALLY_ENCLOSED_BY |
CSV |
用于包围字符串的字符。 |
NULLVALUE |
NULL_IF |
CSV |
用于与 SQL NULL 相互转换的字符串。 |
DATEFORMAT |
DATE_FORMAT |
CSV |
定义要加载的数据文件中日期值格式的字符串。 |
TIMESTAMPFORMAT |
TIMESTAMP_FORMAT |
CSV |
定义要加载的数据文件中时间戳值格式的字符串。 |
如果使用的参数不在列表中,则 API 会抛出错误。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们,或者 在 SMA 中 提交问题。
SPRKPY1101¶
类别¶
解析错误。
描述¶
当该工具识别出解析错误时,它会尝试从错误中恢复并在下一行继续该过程。在这种情况下,它会在该行中显示错误和注释。
下面的示例说明如何处理空格和制表符之间的不匹配错误。
输入代码
def foo():
x = 5 # Spaces
y = 6 # Tab
def foo2():
x=6
y=7
输出代码
def foo():
x = 5 # Spaces
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(3, 2). Last valid token was '5' @(2, 9), failed token 'y' @(3, 2)
## y = 6 # Tab
def foo2():
x=6
y=7
建议¶
尝试修复带注释的行。
如需更多支持,请发送电子邮件至 sma-support@snowflake.com 联系我们。如果您与 Snowflake 签订了支持合同,请联系您的销售工程师,他们可以指导您的支持需求。