Snowpark Migration Accelerator: 转换的工作原理

Snowpark Migration Accelerator (SMA) 不仅能生成全面的代码评估,还能将源代码中的特定元素转换为目标代码库的兼容格式。此转换过程遵循与初始评估相同的步骤,仅增加了一个步骤。

SMA 中的转换

在评估和转换模式下,Snowpark Migration Accelerator (SMA) 都会执行以下操作:

  • 搜索指定目录下的所有文件

  • 检测哪些文件包含代码

  • 按编程语言分析代码文件

  • 创建代码的结构化表示(抽象语法树 (AST))

  • 创建包含程序信息的符号表

  • 识别发现的所有错误并进行分类

  • 创建详细的结果报告

即使先前已在评估模式下运行过 SMA,在转换模式下运行它时,也会重复执行上述所有过程。但是,转换模式还包括一个额外的最后步骤。

  • 对基于抽象语法树 (AST) 生成的代码进行格式化,以提升可读性

抽象语法树 (AST) 是代表源代码工作原理的模型。当源语言和目标语言中都存在相同的功能时,SMA 可以在目标语言中生成等效代码。此代码生成仅在实际转换过程中发生。

SMA 中的转换类型

Snowpark Migration Accelerator (SMA) 目前支持以下代码转换:

  • 将 Python 或 Scala 代码从 Spark API 调用转换为等效的 Snowpark API 调用

备注

The SMA does not perform any SQL conversion. For SQL files or SQL-only assessments, the tool provides assessment only, without any automated conversion.

让我们来看看一个同时使用 Scala 和 Python 编程语言编写的示例。

将 Spark API 引用转换为 Snowpark API 引用的示例

将 Spark Scala 转换为 Snowpark 的示例

使用 Scala 作为源语言时,Snowpark Migration Accelerator (SMA) 会自动将 Scala 代码中的 Spark API 引用转换为其等效的 Snowpark API 引用。下面的示例演示了基本 Spark 应用程序的转换过程。该示例应用程序执行了几种常见的数据操作:

  • 读取数据

  • 筛选记录

  • 联接数据集

  • 计算平均值

  • 显示结果

用 Scala 编写的 Apache Spark 代码

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SparkSession 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: SparkSession, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                          // Calculate average salary for each job

// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()

```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Copy

转换为 Snowflake 后的代码:

import com.snowflake.snowpark._ 
import com.snowflake.snowpark.functions._ 
import com.snowflake.snowpark.Session 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: Session, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                           // Calculate average salary per job

```scala
// Calculate and display all salaries in the department 
jobsAvgSalary.select(array_agg("Salary")).show()

// Display the average salary
jobsAvgSalary.show()
}
}
Copy

在此示例中,代码结构基本保持不变。但是,该代码已更新为使用 Snowpark API 引用而不是 Spark API 引用。

PySpark 转 Snowpark 的示例

选择 Python 作为源语言时,SMA 会自动将 Python 代码中的 PySpark API 调用转换为等效的 Snowpark API 调用。下面的示例脚本演示了各种 PySpark 函数:

from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row

Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.

df = spark_session.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain() 
df.explain("simple") # Physical plan
df.explain(True) 

# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

转换为 Snowflake 后的代码:

from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row

Create a Spark session using the Session builder:

spark_session = Session.builder.create()

df = spark_session.create_dataframe([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().sort("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)

# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

在此示例中,代码结构基本保持不变。但是,该代码已更新为使用 Snowpark API 调用而不是 Spark API 调用。

在使用 Snowpark Migration Accelerator (SMA) 进行转换的过程中,将可获得以下结果:

语言: 中文