Snowpark Migration Accelerator: 转换的工作原理¶
Snowpark Migration Accelerator (SMA) 不仅能生成全面的代码评估,还能将源代码中的特定元素转换为目标代码库的兼容格式。此转换过程遵循与初始评估相同的步骤,仅增加了一个步骤。
SMA 中的转换¶
在评估和转换模式下,Snowpark Migration Accelerator (SMA) 都会执行以下操作:
搜索指定目录下的所有文件
检测哪些文件包含代码
按编程语言分析代码文件
创建代码的结构化表示(抽象语法树 (AST))
创建包含程序信息的符号表
识别发现的所有错误并进行分类
创建详细的结果报告
即使先前已在评估模式下运行过 SMA,在转换模式下运行它时,也会重复执行上述所有过程。但是,转换模式还包括一个额外的最后步骤。
对基于抽象语法树 (AST) 生成的代码进行格式化,以提升可读性
抽象语法树 (AST) 是代表源代码工作原理的模型。当源语言和目标语言中都存在相同的功能时,SMA 可以在目标语言中生成等效代码。此代码生成仅在实际转换过程中发生。
SMA 中的转换类型¶
Snowpark Migration Accelerator (SMA) 目前支持以下代码转换:
将 Python 或 Scala 代码从 Spark API 调用转换为等效的 Snowpark API 调用
备注
The SMA does not perform any SQL conversion. For SQL files or SQL-only assessments, the tool provides assessment only, without any automated conversion.
让我们来看看一个同时使用 Scala 和 Python 编程语言编写的示例。
将 Spark API 引用转换为 Snowpark API 引用的示例¶
将 Spark Scala 转换为 Snowpark 的示例¶
使用 Scala 作为源语言时,Snowpark Migration Accelerator (SMA) 会自动将 Scala 代码中的 Spark API 引用转换为其等效的 Snowpark API 引用。下面的示例演示了基本 Spark 应用程序的转换过程。该示例应用程序执行了几种常见的数据操作:
读取数据
筛选记录
联接数据集
计算平均值
显示结果
用 Scala 编写的 Apache Spark 代码
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: SparkSession, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary for each job
// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()
```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
转换为 Snowflake 后的代码:
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.Session
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: Session, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary per job
```scala
// Calculate and display all salaries in the department
jobsAvgSalary.select(array_agg("Salary")).show()
// Display the average salary
jobsAvgSalary.show()
}
}
在此示例中,代码结构基本保持不变。但是,该代码已更新为使用 Snowpark API 引用而不是 Spark API 引用。
PySpark 转 Snowpark 的示例¶
选择 Python 作为源语言时,SMA 会自动将 Python 代码中的 PySpark API 调用转换为等效的 Snowpark API 调用。下面的示例脚本演示了各种 PySpark 函数:
from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.
df = spark_session.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
转换为 Snowflake 后的代码:
from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row
Create a Spark session using the Session builder:
spark_session = Session.builder.create()
df = spark_session.create_dataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().sort("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
在此示例中,代码结构基本保持不变。但是,该代码已更新为使用 Snowpark API 调用而不是 Spark API 调用。
在使用 Snowpark Migration Accelerator (SMA) 进行转换的过程中,将可获得以下结果: