使用 SQL 创建的存储过程的 Scala 示例¶
将 Snowpark APIs 用于异步处理¶
以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。
在以下示例中,asyncWait
过程会执行等待 10 秒的异步子作业。
CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
while(!asyncJob.isDone()) {
Thread.sleep(1000)
}
"Done"
}
}
$$;
call asyncScalaTest();
在以下示例中,cancelJob
过程使用 SQL 开始需要 10 秒才能完成的作业。然后,它会在子作业完成之前将其取消。
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
asyncJob.cancel()
"Done"
}
}
$$;
在以下示例中,checkStatus
过程会执行等待 10 秒的异步子作业。然后,该过程会在作业完成之前检查作业的状态,因此检查将返回 False
。
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val connection = session.jdbcConnection
val stmt = connection.createStatement()
val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
s"""status: ${status}"""
}
}
$$;