Snowpark Migration Accelerator: Issue Codes for Python

SPRKPY1000

Message: Source project spark-core version is xxx.xx:xx.x.x, the spark-core version supported by Snowpark is 2.12:3.1.2, so there may be functional differences between the existing mappings.

Category: Warning.

Description

This issue appears when the PySpark version of your source code is not supported. It means that there may be functional differences between the existing mappings.

Additional recommendations

  • The pyspark versions that the SMA scans for compatibility with Snowpark range from 2.12 to 3.1.2. If you are using a version outside this range, the tool may produce inconsistent analysis results. You can change the version of the source code being scanned.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue).

SPRKPY1001

Message: This code section has parsing errors

Category: Parsing error.

Description

A parsing error is reported when the Snowpark Migration Accelerator (SMA) cannot correctly read or understand the code in a file (it cannot correctly "parse" the file). This issue code appears when a file has one or more parsing errors.

Scenario

Input: The EWI message appears when the code has invalid syntax, for example:

def foo():
    x = %%%%%%1###1

Output: The SMA finds the parsing error, comments out the failing code, and adds the corresponding EWI message:

def foo():
    x
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(2, 7). Last valid token was 'x' @(2, 5), failed token '=' @(2, 7)
##      = %%%%%%1###1

Additional recommendations

  • Check that the file contains valid Python code. (You can use the issues.csv file to find all files with this EWI code to determine which file(s) were not processed by the tool due to parsing errors.) Many parsing errors occur because only part of the code is input into the tool, so it's best to ensure that the code runs in the source. If it is valid, report that you encountered a parsing error using the Report an Issue option in the SMA. Include the line of code that was causing the parsing error in the description when you file this issue.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
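For example, a small script can pull the affected files out of the report. This is only a sketch: the exact issues.csv column names (assumed here to be Code and FileId) may differ between SMA versions, and the sample rows are hypothetical.

```python
import csv
import io

# Hypothetical excerpt of an SMA issues.csv report; the column names
# ("Code", "FileId", ...) are an assumption and may differ in your version.
sample = """Code,Description,Category,FileId,Line
SPRKPY1001,This code section has parsing errors,Parsing error,src/jobs/etl.py,12
SPRKPY1002,collect is not supported,Conversion error,src/jobs/agg.py,40
SPRKPY1001,This code section has parsing errors,Parsing error,src/util/io.py,3
"""

def files_with_parsing_errors(csv_text):
    # Collect the distinct files flagged with the SPRKPY1001 issue code
    rows = csv.DictReader(io.StringIO(csv_text))
    return sorted({row["FileId"] for row in rows if row["Code"] == "SPRKPY1001"})

print(files_with_parsing_errors(sample))
# ['src/jobs/etl.py', 'src/util/io.py']
```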

SPRKPY1002

Message: <element> is not supported, Spark element is not supported.

Category: Conversion error.

Description

This issue appears when the tool detects the usage of an element that is not supported by Snowpark and that does not have its own associated error code. This is the generic error code that the SMA uses for unsupported elements.

Additional recommendations

  • Even though the option or element in the message is not supported, this does not mean that a solution cannot be found. It only means that the tool itself cannot find the solution.

  • If you encounter an unsupported element from the pyspark.ml library, consider an alternative approach. There are other guides that address machine-learning-related issues, such as this one from Snowflake.

  • Check whether the source code has the correct syntax. (You can use the issues.csv file to determine where the conversion errors occur.) If the syntax is correct, report that you encountered a conversion error on a particular element using the Report an Issue option in the SMA. Include the line of code that was causing the error in the description when you file this issue.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1003

Message: An error occurred when loading the symbol table.

Category: Conversion error.

Description

This issue appears when there is an error processing the symbols in the symbol table. The symbol table is part of the underlying architecture of the SMA that allows for more complex conversions. This error could be caused by an unexpected statement in the source code.

Additional recommendations

  • This is unlikely to be an error in the source code itself, but rather is an error in how the tool processes the source code. The best resolution would be to post an issue in the SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue).

SPRKPY1004

Message: The symbol table could not be loaded.

Category: Parsing error.

Description

This issue appears when there is an unexpected error in the execution of the tool. Because the symbol table cannot be loaded, the tool cannot start the assessment or conversion process.

Additional recommendations

  • This is unlikely to be an error in the source code itself, but rather is an error in how the tool processes the source code. The best resolution would be to reach out to the SMA support team. You can email us at sma-support@snowflake.com.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA (https://snowflakecomputing.atlassian.net/o/-MB4z_O8Sl--Tfl3XVml/s/6on4bNAZUZGzMpdEum8X/~/changes/371/user-guide/project-overview/configuration-and-settings#report-an-issue).

SPRKPY1005

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.conf.SparkConf is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.conf.SparkConf (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html), which is not required.

Scenario

Input

SparkConf can be called without arguments or with the loadDefaults argument.

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

Output

For both cases (with or without parameters) the SMA creates a Snowpark Session.builder object:

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
#from pyspark import SparkConf
pass

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
my_conf = Session.builder.configs({"user" : "my_user", "password" : "my_password", "account" : "my_account", "role" : "my_role", "warehouse" : "my_warehouse", "database" : "my_database", "schema" : "my_schema"}).create()

Additional recommendations

  • This parameter is not required; the tool removes it and inserts a warning comment. No further action is required from the user.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1006

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.context.SparkContext is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.context.SparkContext (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html), which is not required in Snowflake.

Scenario

Input

In this example, two contexts are used to create a connection to a Spark cluster:

from pyspark import SparkContext

sql_context1 = SparkContext(my_sc1)
sql_context2 = SparkContext(sparkContext=my_sc2)

Output

Because there are no clusters in Snowflake, the context is not required. Note that the variables my_sc1 and my_sc2, which hold the Spark properties, may not be needed, or may need to be adapted to fix the code.

from snowflake.snowpark import Session
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required

sql_context2 = my_sc2

Additional recommendations

  • This parameter is not required; the tool removes it and inserts a warning comment. No action is required from the user.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1007

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.context.SQLContext is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.context.SQLContext (https://downloads.apache.org/spark/docs/1.6.1/api/python/pyspark.sql.html), which is not required.

Scenario

Input

The following example shows the different SQLContext overloads.

from pyspark import SQLContext

my_sc1 = SQLContext(myMaster, myAppName, mySparkHome, myPyFiles, myEnvironment, myBatctSize, mySerializer, my_conf1)
my_sc2 = SQLContext(conf=my_conf2)
my_sc3 = SQLContext()

Output

The output code comments out the lines that use pyspark.SQLContext and replaces those scenarios with a reference to a configuration. Note that the variables my_sc1 and my_sc2, which hold the Spark properties, may not be needed, or may need to be adapted to fix the code.

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
#from pyspark import SQLContext
pass

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context2 = my_sc2

Additional recommendations

  • This parameter is not required; the tool removes it and inserts a warning comment into the source code. No action is required from the user.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1008

Message: pyspark.sql.context.HiveContext is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.context.HiveContext (https://downloads.apache.org/spark/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext), which is not required.

Scenario

Input

This example shows how to create a connection to a Hive data warehouse.

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.table("myTable")
df.show()

Output

In Snowflake there are no Hive stores, so the Hive context is not required. You can still use Parquet files in Snowflake; check this tutorial to learn how.

#EWI: SPRKPY1008 => pyspark.sql.context.HiveContext is not required
hive_context = sc
df = hive_context.table("myTable")
df.show()

The sc variable refers to a Snowpark Session object.

Recommended fix

For the output code in the example, you should add a Snowpark Session object, similar to this code:

## Here we manually add the Snowpark Session object via a JSON config file called connection.json
import json
from snowflake.snowpark import Session
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
sc = Session.builder.configs(connection_parameter).getOrCreate()

hive_context = sc
df = hive_context.table("myTable")
df.show()

Additional recommendations

SPRKPY1009

Message: pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html), which has a workaround.

Scenario

Input

It's important to understand that PySpark has two different approxQuantile functions; here we use the DataFrame approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html) version.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Output

The SMA returns the EWI SPRKPY1009 on the line where approxQuantile is used, so you can use it to identify where to fix the code.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1009 => pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround, see documentation for more info
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Recommended fix

Use the Snowpark approx_quantile method. Some parameters don't match, so they require manual adjustment. For the output code example, a recommended fix could be:

from snowflake.snowpark import Session
...
df = spark.createDataFrame(data, columns)

df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])

The relativeError parameter of pyspark.sql.dataframe.DataFrame.approxQuantile does not exist in Snowpark.

Additional recommendations

SPRKPY1010

Message: pyspark.sql.dataframe.DataFrame.checkpoint has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.checkpoint (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.checkpoint.html), which has a workaround.

Scenario

Input

In PySpark, checkpoints are used to truncate the logical plan of a DataFrame in order to keep it from growing.

import tempfile
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    spark.sparkContext.setCheckpointDir("/tmp/bb")
    df.checkpoint(False)

Output

The SMA returns the EWI SPRKPY1010 on the line where checkpoint is used, so you can use it to identify where to fix the code. Note that it also marks setCheckpointDir (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setCheckpointDir.html) as unsupported, but a checkpoint directory is not required for the fix.

import tempfile
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    #EWI: SPRKPY1002 => pyspark.context.SparkContext.setCheckpointDir is not supported
    spark.setCheckpointDir("/tmp/bb")
    #EWI: SPRKPY1010 => pyspark.sql.dataframe.DataFrame.checkpoint has a workaround, see documentation for more info
    df.checkpoint(False)

Recommended fix

Explicitly setting a checkpoint is unnecessary because Snowpark uses SQL-based operations that are optimized by the Snowflake query optimization engine, which eliminates unnecessary computations and runaway growth of the logical plan.

However, there could be scenarios where you need to persist the result of a computation on a DataFrame. In these scenarios you can materialize the results by writing the DataFrame to a permanent Snowflake table or a Snowflake temporary table.

  • By using a permanent table, the computed result can be accessed at any time, even after the session ends.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_table") # Save the dataframe into the permanent Snowflake table "my_table".
df2 = spark.table("my_table") # Now I can access the stored result by querying the table "my_table"
  • As an alternative fix, you can use a temporary table, which has the advantage of being deleted when the session ends:

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_temp_table", table_type="temporary") # Save the dataframe into the Snowflake temporary table "my_temp_table".
df2 = spark.table("my_temp_table") # Now I can access the stored result by querying the table "my_temp_table"

Additional recommendations

SPRKPY1011

Message: pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameStatFunctions.approxQuantile.html#pyspark.sql.DataFrameStatFunctions.approxQuantile), which has a workaround.

Scenario

Input

It's important to understand that PySpark has two different approxQuantile functions; here we use the DataFrameStatFunctions approxQuantile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameStatFunctions.approxQuantile.html#pyspark.sql.DataFrameStatFunctions.approxQuantile) version.

import tempfile
from pyspark.sql import SparkSession, DataFrameStatFunctions
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
print(aprox_quantille)

Output

The SMA returns the EWI SPRKPY1011 on the line where approxQuantile is used, so you can use it to identify where to fix the code.

import tempfile
from snowflake.snowpark import Session, DataFrameStatFunctions
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1011 => pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround, see documentation for more info
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)

Recommended fix

You can use the Snowpark approx_quantile method. Some parameters don't match, so they require manual adjustment. For the output code example, a recommended fix could be:

from snowflake.snowpark import Session # remove DataFrameStatFunctions because it is not required
...
df = spark.createDataFrame(data, columns)

aprox_quantille = df.stat.approx_quantile('Gain', [0.25, 0.5, 0.75])

The relativeError parameter of pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile does not exist in Snowpark.

Additional recommendations

SPRKPY1012

Warning

This issue code has been deprecated

Message: pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.writeTo (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html), which has a workaround.

Scenario

Input

In this example, the DataFrame df is written to the Spark table "table".

writer = df.writeTo("table")

Output

The SMA returns the EWI SPRKPY1012 on the line where writeTo is used, so you can use it to identify where to fix the code.

#EWI: SPRKPY1012 => pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround, see documentation for more info
writer = df.writeTo("table")

Recommended fix

Use df.write.save_as_table() instead.

writer = df.write.save_as_table("table")

Additional recommendations

SPRKPY1013

Message: pyspark.sql.functions.acosh has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.acosh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.acosh.html), which has a workaround.

Scenario

Input

In this example, PySpark calculates the acosh for a DataFrame by using pyspark.sql.functions.acosh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.acosh.html).

from pyspark.sql import SparkSession
from pyspark.sql.functions import acosh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Output

The SMA returns the EWI SPRKPY1013 on the line where acosh is used, so you can use it to identify where to fix the code.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1013 => pyspark.sql.functions.acosh has a workaround, see documentation for more info
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Recommended fix

There is no direct acosh implementation, but call_function can be used instead, passing "acosh" as the first parameter and the column name as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.select(call_function('ACOSH', col('value')))
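Snowflake's ACOSH, invoked through call_function above, computes the standard inverse hyperbolic cosine. If you want to sanity-check the values the converted code should produce, the identity acosh(x) = ln(x + sqrt(x^2 - 1)) can be verified locally with Python's math module (no Snowpark connection needed):

```python
import math

def acosh_identity(x):
    # acosh(x) = ln(x + sqrt(x^2 - 1)), valid for x >= 1
    return math.log(x + math.sqrt(x * x - 1))

# Compare against Python's built-in math.acosh for the sample values above
for value in [30, 60, 50, 13]:
    assert math.isclose(acosh_identity(value), math.acosh(value))
```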

Additional recommendations

SPRKPY1014

Message: pyspark.sql.functions.asinh has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.asinh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asinh.html), which has a workaround.

Scenario

Input

In this example, PySpark calculates the asinh for a DataFrame by using pyspark.sql.functions.asinh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asinh.html).

from pyspark.sql import SparkSession
from pyspark.sql.functions import asinh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Output

The SMA returns the EWI SPRKPY1014 on the line where asinh is used, so you can use it to identify where to fix the code.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1014 => pyspark.sql.functions.asinh has a workaround, see documentation for more info
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Recommended fix

There is no direct asinh implementation, but call_function can be used instead, passing "asinh" as the first parameter and the column name as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('asinh', col('value')))

Additional recommendations

SPRKPY1015

Message: pyspark.sql.functions.atanh has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.atanh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.atanh.html), which has a workaround.

Scenario

Input

In this example, PySpark calculates the atanh for a DataFrame by using pyspark.sql.functions.atanh (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.atanh.html).

from pyspark.sql import SparkSession
from pyspark.sql.functions import atanh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Output

The SMA returns the EWI SPRKPY1015 on the line where atanh is used, so you can use it to identify where to fix the code.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1015 => pyspark.sql.functions.atanh has a workaround, see documentation for more info
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Recommended fix

There is no direct atanh implementation, but call_function can be used instead, passing "atanh" as the first parameter and the column name as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('atanh', col('value')))

Additional recommendations

SPRKPY1016

Warning

This issue code has been deprecated since Spark Conversion Core Version 0.11.7

Message: pyspark.sql.functions.collect_set has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.collect_set (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_set.html), which has a workaround.

Scenario

Input

Using collect_set to get the elements of colName without duplicates:

col = collect_set(colName)

Output

The SMA returns the EWI SPRKPY1016 on the line where collect_set is used, so you can use it to identify where to fix the code.

#EWI: SPRKPY1016 => pyspark.sql.functions.collect_set has a workaround, see documentation for more info
col = collect_set(colName)

Recommended fix

Use the array_agg function, and add True as the second argument.

col = array_agg(col, True)
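The second argument True asks array_agg for distinct values, which is what reproduces collect_set's deduplication. Note that neither function guarantees element order, so results are best compared as sets; a plain-Python illustration of the semantics:

```python
def collect_set_local(values):
    # Distinct aggregation: the elements collect_set / array_agg(col, True)
    # return, ignoring order (neither engine guarantees element order)
    return set(values)

assert collect_set_local([1, 2, 2, 3, 1]) == {1, 2, 3}
```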

Additional recommendations

SPRKPY1017

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.functions.date_add has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.date_add (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_add.html), which has a workaround.

Scenario

Input

In this example, date_add is used to calculate the date 5 days after the current date for the DataFrame df.

col = df.select(date_add(df.colName, 5))

Output

The SMA returns the EWI SPRKPY1017 warning on the line where date_add is used, so you can use it to identify where to fix the code.

#EWI: SPRKPY1017 => pyspark.sql.functions.date_add has a workaround, see documentation for more info
col = df.select(date_add(df.colName, 5))

Recommended fix

Import snowflake.snowpark.functions, which contains an implementation of the date_add function (and its alias dateAdd).

from snowflake.snowpark.functions import date_add

col = df.select(date_add(df.dt, 1))

Additional recommendations

SPRKPY1018

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.functions.date_sub has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.date_sub (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_sub.html), which has a workaround.

Scenario

Input

In this example, date_sub is used to calculate the date 5 days before the current date for the DataFrame df.

col = df.select(date_sub(df.colName, 5))

Output

The SMA returns the EWI SPRKPY1018 warning on the line where date_sub is used, so you can use it to identify where to fix the code.

#EWI: SPRKPY1018 => pyspark.sql.functions.date_sub has a workaround, see documentation for more info
col = df.select(date_sub(df.colName, 5))

Recommended fix

Import snowflake.snowpark.functions, which contains an implementation of the date_sub function.

from snowflake.snowpark.functions import date_sub
df.withColumn("date", date_sub(df.colName, 5))

Additional recommendations

SPRKPY1019

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.functions.datediff has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.datediff (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.datediff.html), which has a workaround.

Scenario

Input

In this example, we use datediff to calculate the difference in days between 'today' and other dates.

contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

Output

The SMA returns the EWI SPRKPY1019 on the line where datediff is used, so you can use it to identify where to fix the code.

from pyspark.sql.functions import datediff
#EWI: SPRKPY1019 => pyspark.sql.functions.datediff has a workaround, see documentation for more info
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

The SMA converts pyspark.sql.functions.datediff into snowflake.snowpark.functions.daydiff, which also calculates the difference in days between two dates.

Recommended fix

datediff(part: string, end: ColumnOrName, start: ColumnOrName)

Action: Import snowflake.snowpark.functions, which contains an implementation of the datediff function that requires an extra parameter for the date part and allows more versatility in calculating differences between dates.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import datediff
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff('day', lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff('day',lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff('day', lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff('day', lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff('day', lit(today),'vms_lastuserlogin'))
           )
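With the 'day' part, the difference computed above corresponds to plain calendar-day subtraction; a minimal local illustration with Python's datetime (the sample dates are hypothetical stand-ins for today and lastEvent):

```python
from datetime import date

today = date(2024, 5, 20)      # stand-in for lit(today)
last_event = date(2024, 5, 5)  # stand-in for the 'lastEvent' column

# A day-level datediff boils down to whole-day subtraction between two dates
days_between = (today - last_event).days
print(days_between)
# 15
```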

Additional recommendations

SPRKPY1020

Message: pyspark.sql.functions.instr has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.instr (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.instr.html), which has a workaround.

Scenario

Input

Below is a basic example of using pyspark instr:

from pyspark.sql import SparkSession
from pyspark.sql.functions import instr
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(instr(df.test, 'cd').alias('result')).collect()

Output:

The SMA returns the EWI SPRKPY1020 on the line where instr is used, so you can use it to identify where to fix the code.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
#EWI: SPRKPY1020 => pyspark.sql.functions.instr has a workaround, see documentation for more info
df.select(instr(df.test, 'cd').alias('result')).collect()

Recommended fix

Requires a manual change: use the charindex function and swap the order of the first two parameters.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import charindex, lit

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(charindex(lit('cd'), df.test).as_('result')).show()
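Both Spark's instr(str, substr) and Snowflake's charindex(substr, str) return the 1-based position of the first occurrence (0 when the substring is absent); only the argument order changes. The shared semantics can be illustrated with plain Python strings:

```python
def instr(s, substr):
    # Spark-style instr(str, substr): 1-based position, 0 if absent
    return s.find(substr) + 1

def charindex(substr, s):
    # Snowflake-style charindex(substr, str): same result, swapped arguments
    return s.find(substr) + 1

assert instr('abcd', 'cd') == charindex('cd', 'abcd') == 3
assert instr('abcd', 'zz') == 0
```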

Additional recommendations

SPRKPY1021

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.last has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.last (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.last.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.last function that generates this EWI. In this example, the last function is used to get the last value for each name.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Output

The SMA adds the EWI SPRKPY1021 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
#EWI: SPRKPY1021 => pyspark.sql.functions.last has a workaround, see documentation for more info
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Recommended fix

As a workaround, you can use the Snowflake LAST_VALUE function. To invoke this function from Snowpark, use the snowflake.snowpark.functions.call_builtin function and pass the string last_value as the first argument and the corresponding column as the second argument. If you were using the name of the column in the last function, you should convert it into a column when calling the call_builtin function.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))

Additional recommendations


SPRKPY1022

Message: pyspark.sql.functions.log10 has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.log10 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log10.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.log10 function that generates this EWI. In this example, the log10 function is used to calculate the base-10 logarithm of the value column.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Output

The SMA adds the EWI SPRKPY1022 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
#EWI: SPRKPY1022 => pyspark.sql.functions.log10 has a workaround, see documentation for more info
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Recommended fix

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 10 as the base.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))

Additional recommendations

SPRKPY1023

Message: pyspark.sql.functions.log1p has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.log1p (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log1p.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.log1p function that generates this EWI. In this example, the log1p function is used to calculate the natural logarithm of the value column.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Output

The SMA adds the EWI SPRKPY1023 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
#EWI: SPRKPY1023 => pyspark.sql.functions.log1p has a workaround, see documentation for more info
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Recommended fix

As a workaround, you can use the call_function function by passing the string ln as the first argument and by adding 1 to the second argument.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))
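
The recommended fix relies on the identity log1p(x) = ln(1 + x). The following plain-Python check (illustrative only, independent of Snowpark and Snowflake) confirms the identity that the call_function("ln", lit(1) + column) workaround is built on:

```python
import math

# log1p(x) equals ln(1 + x); this is why invoking Snowflake's LN
# on (1 + column) reproduces PySpark's log1p behavior.
for x in [0, 1, 10, 100]:
    assert math.isclose(math.log1p(x), math.log(1 + x))
print("identity holds for all sample values")
```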

Additional recommendations

SPRKPY1024

Message: pyspark.sql.functions.log2 has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.log2 (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.log2.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.log2 function that generates this EWI. In this example, the log2 function is used to calculate the base-2 logarithm of the value column.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Output

The SMA adds the EWI SPRKPY1024 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
#EWI: SPRKPY1024 => pyspark.sql.functions.log2 has a workaround, see documentation for more info
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Recommended fix

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 2 as the base.

df = session.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log(2, df["value"]))

Additional recommendations

SPRKPY1025

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.ntile has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.ntile (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.ntile.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.ntile function that generates this EWI. In this example, the ntile function is used to divide the rows into 3 buckets.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Output

The SMA adds the EWI SPRKPY1025 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
#EWI: SPRKPY1025 => pyspark.sql.functions.ntile has a workaround, see documentation for more info
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Recommended fix

Snowpark has an equivalent ntile function; however, the argument passed to it must be a column. As a workaround, you can convert the literal argument into a column using the snowflake.snowpark.functions.lit function.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))

Additional recommendations

SPRKPY1026

Warning

This issue code has been deprecated since Spark Conversion Core 4.3.2

Message: pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.csv (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.csv function that generates this EWI. In this example, the csv function is used to read multiple .csv files with a given schema and uses some extra options such as encoding, header and sep to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Output

The SMA adds the EWI SPRKPY1026 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Recommended fix

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. The path parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .csv file to that stage using the prefix file://.

2. The schema parameter

Snowpark does not allow defining the schema as a parameter of the csv function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. The options parameter

Snowpark does not allow defining the extra options as parameters of the csv function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Note

Snowpark does not support the following options:

  • columnNameOfCorruptRecord

  • emptyValue

  • enforceSchema

  • header

  • ignoreLeadingWhiteSpace

  • ignoreTrailingWhiteSpace

  • inferSchema

  • locale

  • maxCharsPerColumn

  • maxColumns

  • mode

  • multiLine

  • nanValue

  • negativeInf

  • nullValue

  • positiveInf

  • quoteAll

  • samplingRatio

  • timestampNTZFormat

  • unescapedQuoteHandling

Below is a complete code example that works in Snowpark after applying the suggestions mentioned above:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.csv", f"@{stage}")

df = session.read.schema(my_schema).option("encoding", "UTF-8").option("sep", ",").csv(stage)

Additional recommendations

SPRKPY1027

Warning

This issue code has been deprecated since Spark Conversion Core 4.5.2

Message: pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.json (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.json function that generates this EWI. In this example, the json function is used to read multiple .json files with a given schema and uses some extra options such as primitiveAsString and dateFormat to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Output

The SMA adds the EWI SPRKPY1027 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

#EWI: SPRKPY1027 => pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info
df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Recommended fix

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. The path parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .json file to that stage using the prefix file://.

2. The schema parameter

Snowpark does not allow defining the schema as a parameter of the json function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. The options parameter

Snowpark does not allow defining the extra options as parameters of the json function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Note

Snowpark does not support the following options:

  • allowBackslashEscapingAnyCharacter

  • allowComments

  • allowNonNumericNumbers

  • allowNumericLeadingZero

  • allowSingleQuotes

  • allowUnquotedControlChars

  • allowUnquotedFieldNames

  • columnNameOfCorruptRecord

  • dropFieldIfAllNull

  • encoding

  • ignoreNullFields

  • lineSep

  • locale

  • mode

  • multiline

  • prefersDecimal

  • primitiveAsString

  • samplingRatio

  • timestampNTZFormat

  • timeZone

Below is a complete code example that works in Snowpark after applying the suggestions mentioned above:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.json", f"@{stage}")

df = session.read.schema(my_schema).option("dateFormat", "2023-06-20").json(stage)

Additional recommendations

SPRKPY1028

Message: pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.orc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.orc.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.orc function that generates this EWI. In this example, the orc function is used to read multiple .orc files and uses some extra options such as mergeSchema and recursiveFileLookup to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Output

The SMA adds the EWI SPRKPY1028 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

#EWI: SPRKPY1028 => pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info
df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Recommended fix

In this section, we explain how to configure the path parameter and the extra options to make them work in Snowpark.

1. The path parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .orc file to that stage using the prefix file://.

2. The options parameter

Snowpark does not allow defining the extra options as parameters of the orc function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Note

Snowpark does not support the following options:

  • compression

  • mergeSchema

Below is a complete code example that works in Snowpark after applying the suggestions mentioned above:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.orc", f"@{stage}")

df = session.read.option("recursiveFileLookup", "True").orc(stage)

Additional recommendations

SPRKPY1029

Message: This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html) function. This function is supported by Snowpark, however, there are some differences that would require some manual changes.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.parquet function that generates this EWI.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet",
]

df = session.read.parquet(
  *file_paths,
  mergeSchema="true",
  pathGlobFilter="*file*",
  recursiveFileLookup="true",
  modifiedBefore="2024-12-31T00:00:00",
  modifiedAfter="2023-12-31T00:00:00"
)

Output

The SMA adds the EWI SPRKPY1029 to the output code to let you know that this function is supported by Snowpark, but it requires some manual adjustments. Please note that the options supported by Snowpark are transformed into option function calls and those that are not supported are removed. This is explained in more detail in the next sections.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet"
]

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.
df = session.read.option("PATTERN", "*file*").parquet(
  *file_paths
)

Recommended fix

In this section, we explain how to configure the paths and options parameters to make them work in Snowpark.

1. The paths parameter

In Spark, this parameter can be a local or cloud location. Snowpark only accepts cloud locations using a Snowflake stage. As a workaround, you can create a temporary stage and add each file to it using the prefix file://.

2. The options parameter

Snowpark does not allow defining the different options as parameters of the parquet function. As a workaround, you can use the option or options functions to specify those parameters as extra options of the DataFrameReader.

Please note that Snowpark options are not exactly the same as PySpark options, so some manual changes might be needed. Below is a more detailed explanation of how to configure the most common PySpark options in Snowpark.

2.1 The mergeSchema option

Parquet supports schema evolution, allowing users to start with a simple schema and gradually add more columns as needed. This can result in multiple parquet files with different but compatible schemas. In Snowflake, the infer_schema capability makes this unnecessary, so the mergeSchema option can simply be removed.

2.2 The pathGlobFilter option

If you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files you want to load. The SMA already automates this as you can see in the output of this scenario.
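
Note that pathGlobFilter takes a glob pattern (for example, *file*), while Snowflake's PATTERN option expects a regular expression (for example, .*file.*). A small helper like the one below (a hypothetical utility, not part of the SMA or Snowpark) sketches how a simple glob can be translated:

```python
import re

def glob_to_pattern(glob: str) -> str:
    """Translate a simple glob (only * and ? wildcards) into a regular
    expression usable as Snowflake's PATTERN option."""
    out = []
    for ch in glob:
        if ch == "*":
            out.append(".*")  # * matches any run of characters
        elif ch == "?":
            out.append(".")   # ? matches exactly one character
        else:
            out.append(re.escape(ch))  # escape regex metacharacters
    return "".join(out)

print(glob_to_pattern("*file*"))  # .*file.*
```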

2.3 The recursiveFileLookup option

This option is not supported by Snowpark. The best recommendation is to use a regular expression, as with the pathGlobFilter option, to achieve something similar.

2.4 The modifiedBefore / modifiedAfter options

You can achieve the same result in Snowflake by using the metadata columns.

Note

Snowpark does not support the following options:

  • compression

  • datetimeRebaseMode

  • int96RebaseMode

  • mergeSchema

Below is a complete example of how the input code should be transformed in order to work in Snowpark:

from snowflake.snowpark.column import METADATA_FILE_LAST_MODIFIED, METADATA_FILENAME

temp_stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}')

session.file.put(f"file:///path/to/your/file1.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file2.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file3.parquet", f"@{temp_stage}")

df = session.read \
  .option("PATTERN", ".*file.*") \
  .with_metadata(METADATA_FILENAME, METADATA_FILE_LAST_MODIFIED) \
  .parquet(temp_stage) \
  .where(METADATA_FILE_LAST_MODIFIED < '2024-12-31T00:00:00') \
  .where(METADATA_FILE_LAST_MODIFIED > '2023-12-31T00:00:00')

Additional recommendations

SPRKPY1030

Warning

This issue code has been deprecated

Message: pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.session.SparkSession.Builder.appName (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.appName.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.session.SparkSession.Builder.appName function that generates this EWI. In this example, the appName function is used to set MyApp as the name of the application.

session = SparkSession.builder.appName("MyApp").getOrCreate()

Output

The SMA adds the EWI SPRKPY1030 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1030 => pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info
session = Session.builder.appName("MyApp").getOrCreate()

Recommended fix

As a workaround, you can import the snowpark_extensions (https://pypi.org/project/snowpark-extensions/) package which provides an extension for the appName function.

import snowpark_extensions
session = Session.builder.appName("MyApp").getOrCreate()

Additional recommendations

SPRKPY1031

Warning

This issue code has been deprecated since Spark Conversion Core 2.7.0

Message: pyspark.sql.column.Column.contains has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.contains (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.contains.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.column.Column.contains function that generates this EWI. In this example, the contains function is used to filter the rows where the 'City' column contains the substring 'New'.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(col("City").contains("New"))

Output

The SMA adds the EWI SPRKPY1031 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
#EWI: SPRKPY1031 => pyspark.sql.column.Column.contains has a workaround, see documentation for more info
df_filtered = df.filter(col("City").contains("New"))

Recommended fix

As a workaround, you can use the snowflake.snowpark.functions.contains function by passing the column as the first argument and the element to search as the second argument. If the element to search is a literal value then it should be converted into a column expression using the lit function.

from snowflake.snowpark import functions as f
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(f.contains(col("City"), f.lit("New")))

Additional recommendations

SPRKPY1032

Message: *spark element* is not defined

Category: Conversion error

Description

This issue appears when the SMA could not determine the appropriate mapping status for a given element. This means that the SMA does not yet know whether the element is supported by Snowpark. Note that this is a generic error code used by the SMA for any undefined element.

Scenario

Input

Below is an example of a function for which the SMA could not determine an appropriate mapping status. For this example, assume that not_defined_function() is a valid PySpark function and that the code runs.

sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Output

The SMA adds the EWI SPRKPY1032 to the output code to let you know that this element is not defined.

#EWI: SPRKPY1032 => pyspark.rdd.RDD.not_defined_function is not defined
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Recommended fix

To try to identify the problem, you can perform the following checks:

  • Check that the source code has valid syntax and that everything is spelled correctly.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If this is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that if an element is not defined, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify whether an equivalent element exists.

Additional recommendations

SPRKPY1033

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.asc has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.asc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.asc.html) function, which has a workaround.

Scenario

The pyspark.sql.functions.asc function takes either a column object or the name of a column as a string as its parameter. Neither scenario is directly supported by Snowpark, so this EWI is generated.

Scenario 1

Input

Below is an example of a use of the pyspark.sql.functions.asc function that takes a column object as parameter.

df.orderBy(asc(col))

Output

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc(col))

Recommended fix

As a workaround, you can call the snowflake.snowpark.Column.asc function from the column parameter.

df.orderBy(col.asc())

Scenario 2

Input

Below is an example of a use of the pyspark.sql.functions.asc function that takes the name of the column as parameter.

df.orderBy(asc("colName"))

Output

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc("colName"))

Recommended fix

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.asc function.

df.orderBy(col("colName").asc())

Additional recommendations

SPRKPY1034

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.desc has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.desc (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.desc.html) function, which has a workaround.

Scenario

The pyspark.sql.functions.desc function takes either a column object or the name of a column as a string as its parameter. Neither scenario is directly supported by Snowpark, so this EWI is generated.

Scenario 1

Input

Below is an example of a use of the pyspark.sql.functions.desc function that takes a column object as parameter.

df.orderBy(desc(col))

Output

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc(col))

Recommended fix

As a workaround, you can call the snowflake.snowpark.Column.desc function from the column parameter.

df.orderBy(col.desc())

Scenario 2

Input

Below is an example of a use of the pyspark.sql.functions.desc function that takes the name of the column as parameter.

df.orderBy(desc("colName"))

Output

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc("colName"))

Recommended fix

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.desc function.

df.orderBy(col("colName").desc())

Additional recommendations

SPRKPY1035

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.reverse has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.reverse (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.reverse.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.reverse function that generates this EWI. In this example, the reverse function is used to reverse each string of the word column.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

Output

The SMA adds the EWI SPRKPY1035 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse("word"))

Recommended fix

As a workaround, you can import the snowpark_extensions (https://pypi.org/project/snowpark-extensions/) package which provides an extension for the reverse function.

import snowpark_extensions

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

Additional recommendations

SPRKPY1036

Warning

This issue code has been deprecated

Message: pyspark.sql.column.Column.getField has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getField (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getField.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.column.Column.getField function that generates this EWI. In this example, the getField function is used to extract the name from the info column.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info").getField("name"))

Output

The SMA adds the EWI SPRKPY1036 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
#EWI: SPRKPY1036 => pyspark.sql.column.Column.getField has a workaround, see documentation for more info
df_with_name = df.withColumn("name", col("info").getField("name"))

Recommended fix

As a workaround, you can use the Snowpark column indexer operator with the name of the field as the index.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info")["name"])

Additional recommendations

SPRKPY1037

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.sort_array has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.sort_array (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sort_array.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.sort_array function that generates this EWI. In this example, the sort_array function is used to sort the numbers array in ascending and descending order.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Output

The SMA adds the EWI SPRKPY1037 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Recommended fix

As a workaround, you can import the snowpark_extensions (https://pypi.org/project/snowpark-extensions/) package which provides an extension for the sort_array function.

import snowpark_extensions

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Additional recommendations

SPRKPY1038

Message: *spark element* is not yet recognized

Category: Conversion error

Description

This issue appears when your source code contains a PySpark element that the SMA could not recognize. This can happen for several reasons, such as:

  • The element does not exist in PySpark.

  • The element was added in a PySpark version that the SMA does not support yet.

  • An internal error occurred in the SMA while processing the element.

This is the generic error code that the SMA uses for any unrecognized element.

Scenario

Input

Below is an example of a use of a function that the SMA could not recognize because it does not exist in PySpark.

from pyspark.sql import functions as F
F.unrecognized_function()

Output

The SMA adds the EWI SPRKPY1038 to the output code to let you know that this element could not be recognized.

from snowflake.snowpark import functions as F
#EWI: SPRKPY1038 => pyspark.sql.functions.unrecognized_function is not yet recognized
F.unrecognized_function()

Recommended fix

To try to identify the problem, you can perform the following validations:

  • Check whether the element exists in PySpark.

  • Check that the element is spelled correctly.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If it is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that even if an element could not be recognized by the SMA, that does not mean it is not supported by Snowpark. You should check the Snowpark Documentation to verify whether an equivalent element exists.

Additional recommendations

SPRKPY1039

Warning

This issue code has been deprecated

Message: pyspark.sql.column.Column.getItem has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getItem (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getItem.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.column.Column.getItem function that generates this EWI. In this example, the getItem function is used to get an item by position and by key.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

Output

The SMA adds the EWI SPRKPY1039 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

Recommended fix

As a workaround, you can use the Snowpark column indexer operator, using the name or the position of the field as the index.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits")[0])

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities")["apple"])

Additional recommendations

SPRKPY1040

Warning

This issue code has been deprecated

Message: pyspark.sql.functions.explode has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.explode (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html) function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.explode function that generates this EWI. In this example, the explode function is used to generate one row per array item for the numbers column.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Output

The SMA adds the EWI SPRKPY1040 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
#EWI: SPRKPY1040 => pyspark.sql.functions.explode has a workaround, see documentation for more info
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Recommended fix

As a workaround, you can import the snowpark_extensions (https://pypi.org/project/snowpark-extensions/) package which provides an extension for the explode function.

import snowpark_extensions

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Additional recommendations

SPRKPY1041

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.9.0

Message: pyspark.sql.functions.explode_outer has a workaround

Category: Warning

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.explode_outer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode_outer.html#pyspark.sql.functions.explode_outer) which has a workaround.

Scenario

Input

The example shows the use of the explode_outer method in a select call.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

Output

The tool adds the EWI SPRKPY1041 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

#EWI: SPRKPY1041 => pyspark.sql.functions.explode_outer has a workaround, see documentation for more info
df.select("id", "an_array", explode_outer("a_map")).show()

Recommended fix

As a workaround, you can import the snowpark_extensions package, which contains a helper for the explode_outer function.

import snowpark_extensions

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

Additional recommendations

SPRKPY1042

Message: pyspark.sql.functions.posexplode has a workaround

Category: Warning

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.posexplode.html?highlight=posexplode) which has a workaround.

Scenarios

There are a couple of scenarios this method can handle, depending on the type of column passed as a parameter: it can be a list of values or a map/dictionary (keys/values).

Scenario 1

Input

Below is an example of the usage of posexplode, passing a list of values as a parameter.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])

df.select(posexplode(df.intlist)).collect()

Output

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.intlist)).collect()

Recommended fix

To get the same behavior, use the functions.flatten method, drop the extra columns, and rename the index and value column names.

df = spark.createDataFrame(
  [Row(a=1,
       intlist=[1, 2, 3])])

df.select(
    flatten(df.intlist))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()

Scenario 2

Input

Below is another example of the usage of posexplode, passing a map/dictionary (keys/values) as a parameter.

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])

df.select(posexplode(df.maps)).show()

Output

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.maps)).show()

Recommended fix

As a workaround, you can use functions.row_number to get the position and functions.explode with the name of the field to get the key/value for dictionaries.

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
    schema=["idx", "lists", "maps", "strs"])

window = Window.orderBy(col("idx").asc())

df.select(
    row_number().over(window).alias("pos"),
    explode(df.maps).alias("key", "value")).show()

Note: Using row_number is not fully equivalent, because it starts at 1 (the Spark method starts at zero).

Additional recommendations

SPRKPY1043

Message: pyspark.sql.functions.posexplode_outer has a workaround

Category: Warning

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode_outer (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.posexplode_outer.html) which has a workaround.

Scenarios

There are a couple of scenarios this method can handle, depending on the type of column passed as a parameter: it can be a list of values or a map/dictionary (keys/values).

Scenario 1

Input

Below is an example that shows the usage of posexplode_outer passing a list of values.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select("id", "an_array", posexplode_outer("an_array")).show()

Output

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select("id", "an_array", posexplode_outer("an_array")).show()

Recommended fix

To get the same behavior, use the functions.flatten method with the outer parameter set to True, drop the extra columns, and rename the index and value column names.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select(
    flatten(df.an_array, outer=True))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()

Scenario 2

Input

Below is another example of the usage of posexplode_outer, passing a map/dictionary (keys/values) as a parameter.

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))

df.select(posexplode_outer(df.a_map)).show()

Output

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select(posexplode_outer(df.a_map)).show()

Recommended fix

As a workaround, you can use functions.row_number to get the position and functions.explode_outer with the name of the field to get the key/value for dictionaries.

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))

window = Window.orderBy(col("id").asc())

df.select(
    row_number().over(window).alias("pos"),
    explode_outer(df.a_map)).show()

Note: Using row_number is not fully equivalent, because it starts at 1 (the Spark method starts at zero).

Additional recommendations

SPRKPY1044

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Message: pyspark.sql.functions.split has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.split (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html) which has a workaround.

Scenarios

There are a couple of scenarios, depending on the number of parameters passed to the method.

Scenario 1

Input

Below is an example where the split function has only the str and pattern parameters.

F.split('col', '\\|')

Output

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|')

Recommended fix

As a workaround, you can call the snowflake.snowpark.functions.lit function with the pattern parameter and pass the result into split.

F.split('col', lit('\\|'))
## the result of lit will be sent to the split function

Scenario 2

Input

Below is another example where the split function has the str, pattern, and limit parameters.

F.split('col', '\\|', 2)

Output

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|', 2)

Recommended fix

This specific scenario is not supported.

Additional recommendations

SPRKPY1045

Message: pyspark.sql.functions.map_values has a workaround

Category: Warning.

Description

This function is used to extract the list of values from a column that contains a map/dictionary (keys/values).

The issue appears when the tool detects the usage of pyspark.sql.functions.map_values (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.map_values.html) which has a workaround.

Scenario

Input

Below is an example of the usage of the method map_values.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))

df.select(map_values("a_map")).show()

Output

The tool adds the EWI SPRKPY1045 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))
#EWI: SPRKPY1045 => pyspark.sql.functions.map_values has a workaround, see documentation for more info

df.select(map_values("a_map")).show()

Recommended fix

As a workaround, you can create a udf to get the values of a column. The example below shows how to create the udf, assign it to F.map_values, and then use it.

from snowflake.snowpark import functions as F
from snowflake.snowpark.types import ArrayType, MapType

map_values_udf=None

def map_values(map):
    global map_values_udf
    if not map_values_udf:
        def _map_values(map: dict)->list:
            return list(map.values())
        map_values_udf = F.udf(_map_values, return_type=ArrayType(), input_types=[MapType()], name="map_values", is_permanent=False, replace=True)
    return map_values_udf(map)

F.map_values = map_values

df.select(map_values(colDict))

Additional recommendations

SPRKPY1046

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.1.22

Message: pyspark.sql.functions.monotonically_increasing_id has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.monotonically_increasing_id (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html) which has a workaround.

Scenario

Input

Below is an example of the usage of the method monotonically_increasing_id.

from pyspark.sql import functions as F

spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

Output

The tool adds the EWI SPRKPY1046 indicating that a workaround can be implemented.

from pyspark.sql import functions as F
#EWI: SPRKPY1046 => pyspark.sql.functions.monotonically_increasing_id has a workaround, see documentation for more info
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

Recommended fix

Update the tool to the latest version.

Additional recommendations

SPRKPY1047

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.6.0

Description

This issue appears when the tool detects the usage of pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html?highlight=pyspark%20context%20sparkcontext%20setloglevel#pyspark.SparkContext.setLogLevel) which has a workaround.

Scenario

Input

Below is an example of the usage of the method setLogLevel.

sparkSession.sparkContext.setLogLevel("WARN")

Output

The tool adds the EWI SPRKPY1047 indicating that a workaround can be implemented.

#EWI: SPRKPY1047 => pyspark.context.SparkContext.setLogLevel has a workaround, see documentation for more info
sparkSession.sparkContext.setLogLevel("WARN")

Recommended fix

Replace the setLogLevel function usage with logging.basicConfig, which provides a set of convenience functions for simple logging usage. To use it, import the "logging" and "sys" modules, and replace the level constant using the "Level equivalent table":

import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)

  • Level equivalent table

| Source level parameter | Target level parameter |
| --- | --- |
| "ALL" | <mark style="color:red;">**This has no equivalent**</mark> |
| "DEBUG" | logging.DEBUG |
| "ERROR" | logging.ERROR |
| "FATAL" | logging.CRITICAL |
| "INFO" | logging.INFO |
| "OFF" | logging.NOTSET |
| "TRACE" | <mark style="color:red;">**This has no equivalent**</mark> |
| "WARN" | logging.WARNING |
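The table above can be kept as a small lookup when converting many setLogLevel calls by hand. The helper below is an illustrative sketch (the dict and function names are not part of the SMA); the two levels with no equivalent, "ALL" and "TRACE", are approximated here with logging.DEBUG, the most verbose standard level:

```python
import logging

# Illustrative mapping of Spark log-level strings to Python logging levels,
# following the "Level equivalent table" above. "ALL" and "TRACE" have no
# direct equivalent and are approximated with logging.DEBUG.
SPARK_TO_PY_LOG_LEVEL = {
    "DEBUG": logging.DEBUG,
    "ERROR": logging.ERROR,
    "FATAL": logging.CRITICAL,
    "INFO": logging.INFO,
    "OFF": logging.NOTSET,
    "WARN": logging.WARNING,
    "ALL": logging.DEBUG,
    "TRACE": logging.DEBUG,
}

def to_python_level(spark_level: str) -> int:
    """Return the Python logging level for a Spark setLogLevel argument."""
    return SPARK_TO_PY_LOG_LEVEL[spark_level.upper()]

print(to_python_level("WARN"))  # logging.WARNING (30)
```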

Additional recommendations

SPRKPY1048

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Message: pyspark.sql.session.SparkSession.conf has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.conf (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.conf.html) which has a workaround.

Scenario

Input

Below is an example of how to set a configuration in the conf property.

spark.conf.set("spark.sql.crossJoin.enabled", "true")

Output

The tool adds the EWI SPRKPY1048 indicating that a workaround can be implemented.

#EWI: SPRKPY1048 => pyspark.sql.session.SparkSession.conf has a workaround, see documentation for more info
spark.conf.set("spark.sql.crossJoin.enabled", "true")

Recommended fix

SparkSession.conf is used to pass specific settings used only by PySpark, which do not apply to Snowpark. You can remove or comment out the code:

#spark.conf.set("spark.sql.crossJoin.enabled", "true")

Additional recommendations

SPRKPY1049

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.1.9

Message: pyspark.sql.session.SparkSession.sparkContext has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.sparkContext (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sparkContext.html) which has a workaround.

Scenario

Input

Below is an example that creates a spark session and then uses the SparkContext property to print the appName.

print("APP Name :"+spark.sparkContext.appName)

Output

The tool adds the EWI SPRKPY1049 indicating that a workaround can be implemented.

#EWI: SPRKPY1049 => pyspark.sql.session.SparkSession.sparkContext has a workaround, see documentation for more info
print("APP Name :"+spark.sparkContext.appName)

Recommended fix

Snowpark does not support SparkContext, but the methods and properties of SparkContext can be accessed directly from the Session instance.

## Pyspark
print("APP Name :"+spark.sparkContext.appName)
## can be used in Snowpark by removing the sparkContext:
#Manual adjustment in Snowpark
print("APP Name :"+spark.appName)

Additional recommendations

SPRKPY1050

Message: pyspark.conf.SparkConf.set has a workaround.

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.set (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.set.html) which has a workaround.

Scenario

Input

Below is an example that sets a variable using conf.set.

conf = SparkConf().setAppName('my_app')

conf.set("spark.storage.memoryFraction", "0.5")

Output

The tool adds the EWI SPRKPY1050 indicating that a workaround can be implemented.

conf = SparkConf().setAppName('my_app')

#EWI: SPRKPY1050 => pyspark.conf.SparkConf.set has a workaround, see documentation for more info
conf.set("spark.storage.memoryFraction", "0.5")

Recommended fix

SparkConf.set is used to set configuration settings used only by PySpark, which do not apply to Snowpark. You can remove or comment out the code:

#conf.set("spark.storage.memoryFraction", "0.5")

Additional recommendations

SPRKPY1051

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Message: pyspark.sql.session.SparkSession.Builder.master has a workaround

Category: Warning.

Description

This issue appears when the tool detects pyspark.sql.session.SparkSession.Builder.master (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.master.html) usage which has a workaround.

Scenario

Input

Below is an example of the usage of the method builder.master to set the Spark Master URL to connect to local using 1 core.

spark = SparkSession.builder.master("local[1]")

Output

The tool adds the EWI SPRKPY1051 indicating that a workaround can be implemented.

#EWI: SPRKPY1051 => pyspark.sql.session.SparkSession.Builder.master has a workaround, see documentation for more info
spark = Session.builder.master("local[1]")

Recommended fix

pyspark.sql.session.SparkSession.Builder.master is used to set up a Spark cluster. Snowpark does not use Spark clusters, so you can remove or comment out the code.

## spark = Session.builder.master("local[1]")

Additional recommendations

SPRKPY1052

Warning

This issue code has been deprecated since Spark Conversion Core Version 2.8.0

Message: pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.Builder.enableHiveSupport (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.builder.enableHiveSupport.html) which has a workaround.

Scenario

Input

Below is an example that configures the SparkSession and enables Hive support using the enableHiveSupport method.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

Output

The tool adds the EWI SPRKPY1052 indicating that a workaround can be implemented.

#EWI: SPRKPY1052 => pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround, see documentation for more info
spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

Recommended fix

Remove the use of the enableHiveSupport function, because it is not needed in Snowpark.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .getOrCreate()

Additional recommendations

SPRKPY1053

Message: An error occurred when extracting the dbc files.

Category: Warning.

Description

This issue appears when a dbc file cannot be extracted. This warning could be caused by one or more of the following reasons: the file is too heavy, inaccessible, read-only, etc.

Additional recommendations

  • As a workaround, you can check the size of the file if it is too heavy to process. Also, check whether the tool can access the file, to avoid any access issues.

  • For more support, you can email us at snowconvert-info@snowflake.com. If you have a support contract with Snowflake, reach out to your sales engineer, who can direct your support needs.

SPRKPY1080

Message: The value of SparkContext is replaced with the 'session' variable.

Category: Warning

Description

The Spark context is stored in a variable called 'session', which creates the Snowpark session.

Scenario

Input

The following snippet describes a SparkContext:

## Input Code
from pyspark import SparkContext
from pyspark.sql import SparkSession

def example1():

    sc = SparkContext("local[*]", "TestApp")

    sc.setLogLevel("ALL")
    sc.setLogLevel("DEBUG")

Output

In this output code, the SMA has replaced PySpark.SparkContext with a Snowpark Session. Note that the SMA also adds a template in the "connection.json" file to replace the connection, and then loads this configuration into the connection_parameter variable.

## Output Code
import logging
import sys
import json
from snowflake.snowpark import Session
from snowflake.snowpark import Session

def example1():
    jsonFile = open("connection.json")
    connection_parameter = json.load(jsonFile)
    jsonFile.close()
    #EWI: SPRKPY1080 => The value of SparkContext is replaced with 'session' variable.
    sc = Session.builder.configs(connection_parameter).getOrCreate()
    sc.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
    logging.basicConfig(stream = sys.stdout, level = logging.NOTSET)
    logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

Recommended fix

The configuration file "connection.json" must be updated with the required connection information:

{
  "user": "my_user",
  "password": "my_password",
  "account": "my_account",
  "role": "my_role",
  "warehouse": "my_warehouse",
  "database": "my_database",
  "schema": "my_schema"
}

Additional recommendations

SPRKPY1054

Message: pyspark.sql.readwriter.DataFrameReader.format is not supported.

Category: Warning.

Description

This issue appears when the pyspark.sql.readwriter.DataFrameReader.format (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.format.html) has an argument that is not supported by Snowpark.

Scenarios

There are several scenarios, depending on the type of format you are trying to load. It can be a supported or a non-supported format.

Scenario 1

Input

The tool analyzes the type of format being loaded; the supported formats are:

  • Csv

  • JSON

  • Parquet

  • Orc

The below example shows how the tool transforms the format method when passing a Csv value.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.read.format('csv').load('/path/to/file')

Output

The tool transforms the format method into a Csv method call.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df1 = spark.read.csv('/path/to/file')

Recommended fix

In this example, the tool does not show an EWI, which means no fix is needed.

Scenario 2

Input

The below example shows how the tool transforms the format method when passing a Jdbc value.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Output

The tool shows the EWI SPRKPY1054 indicating that the value "jdbc" is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "jdbc" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Recommended fix

For the non-supported scenarios, there is no specific fix, since it depends on the files you are trying to read.

Scenario 3

Input

The below example shows how the tool transforms the format method when passing a CSV, but using a variable instead.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
df3 = spark.read.format(myFormat).load('/path/to/file')

Output

Since the tool cannot determine the value of the variable at runtime, it shows the EWI SPRKPY1054 indicating that the value "" is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df3 = spark.read.format(myFormat).load('/path/to/file')

Recommended fix

As a workaround, you can check the value of the variable and add it as a string to the format call.
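That check can be sketched in plain Python. The helper name and the supported set below are assumptions based on the format list above, not part of the SMA:

```python
# Hypothetical helper: validate a format variable before inlining it as a
# string literal in the .format(...) call. The supported set mirrors the
# formats listed above (csv, json, parquet, orc).
SUPPORTED_FORMATS = {"csv", "json", "parquet", "orc"}

def check_format(fmt: str) -> str:
    """Return the normalized format name if Snowpark supports it, else raise."""
    normalized = fmt.strip().lower()
    if normalized not in SUPPORTED_FORMATS:
        raise ValueError(f"format {fmt!r} is not supported by Snowpark")
    return normalized

print(check_format("CSV"))   # csv
```

Once the value is confirmed, replace the variable in the format call with the literal string so the tool can convert it.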

Additional recommendations

SPRKPY1055

Message: pyspark.sql.readwriter.DataFrameReader.option key value is not supported.

Category: Warning.

Description

This issue appears when the pyspark.sql.readwriter.DataFrameReader.option key value is not supported by Snowflake.

The tool analyzes the parameters of the option call and determines, based on the method (CSV, JSON, or PARQUET), whether the key value has an equivalent option in Snowpark. If all the parameters have an equivalent, the tool does not add an EWI and replaces each key value with its equivalent; otherwise, the tool adds an EWI.

List of equivalences:

  • Equivalences for CSV:

| Spark option key | Snowpark equivalent |
| --- | --- |
| sep | FIELD_DELIMITER |
| header | PARSE_HEADER |
| lineSep | RECORD_DELIMITER |
| pathGlobFilter | PATTERN |
| quote | FIELD_OPTIONALLY_ENCLOSED_BY |
| nullValue | NULL_IF |
| dateFormat | DATE_FORMAT |
| timestampFormat | TIMESTAMP_FORMAT |
| inferSchema | INFER_SCHEMA |
| delimiter | FIELD_DELIMITER |

  • Equivalences for JSON:

| Spark option key | Snowpark equivalent |
| --- | --- |
| dateFormat | DATE_FORMAT |
| timestampFormat | TIMESTAMP_FORMAT |
| pathGlobFilter | PATTERN |

  • Equivalences for PARQUET:

| Spark option key | Snowpark equivalent |
| --- | --- |
| pathGlobFilter | PATTERN |

Any other key option not listed in the tables above is not supported by Snowpark and has no equivalent. In that case, the tool adds an EWI with the parameter information and removes the option call from the chain.
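The CSV equivalences above can be expressed as a plain lookup table, which is handy when reviewing option calls by hand. The dict and function names here are illustrative, not part of the SMA, and only the CSV table is shown:

```python
# Illustrative lookup of the Spark-to-Snowpark CSV option-key equivalences
# listed above. A None result means the key has no equivalent and would
# trigger EWI SPRKPY1055.
CSV_OPTION_EQUIVALENTS = {
    "sep": "FIELD_DELIMITER",
    "header": "PARSE_HEADER",
    "lineSep": "RECORD_DELIMITER",
    "pathGlobFilter": "PATTERN",
    "quote": "FIELD_OPTIONALLY_ENCLOSED_BY",
    "nullValue": "NULL_IF",
    "dateFormat": "DATE_FORMAT",
    "timestampFormat": "TIMESTAMP_FORMAT",
    "inferSchema": "INFER_SCHEMA",
    "delimiter": "FIELD_DELIMITER",
}

def snowpark_csv_option(spark_key: str):
    """Return the Snowpark equivalent for a Spark CSV option key, or None."""
    return CSV_OPTION_EQUIVALENTS.get(spark_key)

print(snowpark_csv_option("header"))          # PARSE_HEADER
print(snowpark_csv_option("anotherKeyValue")) # None
```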

Scenarios

The scenarios below apply to CSV, JSON, and PARQUET.

There are a couple of scenarios, depending on the value of the key used in the option method.

Scenario 1

Input

Below is an example of an option call using an equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("header", True).csv(csv_file_path)

## Json example:
spark.read.option("dateFormat", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("pathGlobFilter", "*.parquet").parquet(parquet_file_path)

Output

The tool transforms the key into its correct equivalent.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
spark.read.option("PARSE_HEADER", True).csv(csv_file_path)

## Json example:
spark.read.option("DATE_FORMAT", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("PATTERN", "*.parquet").parquet(parquet_file_path)

Recommended fix

Since the tool transforms the value of the key, no fix is necessary.

Scenario 2

Input

Below is an example of an option call using a non-equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("anotherKeyValue", "myVal").csv(csv_file_path)

## Json example:
spark.read.option("anotherKeyValue", "myVal").json(json_file_path)

## Parquet example:
spark.read.option("anotherKeyValue", "myVal").parquet(parquet_file_path)

Output

The tool adds the EWI SPRKPY1055 indicating the key is not supported and removes the option call.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.csv(csv_file_path)

## Json example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.json(json_file_path)

## Parquet example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.parquet(parquet_file_path)

Recommended fix

It is recommended to check the behavior after the transformation.

Additional recommendations

SPRKPY1056

Warning

This issue code has been deprecated

Message: pyspark.sql.readwriter.DataFrameReader.option argument _**<argument_name>**_ is not a literal and cannot be evaluated

Category: Warning

Description

This issue appears when the argument's key or value of the pyspark.sql.readwriter.DataFrameReader.option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html) function is not a literal value (for example a variable). The SMA does a static analysis of your source code, and therefore it is not possible to evaluate the content of the argument.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.option function that generates this EWI.

my_value = ...
my_option = ...

df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

Output

The SMA adds the EWI SPRKPY1056 to the output code to let you know that the argument of this function is not a literal value, and therefore it could not be evaluated by the SMA.

my_value = ...
my_option = ...

#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument "dateFormat" is not a literal and can't be evaluated
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument key is not a literal and can't be evaluated
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

Recommended fix

Even though the SMA was unable to evaluate the argument, it does not mean that it is not supported by Snowpark. Please make sure that the value of the argument is valid and equivalent in Snowpark by checking the documentation.

Additional recommendations

SPRKPY1057

警告

This Issue Code has been deprecated since Spark Conversion Core Version 4.8.0

Message: PySpark Dataframe Option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.option.html#pyspark.sql.DataFrameReader.option) argument contains a value that is not a literal, therefore cannot be evaluated

类别:警告。

描述

此问题代码已弃用。如果您使用的是旧版本,请升级到最新版本。

其他建议

SPRKPY1058

消息:带 < key > 平台特定键的 < method > 不受支持。

类别:ConversionError

描述

The get and set methods from pyspark.sql.conf.RuntimeConfig (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.conf.RuntimeConfig.html#pyspark.sql.conf.RuntimeConfig) are not supported with a Platform specific key.

场景

Not all usages of get or set methods are going to have an EWI in the output code. This EWI appears when the tool detects the usage of these methods with a Platform specific key which is not supported.

场景 1

输入

Below is an example of the get or set methods with supported keys in Snowpark.

session.conf.set("use_constant_subquery_alias", False)
spark.conf.set("sql_simplifier_enabled", True)

session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")

输出

由于 Snowpark 支持键,因此该工具不会在输出代码中添加 EWI。

session.conf.set("use_constant_subquery_alias", False)
session.conf.set("sql_simplifier_enabled", True)

session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")

推荐修复方法

对于此场景,没有建议的修复方法。

场景 2

输入

以下是使用不支持的键的示例。

data = [
    ("John", 30, "New York"),
    ("Jane", 25, "San Francisco")
]

session.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.yarn.am.memory", "1g")

session.conf.get("spark.sql.shuffle.partitions")
session = spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

输出

The tool adds this EWI SPRKPY1058 on the output code to let you know that these methods are not supported with a Platform specific key.

data = [
    ("John", 30, "New York"),
    ("Jane", 25, "San Francisco")
]

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.set("spark.sql.shuffle.partitions", "50")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.set("spark.yarn.am.memory", "1g")

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.get("spark.sql.shuffle.partitions")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

推荐修复方法

建议的修复方法是移除这些方法。

data = [
    ("John", 30, "New York"),
    ("Jane", 25, "San Francisco")
]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

其他建议

SPRKPY1059

警告

This issue code has been deprecated since Spark Conversion Core Version 2.45.1

Message: pyspark.storagelevel.StorageLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) has a workaround, see documentation.

类别:警告

描述

Currently, the use of StorageLevel is not required in Snowpark since Snowflake controls the storage. For more information, you can refer to the EWI SPRKPY1072

其他建议

SPRKPY1060

消息:身份验证机制是 connection.json(提供了模板)。

类别:警告。

描述

This issue appears when the tool detects the usage of pyspark.conf.SparkConf (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html).

场景

输入

由于 Snowpark 中的身份验证机制不同,该工具删除了相关使用,改为创建 连接配置文件 (connection.json)

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

输出

The tool adds the EWI SPRKPY1060 indicating that the authentication mechanism is different.

#EWI: SPRKPY1002 => pyspark.conf.SparkConf is not supported
#EWI: SPRKPY1060 => The authentication mechanism is connection.json (template provided).
#my_conf = Session.builder.configs(connection_parameter).getOrCreate()

my_conf = None

推荐修复方法

To create a connection it is necessary that you fill in the information in the connection.json file.

{
  "user": "<USER>",
  "password": "<PASSWORD>",
  "account": "<ACCOUNT>",
  "role": "<ROLE>",
  "warehouse": "<WAREHOUSE>",
  "database": "<DATABASE>",
  "schema": "<SCHEMA>"
}
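A small stdlib sketch (not generated by the SMA; the helper name is hypothetical) that loads connection.json and flags any placeholder values that were left unfilled before you hand the parameters to Snowpark:

```python
import json

REQUIRED_KEYS = ["user", "password", "account", "role",
                 "warehouse", "database", "schema"]

def load_connection_parameters(raw: str) -> dict:
    """Parse connection.json content and reject unfilled <PLACEHOLDER> values."""
    params = json.loads(raw)
    missing = [k for k in REQUIRED_KEYS if k not in params]
    # Values still wrapped in angle brackets are template placeholders.
    placeholders = [k for k, v in params.items()
                    if isinstance(v, str) and v.startswith("<") and v.endswith(">")]
    if missing or placeholders:
        raise ValueError(f"missing keys: {missing}, unfilled placeholders: {placeholders}")
    return params

filled = ('{"user": "jdoe", "password": "secret", "account": "my_acct",'
          ' "role": "dev", "warehouse": "wh", "database": "db", "schema": "public"}')
print(load_connection_parameters(filled)["account"])  # → my_acct
```

The validated dictionary can then be passed to Snowpark, for example via Session.builder.configs(params).create().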

其他建议

SPRKPY1061

消息:Snowpark 不支持不带参数的 unix_timestamp 函数

类别:警告

描述

In Snowpark, the first parameter is mandatory; the issue appears when the tool detects the usage of pyspark.sql.functions.unix_timestamp (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.unix_timestamp.html) with no parameters.

场景

输入

Below is an example that calls the unix_timestamp method without parameters.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
df.select(unix_timestamp()).show()

输出

The Snowpark signature for this function is unix_timestamp(e: ColumnOrName, fmt: Optional["Column"] = None); as you can see, the first parameter is required.

The tool adds the EWI SPRKPY1061 to let you know that the unix_timestamp function with no parameters is not supported in Snowpark.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
#EWI: SPRKPY1061 => Snowpark does not support unix_timestamp functions with no parameters. See documentation for more info.
df.select(unix_timestamp()).show()

推荐修复方法

作为替代方案,至少要传入时间戳字符串所在列的名称或列对象。

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ["dt", "val"])
df.select(unix_timestamp("dt")).show()

其他建议

SPRKPY1062

消息:Snowpark 不支持不带“values”参数的 GroupedData.pivot。

类别:警告

描述

This issue appears when the SMA detects the usage of the pyspark.sql.group.GroupedData.pivot (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.pivot.html) function without the "values" parameter (the list of values to pivot on).

目前,Snowpark Python pivot 函数要求明确指定要透视的不同值列表。

场景

场景 1

输入

The SMA detects an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df.groupBy("date").pivot("category").sum("amount")

输出

SMA 添加了一条 EWI 消息,提示不支持没有“values”参数的 pivot 函数。

此外,它将在 pivot 函数中添加第二个参数,即列表推导式,用于计算将转换为列的值列表。请记住,此操作对于大型数据集效率不高,建议明确指明值。

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df.groupBy("date").pivot("category", [v[0] for v in df.select("category").distinct().limit(10000).collect()]).sum("amount")

推荐修复方法

对于这种情况,SMA 在 pivot 函数中添加第二个参数,即列表推导式,用于计算将转换为列的值列表,但可以根据不同的值列表进行转换,如下所示:

df = spark.createDataFrame([
      Row(category="Client_ID", date=2012, amount=10000),
      Row(category="Client_name",   date=2012, amount=20000)
  ])

df.groupBy("date").pivot("category", ["dotNET", "Java"]).sum("amount")
场景 2

输入

The SMA couldn't detect an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df1.union(df2).groupBy("date").pivot("category").sum("amount")

输出

SMA 添加了一条 EWI 消息,提示不支持没有“values”参数的 pivot 函数。

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df1.union(df2).groupBy("date").pivot("category").sum("amount")

推荐修复方法

添加要透视的唯一值列表,具体操作如下:

df = spark.createDataFrame([
      Row(course="dotNET", year=2012, earnings=10000),
      Row(course="Java",   year=2012, earnings=20000)
  ])

df.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()

其他建议

  • 对于大型数据集,计算要透视的唯一值列表并不是一项有效的操作,可能会成为阻塞性调用。请考虑明确指定要用于透视的唯一值列表。

  • 如果您不想明确指定要透视的唯一值列表(不建议这样做),可添加以下代码作为 pivot 函数的第二个实参,以便在运行时推断这些值:

[v[0] for v in <df>.select(<column>).distinct().limit(<count>).collect()]

Replace `<df>` with the corresponding DataFrame, `<column>` with the column to pivot, and `<count>` with the number of rows to select.
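下面是一个纯 Python 示意(不依赖 Spark/Snowpark,函数名为假设),演示该列表推导式推断透视值的逻辑:收集列值、去重、并限制数量,得到将成为列的值列表:

```python
# Pure-Python illustration of how the inserted list comprehension infers
# pivot values: collect the column, de-duplicate, and cap the count.
rows = [("2012", "dotNET", 10000),
        ("2012", "Java", 20000),
        ("2013", "dotNET", 5000),
        ("2013", "Java", 30000)]

def infer_pivot_values(rows, column_index, limit):
    seen = []
    for row in rows:
        value = row[column_index]
        if value not in seen:
            seen.append(value)
        if len(seen) == limit:
            break
    return seen

print(infer_pivot_values(rows, 1, 10000))  # → ['dotNET', 'Java']
```

在真实的 DataFrame 上,这一步对应 distinct().limit(<count>).collect(),对大型数据集可能代价高昂,这正是建议显式提供值列表的原因。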

SPRKPY1063

消息:pyspark.sql.pandas.functions.pandas_udf 有替代方案。

类别:警告

描述

This issue appears when the tool detects the usage of pyspark.sql.pandas.functions.pandas_udf (https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.pandas_udf.html) which has a workaround.

场景

输入

使用 pandas_udf 函数创建可处理大量数据的用户定义的函数。

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)

输出

SMA 添加了一条 EWI 消息,提示 pandas_udf 有替代方案。

#EWI: SPRKPY1063 => pyspark.sql.pandas.functions.pandas_udf has a workaround, see documentation for more info
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df)

推荐修复方法

Explicitly specify the parameter types through a new input_types parameter, and remove the functionType parameter if it applies. The created function must be called inside a select statement.

@pandas_udf(
    return_type = schema,
    input_types = [PandasDataFrameType([IntegerType(), IntegerType()])]
)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df) # You must modify function call to be a select and not an apply

其他建议

SPRKPY1064

Message: The *Spark element* does not apply since Snowflake uses the Snowpipe mechanism instead.

类别:警告

描述

当该工具检测到使用了 pyspark.streaming 库中的任何元素时,就会出现此问题:

  • pyspark.streaming.DStream (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.DStream.html)

  • pyspark.streaming.StreamingContext (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.html)

  • pyspark.streaming.listener.StreamingListener。

场景

输入

以下是触发此 EWI 的元素之一的示例。

from pyspark.streaming.listener import StreamingListener

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

输出

The SMA adds the EWI SPRKPY1064 on the output code to let you know that this function does not apply.

#EWI: SPRKPY1064 => The element does not apply since snowflake uses snowpipe mechanism instead.

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

推荐修复方法

The SMA removes the import statement and records the issue in the Issues.csv inventory. Remove any usages of the Spark element.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

其他建议

SPRKPY1065

消息:pyspark.context.SparkContext.broadcast 不适用,因为 Snowflake 使用数据聚类机制来计算数据。

类别:警告

描述

This issue appears when the tool detects the usage of the element pyspark.context.SparkContext.broadcast (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.broadcast.html), which is not necessary thanks to Snowflake's data-clustering mechanism.

输入代码

在此示例中,创建了一个广播变量,该变量能够实现数据在所有节点间的高效共享。

sc = SparkContext(conf=conf_spark)

mapping = {1: 10001, 2: 10002}

bc = sc.broadcast(mapping)

输出代码

SMA 添加了一条 EWI 消息,提示不需要进行广播。

sc = conf_spark

mapping = {1: 10001, 2: 10002}
#EWI: SPRKPY1065 => The element does not apply since snowflake use data-clustering mechanism to compute the data.

bc = sc.broadcast(mapping)

推荐修复方法

移除所有使用的 pyspark.context.SparkContext.broadcast。

sc = conf_spark

mapping = {1: 10001, 2: 10002}

其他建议

SPRKPY1066

消息:Spark 元素不适用,因为 Snowflake 使用的微分区机制是自动创建的。

类别:警告

描述

当该工具检测到使用了与分区相关的元素时,就会出现此问题:

  • pyspark.sql.catalog.Catalog.recoverPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.recoverPartitions.html)

  • pyspark.sql.dataframe.DataFrame.foreachPartition (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreachPartition.html)

  • pyspark.sql.dataframe.DataFrame.sortWithinPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sortWithinPartitions.html)

  • pyspark.sql.functions.spark_partition_id (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.spark_partition_id.html)

These elements do not apply due to Snowflake's use of micro-partitions.

输入代码

In this example, sortWithinPartitions (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sortWithinPartitions.html) is used to sort the DataFrame within each partition by the specified column.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.sortWithinPartitions("age", ascending=False)

输出代码

SMA 添加了一条 EWI 消息,提示 Spark 元素并非必需。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
#EWI: SPRKPY1066 => The element does not apply since snowflake use micro-partitioning mechanism are created automatically.
df.sortWithinPartitions("age", ascending=False)

推荐修复方法

移除对该元素的使用。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

其他建议

SPRKPY1067

消息:pyspark.sql.functions.split 包含在 Snowpark 中不受支持的参数。

类别:警告

描述

This issue appears when the tool detects the usage of pyspark.sql.functions.split (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html) with more than two parameters or a regex pattern as a parameter; both cases are not supported.

场景

场景 1

输入代码

在此示例中,split 函数有两个以上的参数。

df.select(split(columnName, ",", 5))

输出代码

该工具在输出代码中添加了此 EWI,以提示当此函数具有两个以上的参数时,不支持此函数。

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, ",", 5))

推荐修复方法

请确保 split 函数仅使用两个参数。

df.select(split(columnName, ","))
场景 2

输入代码

在此示例中,split 函数使用正则表达式模式作为参数。

df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

输出代码

该工具在输出代码中添加了此 EWI,以提示当它使用正则表达式模式作为参数时,不支持此函数。

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

推荐修复方法

The Spark signature for this method, functions.split(str: ColumnOrName, pattern: str, limit: int = -1), does not exactly match the Snowpark method functions.split(str: Union[Column, str], pattern: Union[Column, str]), so for now the scenario using a regular expression does not have a recommended fix.
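To illustrate the gap: Spark's split with a regex pattern behaves like Python's re.split, while Snowpark's split only matches a literal delimiter. A stdlib sketch of the Spark-side semantics you would need to replicate manually:

```python
import re

# Spark: split(col, "^([\d]+-[\d]+-[\d])") splits on every regex match.
# Python's re.split reproduces those semantics for a single string value.
value = "2012-11-1 and 2013-12-2"
parts = re.split(r"[\d]+-[\d]+-[\d]", value)
print(parts)  # → ['', ' and ', '']
```

Any migration of the regex scenario therefore requires rethinking the logic (for example, normalizing the data to a literal delimiter first) rather than a direct function swap.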

其他建议

SPRKPY1068

消息:toPandas 包含类型为不受支持的 ArrayType 的列,并且有替代方案。

类别:警告

描述

pyspark.sql.DataFrame.toPandas (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html) doesn't work properly if there are columns of type ArrayType. The workaround for these cases is to convert those columns into a Python dictionary by using the json.loads method.

场景

输入

toPandas 以 Pandas DataFrame 的形式返回原始 DataFrame 数据。

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])

pandasDF = sparkDF.toPandas()

输出

如果有类型为 ArrayType 的列,该工具会添加此 EWI,以提示 toPandas 不受支持,但有替代方案。

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
#EWI: SPRKPY1068 => toPandas doesn't work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method. example: df[colName] = json.loads(df[colName]).
pandasDF = sparkDF.toPandas()

推荐修复方法

import json
from snowflake.snowpark.types import ArrayType

pandas_df = sparkDF.toPandas()

## check/convert all resulting fields from calling toPandas when they are of
## type ArrayType; they will be reassigned by converting them into a Python
## Dictionary using the json.loads method

for field in sparkDF.schema.fields:
    if isinstance(field.datatype, ArrayType):
        pandas_df[field.name] = pandas_df[field.name].apply(lambda x: json.loads(x) if x is not None else x)

其他建议

SPRKPY1069

消息:如果 partitionBy 参数是列表,则 Snowpark 会抛出错误。

类别:警告

描述

The tool shows this EWI when it detects a usage of the pyspark.sql.readwriter.DataFrameWriter.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.parquet.html) method that includes the partitionBy parameter.

This is because in Snowpark the DataFrameWriter.parquet only supports a ColumnOrSqlExpr as a partitionBy parameter.

场景

场景 1

输入代码:

在此场景中,partitionBy 参数不是列表。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy="age")

输出代码:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = "age", format_type_options = dict(compression = "None"))

推荐修复方法

There is no recommended fix for this scenario because the tool always adds this EWI in case the partitionBy parameter is a list. Remember that Snowpark only accepts cloud locations using a Snowflake stage.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
Session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()
Session.file.put(f"file:///path/to/data/file.parquet", f"@{stage}")

df.write.parquet(stage, partition_by = "age", format_type_options = dict(compression = "None"))
场景 2

输入代码:

在此场景中,partitionBy 参数是列表。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy=["age", "name"])

输出代码:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = ["age", "name"], format_type_options = dict(compression = "None"))

推荐修复方法

If the value of the parameter is a list, then replace it with a ColumnOrSqlExpr.

df.write.parquet(file_path, partition_by = sql_expr("age || name"), format_type_options = dict(compression = "None"))

其他建议

SPRKPY1070

Message: The mode argument is transformed to overwrite, check the variable value and set the corresponding bool value.

类别:警告

描述

当使用了以下各项时:

  • pyspark.sql.readwriter.DataFrameWriter.csv (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.csv.html)

  • pyspark.sql.readwriter.DataFrameWriter.json (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.json.html)

  • pyspark.sql.readwriter.DataFrameWriter.parquet (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.parquet.html)

The tool analyzes the mode parameter to determine whether its value is overwrite.

场景

场景 1

输入代码

对于此场景,该工具检测到 mode 参数可以设置相应的布尔值。

df.write.csv(file_path, mode="overwrite")

输出代码:

The SMA analyzes the mode parameter, determines that the value is overwrite, and sets the corresponding bool value.

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

推荐修复方法

对于此场景,没有建议的修复方法,因为该工具执行了相应的转换。

场景 2:

输入代码

In this scenario, the tool cannot validate whether the value is overwrite.

df.write.csv(file_path, mode=myVal)

输出代码:

SMA 添加了一条 EWI 消息,提示 mode 参数已转换为“overwrite”,但它也是为了提示最好检查变量值并设置正确的布尔值。

#EWI: SPRKPY1070 => The 'mode' argument is transformed to 'overwrite', check the variable value and set the corresponding bool value.
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = myVal)

推荐修复方法

Check for the value of the parameter mode and add the correct value for the parameter overwrite.

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)
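If the mode value only becomes known at runtime, one possible approach (a sketch for your own code, not SMA output; the helper name is hypothetical) is to resolve the Spark mode string into the boolean that Snowpark's writer expects:

```python
def mode_to_overwrite(mode: str) -> bool:
    """Map a Spark save-mode string to the boolean 'overwrite' flag.
    Only 'overwrite' maps to True; other Spark modes (append, ignore,
    errorifexists) have no direct boolean equivalent here."""
    return str(mode).lower() == "overwrite"

print(mode_to_overwrite("overwrite"))  # → True
print(mode_to_overwrite("append"))    # → False
```

You could then write, for example, overwrite = mode_to_overwrite(myVal) in the converted call.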

其他建议

SPRKPY1071

消息:在 Snowpark 中,pyspark.rdd.RDD.getNumPartitions 函数并非必需。因此,应该移除所有引用。

类别:警告

描述

This issue appears when the tool finds the use of the pyspark.rdd.RDD.getNumPartitions (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.getNumPartitions.html) function. Snowflake uses micro-partitioning mechanism, so the use of this function is not required.

场景

输入

getNumPartitions 返回 RDD 上的分区数量。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

print(df.rdd.getNumPartitions())

输出

该工具添加了此 EWI,以提示 getNumPartitions 并非必需。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
#EWI: SPRKPY1071 => The getNumPartitions are not required in Snowpark. So, you should remove all references.

print(df.rdd.getNumPartitions())

推荐修复方法

移除对此函数的所有使用。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

其他建议

SPRKPY1072

消息:在 Snowpark 中,并非必须使用 StorageLevel。

类别:警告。

描述

This issue appears when the tool finds the use of the StorageLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html) class, which works like "flags" to set the storage level. Since Snowflake controls the storage, the use of this function is not required.

其他建议

SPRKPY1073

消息:不支持不带参数或返回类型参数的 pyspark.sql.functions.udf

类别:警告。

描述

This issue appears when the tool detects the usage of pyspark.sql.functions.udf (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html) as a function or decorator; it is not supported in two specific cases: when it has no parameters, or when it lacks the return type parameter.

场景

场景 1

输入

在 Pyspark 中,可以创建不带输入或返回类型参数的用户定义的函数:

from pyspark.sql import SparkSession, DataFrameStatFunctions
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

输出

Snowpark 要求为 udf 函数显式提供输入类型和返回类型参数。由于源代码未提供这些参数,SMA 无法推断它们,因此添加了此 EWI。

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf function without the return type parameter is not supported. See documentation for more info.
my_udf = udf(lambda s: len(s))

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

推荐修复方法

To fix this scenario, add the imports for the input and return types, and then add the return_type and input_types parameters to the udf function my_udf.

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s), return_type=IntegerType(), input_types=[StringType()])

df.with_column("result", my_udf(df.Value)).show()
场景 2

在 PySpark 中,可以使用不带参数的 @udf 装饰器

输入

from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf()
def my_udf(str):
    return len(str)


df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

输出

In Snowpark all the parameters of a udf decorator are required.

from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

#EWI: SPRKPY1073 => pyspark.sql.functions.udf decorator without parameters is not supported. See documentation for more info.

@udf()
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

推荐修复方法

To fix this scenario, add the imports for the input and return types, and then add the return_type and input_types parameters to the @udf decorator.

from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf(return_type=IntegerType(), input_types=[StringType()])
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

其他建议

SPRKPY1074

消息:文件含有混合缩进(空格和制表符)。

类别:解析错误。

描述

当该工具检测到文件含有混合缩进时,就会出现此问题。这意味着,文件混合使用空格和制表符来缩进代码行。

场景

输入

在 Python 中,可以混合使用空格和制表符来表示缩进级别。

def foo():
    x = 5 # spaces
    y = 6 # tab

输出

SMA 无法处理混合使用的缩进标记。当在 Python 代码文件中检测到这种情况时,SMA 会在第一行添加 EWI SPRKPY1074。

## EWI: SPRKPY1074 => File has mixed indentation (spaces and tabs).
## This file was not converted, so it is expected to still have references to the Spark API
def foo():
    x = 5 # spaces
    y = 6 # tabs

推荐修复方法

解决方案是将所有缩进符号统一为相同格式。

def foo():
  x = 5 # tab
  y = 6 # tab
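下面是一个纯标准库的示意脚本(非 SMA 的一部分,函数名为假设),可在运行 SMA 之前检测源文件是否混用了空格和制表符缩进:

```python
def find_mixed_indentation(source: str):
    """Return the 1-based numbers of lines whose own indentation mixes tabs
    and spaces, plus a flag telling whether the file as a whole mixes
    tab-indented and space-indented lines."""
    tab_lines, space_lines, mixed = set(), set(), []
    for lineno, line in enumerate(source.splitlines(), start=1):
        indent = line[:len(line) - len(line.lstrip(" \t"))]
        if "\t" in indent and " " in indent:
            mixed.append(lineno)
        elif "\t" in indent:
            tab_lines.add(lineno)
        elif " " in indent:
            space_lines.add(lineno)
    file_mixes_styles = bool(tab_lines) and bool(space_lines)
    return mixed, file_mixes_styles

code = "def foo():\n    x = 5\n\ty = 6\n"  # spaces on line 2, a tab on line 3
print(find_mixed_indentation(code))  # → ([], True)
```

若返回的标志为 True,可先用 reindent 等工具统一缩进,再提交给 SMA。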

其他建议

  • Useful indent tools PEP-8 (https://peps.python.org/pep-0008/) and Reindent (https://pypi.org/project/reindent/).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1075

类别

警告。

描述

parse_json 不应用架构验证,如果需要根据架构进行筛选/验证,则可能需要引入一些逻辑。

示例

输入

df.select(from_json(df.value, Schema))
df.select(from_json(schema=Schema, col=df.value))
df.select(from_json(df.value, Schema, option))

输出

#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))

对于 from_json 函数,传递架构实际上并不是用于推断,而是用于验证。看看下面的示例:

data = [
    ('{"name": "John", "age": 30, "city": "New York"}',),
    ('{"name": "Jane", "age": "25", "city": "San Francisco"}',)
]

df = spark.createDataFrame(data, ["json_str"])

示例 1:强制使用数据类型和更改列名:

## Parse JSON column with schema
parsed_df = df.withColumn("parsed_json", from_json(col("json_str"), schema))

parsed_df.show(truncate=False)

## +------------------------------------------------------+---------------------------+
## |json_str                                              |parsed_json                |
## +------------------------------------------------------+---------------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, 30, New York}       |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, null, San Francisco}|
## +------------------------------------------------------+---------------------------+
## notice that values outside of the schema were dropped and columns not matched are returned as null

示例 2:选择特定列:

## Define a schema with only the columns we want to use
partial_schema = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True)
])

## Parse JSON column with partial schema
partial_df = df.withColumn("parsed_json", from_json(col("json_str"), partial_schema))

partial_df.show(truncate=False)

## +------------------------------------------------------+---------------------+
## |json_str                                              |parsed_json          |
## +------------------------------------------------------+---------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, New York}     |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, San Francisco}|
## +------------------------------------------------------+---------------------+
## there is also an automatic filtering
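如果需要在 parse_json 之后重现 from_json 的这两种行为(按架构筛选列、对类型不匹配的值返回 null),可以引入类似下面的纯 Python 逻辑。此代码仅为示意,假设 JSON 已解析为字典,架构以"列名 → 期望类型"的形式给出:

```python
import json

# Illustrative schema: column name -> expected Python type after json.loads.
schema = {"name": str, "age": int, "city": str}

def apply_schema(record: dict, schema: dict) -> dict:
    """Keep only schema columns; null out values whose type does not match,
    mimicking from_json's validation that parse_json does not perform."""
    return {col: (record.get(col) if isinstance(record.get(col), typ) else None)
            for col, typ in schema.items()}

row = json.loads('{"name": "Jane", "age": "25", "city": "San Francisco"}')
print(apply_schema(row, schema))
# → {'name': 'Jane', 'age': None, 'city': 'San Francisco'}
```

与上面的 from_json 示例一致:字符串 "25" 不符合 int 类型,因此被置为 None;架构之外的键会被丢弃。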

建议

  • For more support, you can email us at sma-support@snowflake.com. If you have a contract for support with Snowflake, reach out to your sales engineer and they can direct your support needs.

  • Useful tools PEP-8 (https://peps.python.org/pep-0008/) and Reindent (https://pypi.org/project/reindent/).

SPRKPY1076

Message: Parameters in pyspark.sql.readwriter.DataFrameReader (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html) methods are not supported. This applies to CSV, JSON and PARQUET methods.

类别:警告。

描述

For the CSV, JSON and PARQUET methods on the pyspark.sql.readwriter.DataFrameReader (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html) object, the tool will analyze the parameters and add a transformation according to each case:

  • 所有参数均与其在 Snowpark 中的等效选项名称匹配:在这种情况下,该工具会将参数转换为 .option() 调用。在这种情况下,参数不会添加此 EWI。

  • 部分参数与其在 Snowpark 中的等效选项不匹配:在这种情况下,工具将添加包含该参数信息的 EWI,并将其从方法调用中移除。

等效选项列表:

  • CSV 的等效选项:

    | Spark 键        | Snowpark 等效选项            |
    |-----------------|------------------------------|
    | sep             | FIELD_DELIMITER              |
    | header          | PARSE_HEADER                 |
    | lineSep         | RECORD_DELIMITER             |
    | pathGlobFilter  | PATTERN                      |
    | quote           | FIELD_OPTIONALLY_ENCLOSED_BY |
    | nullValue       | NULL_IF                      |
    | dateFormat      | DATE_FORMAT                  |
    | timestampFormat | TIMESTAMP_FORMAT             |
    | inferSchema     | INFER_SCHEMA                 |
    | delimiter       | FIELD_DELIMITER              |

  • JSON 的等效选项:

    | Spark 键        | Snowpark 等效选项 |
    |-----------------|-------------------|
    | dateFormat      | DATE_FORMAT       |
    | timestampFormat | TIMESTAMP_FORMAT  |
    | pathGlobFilter  | PATTERN           |

  • PARQUET 的等效选项:

    | Spark 键       | Snowpark 等效选项 |
    |----------------|-------------------|
    | pathGlobFilter | PATTERN           |
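以上映射可以示意性地写成一个查找表(这只是对上文等效选项的演示,并非 SMA 的内部实现),用于判断某个 Spark 参数是否存在 Snowpark 等效选项:

```python
# Hypothetical lookup table mirroring the equivalences listed above.
SPARK_TO_SNOWPARK_OPTIONS = {
    "csv": {
        "sep": "FIELD_DELIMITER", "header": "PARSE_HEADER",
        "lineSep": "RECORD_DELIMITER", "pathGlobFilter": "PATTERN",
        "quote": "FIELD_OPTIONALLY_ENCLOSED_BY", "nullValue": "NULL_IF",
        "dateFormat": "DATE_FORMAT", "timestampFormat": "TIMESTAMP_FORMAT",
        "inferSchema": "INFER_SCHEMA", "delimiter": "FIELD_DELIMITER",
    },
    "json": {
        "dateFormat": "DATE_FORMAT", "timestampFormat": "TIMESTAMP_FORMAT",
        "pathGlobFilter": "PATTERN",
    },
    "parquet": {"pathGlobFilter": "PATTERN"},
}

def translate_option(method: str, spark_key: str):
    """Return the Snowpark option name, or None when no equivalent exists
    (the case where this EWI is emitted and the parameter is dropped)."""
    return SPARK_TO_SNOWPARK_OPTIONS.get(method, {}).get(spark_key)

print(translate_option("csv", "sep"))         # → FIELD_DELIMITER
print(translate_option("json", "multiLine"))  # → None
```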

场景

场景 1

输入

对于 CSV,以下是一些示例:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.csv("path3", None,None,None,None,None,None,True).show()

输出

在转换后的代码中,参数作为单个选项添加到 csv 函数中

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the csv function, the supported ones will be added into a option method.
spark.read.option("FIELD_DELIMITER", None).option("PARSE_HEADER", True).option("FIELD_OPTIONALLY_ENCLOSED_BY", None).csv("path3").show()

Scenario 2

Input

For JSON, here is an example:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.json("/myPath/jsonFile/", dateFormat='YYYY/MM/DD').show()

Output

In the converted code, the parameters are added to the json function as individual options:

from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the json function, the supported ones will be added into a option method.

spark.read.option("DATE_FORMAT", 'YYYY/MM/DD').json("/myPath/jsonFile/").show()
Scenario 3

Input

For PARQUET, here is an example:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.parquet("/path/to/my/file.parquet", pathGlobFilter="*.parquet").show()

Output

In the converted code, the parameters are added to the parquet function as individual options:

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => The parquet function require adjustments, in Snowpark the parquet files needs to be located in an stage. See the documentation for more info.

spark.read.option("PATTERN", "*.parquet").parquet("/path/to/my/file.parquet")

Additional recommendations

  • When non-equivalent parameters are present, check the behavior after the transformation.

  • Also, review the related documentation to find a more suitable implementation:

    • Options documentation for CSV: - PySpark CSV Options (https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option). - Snowpark CSV Options.

    • Options documentation for JSON: - PySpark JSON Options (https://spark.apache.org/docs/latest/sql-data-sources-json.html). - Snowpark JSON Options.

    • Options documentation for PARQUET: - PySpark PARQUET options (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option). - Snowpark PARQUET options.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1077

Message: SQL embedded code cannot be processed.

Category: Warning.

Description

This issue appears when the tool detects embedded SQL code that cannot be converted to Snowpark.

For more information, see the SQL embedded code section.

Scenario

Input

In this example, the SQL code is embedded in a variable named query, which is then used as a parameter of the PySpark sql method.

query = f"SELECT * from myTable"
spark.sql(query)

Output

The SMA detects that the PySpark sql argument is a variable rather than SQL code, so it adds the EWI SPRKPY1077 message on the line of the PySpark sql call:

query = f"SELECT * from myTable"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
spark.sql(query)

Additional recommendations

  • For the SQL to be converted, it must be passed directly as a method argument as a plain string value, without interpolation. Check the SQL statements passed to the PySpark sql function to validate their functionality on the Snowflake platform.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
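The distinction the SMA draws here, a plain string literal versus a variable or interpolated string, can be illustrated with Python's standard ast module. This is a simplified sketch of the idea, not the SMA's actual parser:

```python
import ast

def sql_argument_is_literal(call_source: str) -> bool:
    """Return True when the first argument of a call such as spark.sql(...)
    is a plain string literal; variables and f-strings return False."""
    call = ast.parse(call_source, mode="eval").body
    first_arg = call.args[0]
    # f-strings parse as ast.JoinedStr, variables as ast.Name, so neither matches.
    return isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str)
```

sql_argument_is_literal('spark.sql("SELECT * from myTable")') is True, while passing a variable or an f-string returns False, which is why the example above receives the EWI.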

SPRKPY1078

Message: The argument of the pyspark.context.SparkContext.setLogLevel function is not a literal value and therefore could not be evaluated

Category: Warning

Description

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) function with an argument that is not a literal value, for example, when the argument is a variable.

The SMA performs static analysis of your source code and therefore cannot evaluate the content of the argument to determine whether an equivalent option exists in Snowpark.

Scenario

Input

In this example, the log level is defined in the variable my_log_level, and my_log_level is then used as the argument of the setLogLevel method.

my_log_level = "WARN"
sparkSession.sparkContext.setLogLevel(my_log_level)

Output

The SMA cannot evaluate the log level argument, so it adds the EWI SPRKPY1078 above the line of the converted logging code:

my_log_level = "WARN"
#EWI: SPRKPY1078 => my_log_level is not a literal value and therefore could not be evaluated. Make sure the value of my_log_level is a valid level in Snowpark. Valid log levels are: logging.CRITICAL, logging.DEBUG, logging.ERROR, logging.INFO, logging.NOTSET, logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Recommended fix

Even though the SMA was unable to evaluate the argument, it will transform the pyspark.context.SparkContext.setLogLevel function into the Snowpark equivalent. Please make sure the value of the level argument in the generated output code is a valid and equivalent log level in Snowpark according to the table below:

| PySpark log level | Snowpark log level equivalent |
|---|---|
| ALL | logging.NOTSET |
| DEBUG | logging.DEBUG |
| ERROR | logging.ERROR |
| FATAL | logging.CRITICAL |
| INFO | logging.INFO |
| OFF | logging.WARNING |
| TRACE | logging.NOTSET |
| WARN | logging.WARNING |

Therefore, the recommended fix looks like this:

my_log_level = logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)
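The table above can also be captured as a lookup dictionary, which is convenient when migrating several setLogLevel call sites. A minimal sketch using only the standard logging module:

```python
import logging

# PySpark log level -> Snowpark (logging module) equivalent, per the table above.
PYSPARK_TO_LOGGING = {
    "ALL": logging.NOTSET,
    "DEBUG": logging.DEBUG,
    "ERROR": logging.ERROR,
    "FATAL": logging.CRITICAL,
    "INFO": logging.INFO,
    "OFF": logging.WARNING,
    "TRACE": logging.NOTSET,
    "WARN": logging.WARNING,
}

# Resolve the original PySpark value to its Snowpark equivalent.
my_log_level = PYSPARK_TO_LOGGING["WARN"]
```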

Additional recommendations

SPRKPY1079

Message: The argument of the pyspark.context.SparkContext.setLogLevel function is not a valid PySpark log level.

Category: Warning

Description

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) function with an argument that is not a valid log level in PySpark, and therefore an equivalent could not be determined in Snowpark.

Scenario

Input

Here the log level uses "INVALID_LOG_LEVEL", which is not a valid PySpark log level.

sparkSession.sparkContext.setLogLevel("INVALID_LOG_LEVEL")

Output

The SMA still performs the transformation, but since it cannot recognize the log level "INVALID_LOG_LEVEL", it adds the EWI SPRKPY1079 to flag the possible issue:

#EWI: SPRKPY1079 => INVALID_LOG_LEVEL is not a valid PySpark log level, therefore an equivalent could not be determined in Snowpark. Valid PySpark log levels are: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
logging.basicConfig(stream = sys.stdout, level = logging.INVALID_LOG_LEVEL)

Recommended fix

Make sure that the log level used in the pyspark.context.SparkContext.setLogLevel function is a valid log level in PySpark (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html) or in Snowpark and try again.

logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)
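Before re-running the SMA, call sites can be pre-checked with a small script. The set below lists the valid PySpark levels from the EWI message; the helper is an illustration, not part of the SMA:

```python
# Valid PySpark log levels, as listed in the EWI SPRKPY1079 message.
VALID_PYSPARK_LOG_LEVELS = {"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"}

def is_valid_pyspark_log_level(level: str) -> bool:
    """Return True if `level` is a log level that PySpark's setLogLevel accepts."""
    return level.upper() in VALID_PYSPARK_LOG_LEVELS
```

is_valid_pyspark_log_level("INVALID_LOG_LEVEL") returns False, signaling that the call site needs to be corrected before migration.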

Additional recommendations

SPRKPY1081

This issue code has been deprecated since Spark Conversion Core 4.12.0.

Message: pyspark.sql.readwriter.DataFrameWriter.partitionBy has a workaround.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.partitionBy (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html) function is not supported. The workaround is to use Snowpark's copy_into_location instead. See the documentation for more info.

Scenario

Input

This code will create a separate directory for each unique value in the FIRST_NAME column. The data is the same, but it will be stored in a different directory based on the column value.

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

Output code

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1081 => The partitionBy function is not supported, but you can instead use copy_into_location as workaround. See the documentation for more info.
df.write.partitionBy("FIRST_NAME").csv("/home/data", format_type_options = dict(compression = "None"))

Recommended fix

In Snowpark, copy_into_location has a partition_by parameter that you can use instead of the partitionBy function, but it requires some manual adjustments, as shown in the following example:

Spark code:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

Manually adjusted Snowpark code:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.copy_into_location(location=temp_stage, partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)

copy_into_location has the following parameters:

  • location: the Snowpark location accepts only cloud locations using a Snowflake stage.

  • partition_by: it can be a column name or a SQL expression, so you need to convert it to a column or SQL expression using col or sql_expr.
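Choosing between col and sql_expr can be decided from the shape of the value. The helper below is hypothetical (the isidentifier check is a simplification that treats any bare Python identifier as a column name):

```python
def partition_by_wrapper(expression: str) -> str:
    """Pick the Snowpark wrapper for a partition_by value:
    col() for a bare column name, sql_expr() for a SQL expression."""
    return "col" if expression.isidentifier() else "sql_expr"
```

partition_by_wrapper("FIRST_NAME") suggests col, while partition_by_wrapper("UPPER(FIRST_NAME)") suggests sql_expr.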

Additional recommendations

SPRKPY1082

Message: The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use a Snowpark DataFrameReader format-specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameReader.load (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html) function is not supported. The workaround is to use Snowpark DataFrameReader methods instead.

Scenario

The Spark signature for this method, DataFrameReader.load(path, format, schema, **options), does not exist in Snowpark. Therefore, any usage of the load function will have an EWI in the output code.

Scenario 1

Input

Below is an example that tries to load data from a CSV source.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_csv_file, "csv").show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Output

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_csv_file, "csv").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Recommended fix

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with the csv method.

    • The first parameter, path, must be in a stage to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls the csv method.

path_csv_file = "/path/to/file.csv"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.csv(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).csv(temp_stage)
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
df = my_session.read.options(optionsParam).csv(stage)

Scenario 2

Input

Below is an example that tries to load data from a JSON source.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_json_file, "json").show()
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Output

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_json_file, "json").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Recommended fix

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with the json method.

    • The first parameter, path, must be in a stage to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls the json method.

path_json_file = "/path/to/file.json"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.json(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).json(temp_stage)
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
df = my_session.read.options(optionsParam).json(stage)

Scenario 3

Input

Below is an example that tries to load data from a PARQUET source.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_parquet_file, "parquet").show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Output

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_parquet_file, "parquet").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Recommended fix

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with the parquet method.

    • The first parameter, path, must be in a stage to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls the parquet method.

path_parquet_file = "/path/to/file.parquet"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.parquet(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).parquet(temp_stage)
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, pathGlobFilter is replaced with PATTERN. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

optionsParam = {"PATTERN": "*.parquet"}
df = my_session.read.options(optionsParam).parquet(stage)

Additional recommendations

  • Note that the Spark and Snowpark options are not the same, but they can be mapped:

| Spark option | Possible values | Snowpark equivalent | Description |
|---|---|---|---|
| header | True or False | SKIP_HEADER = 1 / SKIP_HEADER = 0 | Uses the first line of the file as column names. |
| delimiter | Any single- or multi-character field separator | FIELD_DELIMITER | Specifies one or more characters as the separator for each column/field. |
| sep | Any single-character field separator | FIELD_DELIMITER | Specifies a single character as the separator for each column/field. |
| encoding | UTF-8, UTF-16, etc. | ENCODING | Decodes the CSV file with the given encoding type. The default encoding is UTF-8. |
| lineSep | Any single-character line separator | RECORD_DELIMITER | Defines the line separator used for file parsing. |
| pathGlobFilter | File pattern | PATTERN | Defines a pattern so that only files whose names match the pattern are read. |
| recursiveFileLookup | True or False | N/A | Scans a directory recursively to read files. The default value of this option is False. |
| quote | A single character to quote with | FIELD_OPTIONALLY_ENCLOSED_BY | Quotes fields/columns that contain the delimiter/separator, so the delimiter can be part of the value. Used together with the quoteAll option, this character quotes all fields. The default value of this option is the double quote ("). |
| nullValue | A string to replace null with | NULL_IF | Replaces null values with the string while reading and writing the DataFrame. |
| dateFormat | A valid date format | DATE_FORMAT | Defines a string that represents the date format. The default format is yyyy-MM-dd. |
| timestampFormat | A valid timestamp format | TIMESTAMP_FORMAT | Defines a string that represents the timestamp format. The default format is yyyy-MM-dd 'T'HH:mm:ss. |
| escape | Any single character | ESCAPE | Sets a single character as the escape character, overriding the default escape character (\). |
| inferSchema | True or False | INFER_SCHEMA | Automatically detects the file schema. |
| mergeSchema | True or False | N/A | Not needed in Snowflake, since this happens whenever infer_schema determines the parquet file structure. |

  • For the modifiedBefore / modifiedAfter options, you can achieve the same result in Snowflake by using the metadata columns and then adding a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > 'some_date').

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1083

Message: The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use the Snowpark DataFrameWriter copy_into_location method instead.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.save (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.save.html) function is not supported. The workaround is to use Snowpark DataFrameWriter methods instead.

Scenario

The Spark signature for this method, DataFrameWriter.save(path, format, mode, partitionBy, **options), does not exist in Snowpark. Therefore, any usage of the save function will have an EWI in the output code.

Scenario 1

Input code

Below is an example that tries to save data with CSV format.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Output code

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the csv or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter, path, must be in a stage to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

## Using csv method
df.write.csv(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")

Below is an example that chains the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.mode("overwrite").csv(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="csv")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter of the csv method, as follows:

Below is an example that uses the partition_by parameter of the csv method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.csv(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="csv", partition_by="City")
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}

## Using csv method
df.write.csv(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="csv", format_type_options=optionsParam)

Scenario 2

Input code

Below is an example that tries to save data with JSON format.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Output code

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the json or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter, path, must be in a stage to be equivalent in Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

## Using json method
df.write.json(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")

Below is an example that chains the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.mode("overwrite").json(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="json")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter of the json method, as follows:

Below is an example that uses the partition_by parameter of the json method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.json(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="json", partition_by="City")
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}

## Using json method
df.write.json(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="json", format_type_options=optionsParam)

Scenario 3

Input code

Below is an example that tries to save data with PARQUET format.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Output code

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing the path and format parameters:

    • Replace the save method with the parquet or copy_into_location method.

    • If you are using the copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter, path, must point to a stage location to be equivalent in Snowpark.

The following example creates a temporary stage, puts the file in it, and then calls one of the methods above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"@{temp_stage}/file.parquet"

## Using parquet method
df.write.parquet(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="parquet")

Below is an example that adds the mode method to the call chain, passing overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.mode("overwrite").parquet(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="parquet")
  • Fixing the partitionBy parameter:

    • Use the partition_by parameter of the parquet method, as follows:

Below is an example that uses the partition_by parameter of the parquet method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.parquet(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="parquet", partition_by="City")
  • Fixing the options parameter:

The options in Spark and Snowpark are not the same; in this case, pathGlobFilter is replaced with PATTERN. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with PATTERN, and passes that dictionary through the format_type_options parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}

## Using parquet method
df.write.parquet(temp_stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="parquet", format_type_options=optionsParam)

Additional recommendations

  • Note that the options in Spark and Snowpark are not the same, but they can be mapped:

| Spark option | Possible values | Snowpark equivalent | Description |
| --- | --- | --- | --- |
| header | True or False | SKIP_HEADER = 1 / SKIP_HEADER = 0 | Uses the first line of the file as the column names. |
| delimiter | Any single-/multi-character field separator | FIELD_DELIMITER | Specifies one or more characters as the separator for each column/field. |
| sep | Any single-character field separator | FIELD_DELIMITER | Specifies a single character as the separator for each column/field. |
| encoding | UTF-8, UTF-16, etc... | ENCODING | Decodes the CSV files with the given encoding type. The default encoding is UTF-8. |
| lineSep | Any single-character line separator | RECORD_DELIMITER | Defines the line separator used for file parsing. |
| pathGlobFilter | File pattern | PATTERN | Defines a pattern to read only the files whose names match the pattern. |
| recursiveFileLookup | True or False | N/A | Scans a directory recursively to read files. The default value of this option is False. |
| quote | Single character to quote with | FIELD_OPTIONALLY_ENCLOSED_BY | Quotes fields/columns in which the delimiter/separator can be part of the value. This character, when used together with the quoteAll option, quotes all fields. The default value of this option is double quote ("). |
| nullValue | String to replace null | NULL_IF | Replaces null values with the given string when reading and writing the DataFrame. |
| dateFormat | Valid date format | DATE_FORMAT | Defines a string that indicates a date format. The default format is yyyy-MM-dd. |
| timestampFormat | Valid timestamp format | TIMESTAMP_FORMAT | Defines a string that indicates a timestamp format. The default format is yyyy-MM-dd 'T'HH:mm:ss. |
| escape | Any single character | ESCAPE | Sets a single character as the escape character, overriding the default escape character (\). |
| inferSchema | True or False | INFER_SCHEMA | Automatically detects the file schema. |
| mergeSchema | True or False | N/A | Not needed in Snowflake, since this happens whenever infer_schema determines the parquet file structure. |
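The name mapping above can be captured in a small helper when preparing a migration. The following is a minimal sketch; the dictionary only covers the documented equivalences, and the helper name is illustrative, not part of the SMA or Snowpark APIs:

```python
# Illustrative helper: rename Spark reader/writer options to their
# Snowflake format-type-option equivalents from the table above.
SPARK_TO_SNOWFLAKE_OPTIONS = {
    "delimiter": "FIELD_DELIMITER",
    "sep": "FIELD_DELIMITER",
    "encoding": "ENCODING",
    "lineSep": "RECORD_DELIMITER",
    "pathGlobFilter": "PATTERN",
    "quote": "FIELD_OPTIONALLY_ENCLOSED_BY",
    "nullValue": "NULL_IF",
    "dateFormat": "DATE_FORMAT",
    "timestampFormat": "TIMESTAMP_FORMAT",
    "escape": "ESCAPE",
    "inferSchema": "INFER_SCHEMA",
}

def to_format_type_options(spark_options):
    """Translate a Spark options dict into a Snowpark format_type_options dict.

    Options without a documented equivalent (e.g. recursiveFileLookup,
    mergeSchema) are returned separately so they can be reviewed by hand.
    """
    translated, unsupported = {}, {}
    for name, value in spark_options.items():
        if name == "header":
            # header=True maps to SKIP_HEADER = 1, header=False to SKIP_HEADER = 0
            translated["SKIP_HEADER"] = 1 if value else 0
        elif name in SPARK_TO_SNOWFLAKE_OPTIONS:
            translated[SPARK_TO_SNOWFLAKE_OPTIONS[name]] = value
        else:
            unsupported[name] = value
    return translated, unsupported
```

For example, to_format_type_options({"dateFormat": "YYYY/MM/DD", "pathGlobFilter": "*.parquet"}) yields {"DATE_FORMAT": "YYYY/MM/DD", "PATTERN": "*.parquet"}, which can then be passed through the format_type_options parameter as in the examples above.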

  • For the modifiedBefore / modifiedAfter options, you can achieve the same result in Snowflake by using the metadata columns and then adding a filter such as: df.filter(METADATA_FILE_LAST_MODIFIED > 'some_date').

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1084

This issue code has been deprecated since Spark Conversion Core 4.12.0

Message: pyspark.sql.readwriter.DataFrameWriter.option is not supported.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.option (https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) function is not supported.

Scenario

Input code

Below is an example using the option method, which adds extra configurations when writing the data of a DataFrame.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.option("header", True).csv(path_csv_file)
df.write.option("sep", ";").option("lineSep","-").csv(path_csv_file)

Output code

The tool adds the EWI SPRKPY1084 to the output code to let you know that this function is not supported by Snowpark.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("header", True).csv(path_csv_file)
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("sep", ";").option("lineSep","-").csv(path_csv_file)

Recommended fix

There is no recommended fix for the pyspark.sql.readwriter.DataFrameWriter.option method.

Additional recommendations

SPRKPY1085

Message: pyspark.ml.feature.VectorAssembler is not supported.

Category: Warning

Description

The pyspark.ml.feature.VectorAssembler (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) is not supported.

Scenario

Input code

VectorAssembler is used to combine several columns into a single vector.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
vector = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols")

Output code

The tool adds the EWI SPRKPY1085 to the output code to let you know that this class is not supported by Snowpark.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
#EWI: SPRKPY1085 => The pyspark.ml.feature.VectorAssembler function is not supported.
vector = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols")

Recommended fix

There is no recommended fix for pyspark.ml.feature.VectorAssembler.

Additional recommendations
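Although there is no direct replacement, the column assembly that VectorAssembler performs can be reproduced manually once the data is in Snowflake (for example, by building an array column). The following pure-Python sketch only illustrates the semantics; the function and column names are illustrative and not part of any Snowpark API:

```python
# Illustrative stand-in for the behavior of pyspark.ml.feature.VectorAssembler:
# combine several numeric columns of each row into a single vector column.
def assemble_vectors(rows, input_cols, output_col):
    """rows: list of dicts; returns new rows with an added vector column."""
    assembled = []
    for row in rows:
        new_row = dict(row)
        new_row[output_col] = [row[c] for c in input_cols]
        assembled.append(new_row)
    return assembled

data = [
    {"Id": 1, "col1": 10.0, "col2": 20.0},
    {"Id": 2, "col1": 25.0, "col2": 30.0},
    {"Id": 3, "col1": 50.0, "col2": 60.0},
]
result = assemble_vectors(data, ["col1", "col2"], "cols")
# result[0]["cols"] == [10.0, 20.0]
```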

SPRKPY1086

Message: pyspark.ml.linalg.VectorUDT is not supported.

Category: Warning

Description

The pyspark.ml.linalg.VectorUDT (https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/linalg.html) is not supported.

Scenario

Input code

VectorUDT is a data type used to represent vector columns in a DataFrame.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)

Output code

The tool adds the EWI SPRKPY1086 to the output code to let you know that this type is not supported by Snowpark.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

#EWI: SPRKPY1086 => The pyspark.ml.linalg.VectorUDT function is not supported.
schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)

Recommended fix

There is no recommended fix for pyspark.ml.linalg.VectorUDT.

Additional recommendations

SPRKPY1087

Message: The pyspark.sql.dataframe.DataFrame.writeTo function is not supported, but it has a workaround.

Category: Warning.

Description

The pyspark.sql.dataframe.DataFrame.writeTo (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html) function is not supported. The workaround is to use the Snowpark DataFrameWriter saveAsTable method instead.

Scenario

Input

Below is an example of the use of the pyspark.sql.dataframe.DataFrame.writeTo function; the dataframe df is written into a table named Personal_info.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.writeTo("Personal_info")

Output

The SMA adds the EWI SPRKPY1087 to the output code to let you know that this function is not supported, but has a workaround.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

#EWI: SPRKPY1087 => pyspark.sql.dataframe.DataFrame.writeTo is not supported, but it has a workaround.
df.writeTo("Personal_info")

Recommended fix

The workaround is to use the Snowpark DataFrameWriter saveAsTable method instead.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.write.saveAsTable("Personal_info")

Additional recommendations

SPRKPY1088

Message: The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so validation might be needed.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.option (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Scenarios

There are several scenarios, depending on whether the option is supported and on the format used when writing the file.

Scenario 1

Input

Below is an example of the usage of the option method, adding a sep option, which is currently supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").csv("some_path")

Output

The tool adds the EWI SPRKPY1088 indicating that validation is required.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")

Recommended fix

The Snowpark API supports this parameter, so the only action is to check the behavior after the migration. See the Equivalences table to review the supported parameters.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
Scenario 2

Input

This scenario shows the usage of option, but adds a header option, which is not supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("header", True).csv("some_path")

Output

The tool adds the EWI SPRKPY1088 indicating that validation is needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("header", True).csv("some_path")

Recommended fix

For this scenario, it is recommended to evaluate the Snowpark format type options to see whether you can change them according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Scenario 3

Input

This scenario adds a sep option, which is supported, and uses the JSON method.

  • Note: this scenario also applies to PARQUET.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").json("some_path")

Output

The tool adds the EWI SPRKPY1088 indicating that validation is needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").json("some_path")

Recommended fix

The JSON file format does not support the parameter sep, so it is recommended to evaluate the Snowpark format type options to see whether you can change them according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Additional recommendations

  • Since some parameters are not supported, it is recommended to check the table of equivalences and to verify the behavior after the transformation.

  • Equivalences table:

| PySpark option | Snowflake option | Supported file formats | Description |
| --- | --- | --- | --- |
| SEP | FIELD_DELIMITER | CSV | One or more single-byte or multibyte characters that separate fields in an input file. |
| LINESEP | RECORD_DELIMITER | CSV | One or more characters that separate records in an input file. |
| QUOTE | FIELD_OPTIONALLY_ENCLOSED_BY | CSV | Character used to enclose strings. |
| NULLVALUE | NULL_IF | CSV | String used to convert to and from SQL NULL. |
| DATEFORMAT | DATE_FORMAT | CSV | String that defines the format of date values in the data files to be loaded. |
| TIMESTAMPFORMAT | TIMESTAMP_FORMAT | CSV | String that defines the format of timestamp values in the data files to be loaded. |

If the parameter used is not on the list, the API throws an error.
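That failure can be anticipated with a small pre-check before migrating writer calls. The following is a minimal sketch; the supported set mirrors the table above, and the function name is illustrative, not part of the Snowpark API:

```python
# Illustrative pre-check: only the CSV format-type options listed in the
# equivalences table above have a Snowflake counterpart.
SUPPORTED_WRITER_OPTIONS = {
    "SEP": "FIELD_DELIMITER",
    "LINESEP": "RECORD_DELIMITER",
    "QUOTE": "FIELD_OPTIONALLY_ENCLOSED_BY",
    "NULLVALUE": "NULL_IF",
    "DATEFORMAT": "DATE_FORMAT",
    "TIMESTAMPFORMAT": "TIMESTAMP_FORMAT",
}

def check_writer_options(options):
    """Raise ValueError listing any writer options without a Snowflake equivalent."""
    unsupported = [name for name in options if name.upper() not in SUPPORTED_WRITER_OPTIONS]
    if unsupported:
        raise ValueError(f"Unsupported writer option(s): {', '.join(sorted(unsupported))}")
    return {SUPPORTED_WRITER_OPTIONS[name.upper()]: value for name, value in options.items()}
```

For example, check_writer_options({"sep": ","}) returns {"FIELD_DELIMITER": ","}, while check_writer_options({"header": True}) raises before the Snowpark API would.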

SPRKPY1089

Message: The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so validation might be needed.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.options (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html) values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Scenarios

There are several scenarios, depending on whether the options are supported and on the format used when writing the file.

Scenario 1

Input

Below is an example of the usage of the options method, adding the options sep and nullValue, which are currently supported.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])

df.write.options(nullValue="myVal", sep=",").csv("some_path")

Output

The tool adds the EWI SPRKPY1089 indicating that validation is required.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")

Recommended fix

The Snowpark API supports these parameters, so the only action is to check the behavior after the migration. See the Equivalences table to review the supported parameters.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
Scenario 2

Input

This scenario shows the usage of options, but adds a header option, which is not supported.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])

df.write.options(header=True, sep=",").csv("some_path")

Output

The tool adds the EWI SPRKPY1089 indicating that validation is needed.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(header=True, sep=",").csv("some_path")

Recommended fix

For this scenario, it is recommended to evaluate the Snowpark format type options to see whether you can change them according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Scenario 3

Input

This scenario adds the sep and nullValue options, which are supported, and uses the JSON method.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])

df.write.options(nullValue="myVal", sep=",").json("some_path")

Output

The tool adds the EWI SPRKPY1089 indicating that validation is needed.

  • Note: this scenario also applies to PARQUET.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").json("some_path")

Recommended fix

The JSON file format does not support the parameter sep, so it is recommended to evaluate the Snowpark format type options to see whether you can change them according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal"), (2, "myVal2"), (None, "myVal3")], ["ID", "Value"])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Additional recommendations

  • Since some parameters are not supported, it is recommended to check the table of equivalences and to verify the behavior after the transformation.

  • Equivalences table:

List of the equivalent options for the parameters that Snowpark can support:

| PySpark option | Snowflake option | Supported file formats | Description |
| --- | --- | --- | --- |
| SEP | FIELD_DELIMITER | CSV | One or more single-byte or multibyte characters that separate fields in an input file. |
| LINESEP | RECORD_DELIMITER | CSV | One or more characters that separate records in an input file. |
| QUOTE | FIELD_OPTIONALLY_ENCLOSED_BY | CSV | Character used to enclose strings. |
| NULLVALUE | NULL_IF | CSV | String used to convert to and from SQL NULL. |
| DATEFORMAT | DATE_FORMAT | CSV | String that defines the format of date values in the data files to be loaded. |
| TIMESTAMPFORMAT | TIMESTAMP_FORMAT | CSV | String that defines the format of timestamp values in the data files to be loaded. |

If the parameter used is not on the list, the API throws an error.

SPRKPY1101

Category

Parsing error.

Description

When the tool detects a parsing error, it tries to recover from it and continues the process on the next line. In that case, it shows the error and comments out the affected line.

The following example shows how a mismatch between spaces and tabs is handled.

Input code

def foo():
    x = 5 # Spaces
     y = 6 # Tab

def foo2():
    x=6
    y=7

Output code

def foo():
    x = 5 # Spaces
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(3, 2). Last valid token was '5' @(2, 9), failed token 'y' @(3, 2)
## y = 6 # Tab

def foo2():
    x=6
    y=7
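Mixed tabs and spaces like the example above are also rejected by the Python interpreter itself, so a quick pre-check before running the SMA is simply to compile each source file. A minimal sketch (the helper name is illustrative):

```python
# Compiling a source string surfaces tab/space indentation mismatches:
# Python raises TabError, a subclass of IndentationError, at compile time.
def find_indentation_error(source, filename="<input>"):
    """Return the line number of the first indentation error, or None if the source parses."""
    try:
        compile(source, filename, "exec")
    except IndentationError as err:  # TabError is a subclass of IndentationError
        return err.lineno
    return None

bad = "def foo():\n    x = 5  # spaces\n\ty = 6  # tab\n"
good = "def foo2():\n    x = 6\n    y = 7\n"
```

Here find_indentation_error(bad) returns the line number of the tab-indented statement, while find_indentation_error(good) returns None, so the offending line can be fixed in the source before the SMA has to recover from it.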

Recommendations

  • Try to fix the commented-out line.

  • For more support, email us at sma-support@snowflake.com. If you have a support contract with Snowflake, reach out to your sales engineer, who can direct your support needs.