Writing stored procedures in Scala¶
You can write a stored procedure in Scala. You can use the Snowpark library within your stored procedure to perform queries, updates, and other work on tables in Snowflake.
This topic explains how to write a stored procedure’s logic. Once you have the logic, you can create and call the procedure using SQL. For more information, see Creating a stored procedure and Calling a stored procedure.
Introduction¶
You can build and run your data pipeline within Snowflake, using a Snowflake warehouse as the compute framework. For the code for your data pipeline, you use the Snowpark API for Scala to write stored procedures. To schedule the execution of these stored procedures, you use tasks.
You can capture log and trace data as your handler code executes. For more information, refer to Logging, tracing, and metrics.
Note
To both create and call an anonymous procedure, use CALL (with anonymous procedure). Creating and calling an anonymous procedure does not require a role with CREATE PROCEDURE schema privileges.
Prerequisites¶
You must use version 1.1.0 or a more recent version of the Snowpark library.
If you are writing a stored procedure whose handler code will be copied to a stage, you must compile your classes to run in Java version 11.x.
Setting up your development environment for Snowpark¶
Set up your development environment to use the Snowpark library. See Setting Up Your Development Environment for Snowpark Scala.
Structuring and building handler code¶
You can keep handler source code in-line with the SQL that creates the procedure, or keep the compiled handler in a separate location and reference it from the SQL. For more information, see Keeping handler code in-line or on a stage.
For more on building handler source code for use with a procedure, see Packaging Handler Code.
Creating and calling a procedure¶
Once you’ve written a procedure’s handler, you can create and call it using SQL.
Creating the stored procedure
For information about creating a stored procedure with SQL, see Creating a stored procedure.
For information about creating an anonymous procedure with SQL, see CALL (with anonymous procedure).
Calling your stored procedure
For information on calling a stored procedure from SQL, see Calling a stored procedure.
Limitations¶
Snowpark stored procedures have the following limitations:
Concurrency is not supported. For example, from within your code, you cannot submit queries from multiple threads. Code that concurrently issues multiple queries will produce an error.
If you are executing your stored procedure from a task, you must specify a warehouse when creating the task. (You cannot use serverless compute resources to run the task.)
Keep in mind the following limitations for using some Snowpark APIs in your stored procedure.
When you use APIs that execute PUT and GET commands (including Session.sql("PUT ...") and Session.sql("GET ...")), you may write only to the /tmp directory in the memory-backed file system provided for the query calling the procedure.
Do not use APIs that create new sessions (for example, Session.builder().configs(...).create()).
Using session.jdbcConnection (and the connection returned from it) is not supported because it may result in unsafe behavior.
Creating named temp objects is not supported in an owner’s rights stored procedure. An owner’s rights stored procedure is a stored procedure that runs with the privileges of the stored procedure owner. For more information, refer to caller’s rights or owner’s rights.
Writing the handler code for the stored procedure¶
For your procedure’s logic, you write handler code that executes when the procedure is called. This section describes the design of a handler.
You can include this code in-line with the SQL statement that creates the procedure, or copy the code to a stage and reference it there when you create the procedure. For more information, see Keeping handler code in-line or on a stage.
Planning to write your stored procedure¶
Limit the amount of memory consumed.
Snowflake limits the amount of memory that a method can use. For more information on how to avoid consuming too much, see Designing Handlers that Stay Within Snowflake-Imposed Constraints.
Write thread-safe code.
Make sure that your handler method or function is thread safe.
Understand the security restrictions.
Your handler code runs within a restricted engine, so be sure to follow the rules described in Security Practices for UDFs and Procedures.
Decide on using owner’s rights or caller’s rights.
When planning to write your stored procedure, consider whether you want the stored procedure to run with caller’s rights or owner’s rights.
Keep in mind the timeout behavior for stored procedures.
Stored procedure execution will time out unless the timer is reset by the code’s activity. In particular, the timeout timer is reset by the code’s interactions with data, including file operations, queries, and iterating through a result set.
Writing the class or object¶
The method or function that you define should be part of a class or object.
When writing the class or object, note the following:
The class (or object) and method must not be protected or private.
If the method is not static and you want to define a constructor, define a zero-argument constructor for the class. Snowflake invokes this zero-argument constructor at initialization time to create an instance of your class.
You can define different methods for different stored procedures in the same class or object.
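As a minimal sketch of the points above, the following class relies on its implicit zero-argument constructor and defines handler methods for two different stored procedures. The class name, method names, and message format are hypothetical, chosen only for illustration, and the sketch assumes that the Snowpark library is on the classpath.

```scala
import com.snowflake.snowpark.Session

// Hypothetical handler class. It is public and declares no constructor
// parameters, so it has the zero-argument constructor described above.
class TableHandlers {

  // Handler for one procedure: reports how many rows a table contains.
  def countRows(session: Session, tableName: String): String =
    describeCount(tableName, session.table(tableName).count())

  // Handler for a second procedure: lists the column names of a table.
  def listColumns(session: Session, tableName: String): String =
    session.table(tableName).schema.names.mkString(", ")

  // Plain helper with no Snowflake dependency, kept public for simplicity.
  def describeCount(tableName: String, rowCount: Long): String =
    s"$tableName has $rowCount rows"
}
```

Under these assumptions, one procedure would name TableHandlers.countRows in its HANDLER clause and another would name TableHandlers.listColumns.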
Writing the method or function¶
When writing the method or function for the stored procedure, note the following:
Specify the Snowpark Session object as the first argument of your method or function.
When you call your stored procedure, Snowflake automatically creates a Session object and passes it to your stored procedure. (You cannot create the Session object yourself.)
For the rest of the arguments and for the return value, use the Scala types that correspond to Snowflake data types.
Your method or function must return a value: a return value is required for stored procedures in Scala.
Stored procedure execution will time out unless the timer is reset by the code’s activity. In particular, the timeout timer is reset by the code’s interactions with data, including file operations, queries, and iterating through a result set.
When you run an asynchronous child job from within a procedure’s handler, “fire and forget” is not supported.
In other words, if the handler issues a child query that is still running when the parent procedure job completes, the child job is canceled automatically.
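Because a child job that is still running is canceled when the procedure completes, a handler that starts asynchronous work needs to wait for that work before it returns. The following is a minimal sketch of that pattern, assuming the Snowpark async API (df.async.collect() and getResult()). The object name, method names, and the 5-second wait are hypothetical.

```scala
import com.snowflake.snowpark.Session

object WaitForChildJob {

  // Session is the first parameter; the String return value is required.
  def run(session: Session): String = {
    // Start an asynchronous child job.
    val asyncJob = session.sql("select system$wait(5)").async.collect()
    // getResult() blocks until the child query finishes, so the job is not
    // still running (and therefore not canceled) when the handler returns.
    val rows = asyncJob.getResult()
    summarize(rows.length)
  }

  // Plain helper that formats the return value.
  def summarize(rowCount: Int): String =
    s"Child job finished with $rowCount row(s)"
}
```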
Making dependencies available to your code¶
If your handler code depends on code defined outside the handler itself (such as classes in a JAR file) or on resource files, you can make those dependencies available to your code by uploading them to a stage. When creating the procedure, you can reference these dependencies using the IMPORTS clause.
For more information, see Making dependencies available to your code.
Accessing data in a Snowflake procedure¶
To access data in Snowflake, use the Snowpark library APIs.
When handling a call to your Scala stored procedure, Snowflake creates a Snowpark Session object and passes the object to the method or function for your stored procedure.
As is the case with stored procedures in other languages, the context for the session (for example, the privileges and the current database and schema) is determined by whether the stored procedure runs with caller’s rights or owner’s rights. For details, see Accessing and setting the session state.
You can use this Session object to call APIs in the Snowpark library. For example, you can create a DataFrame for a table or execute a SQL statement.
See the Snowpark Developer Guide for Scala for more information.
Note
For information about limitations, including limitations on accessing data, see Limitations.
Data access example¶
The following is an example of a Scala method that copies a specified number of rows from one table to another table. The method takes the following arguments:
A Snowpark Session object
The name of the table to copy the rows from
The name of the table to save the rows to
The number of rows to copy
The method in this example returns a string.
object MyObject
{
  def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    return "Success"
  }
}
The following example defines a function, rather than a method:
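object MyObject
{
  // A function literal cannot carry a return type annotation after its
  // parameter list; the String return type is inferred from the body.
  val myProcedure = (session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int) =>
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    "Success"
  }
}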
Reading a file with a Scala procedure¶
You can read the contents of a file with handler code. The file must be on a Snowflake stage that’s available to your handler. For example, you might want to read a file to process unstructured data in the handler.
To read the contents of staged files, your handler can call methods in either the SnowflakeFile class or the InputStream class. You might do this if you need to access the file dynamically during compute. For more information, see Reading a dynamically-specified file with SnowflakeFile or Reading a dynamically-specified file with InputStream in this topic.
SnowflakeFile provides features not available with InputStream, as described in the following table.
Class | Input | Notes
---|---|---
SnowflakeFile | A URL. The file must be located in a named internal stage or an external stage. | Easily access additional file attributes, such as file size.
InputStream | A URL. The file must be located in a named internal stage or an external stage. |
Note
For an owner’s rights stored procedure, the procedure’s owner must have access to any files that are not scoped URLs. For caller’s rights procedures, the caller must have access to any files that are not scoped URLs. In either case, you can read the staged file by having the handler code call the SnowflakeFile.newInstance method with a boolean value for the requireScopedUrl parameter.
The following example uses SnowflakeFile.newInstance while specifying that a scoped URL is not required.
var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
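As a sketch of the additional file attributes mentioned in the table above, the following handler reports a staged file's size along with the number of characters it contains. It assumes that SnowflakeFile exposes a getSize() method alongside getInputStream(). The object name, method names, and message format are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.Session
import com.snowflake.snowpark_java.types.SnowflakeFile

object FileInfo {

  def execute(session: Session, fileName: String): String = {
    // false: a scoped URL is not required for this file.
    val file = SnowflakeFile.newInstance(fileName, false)
    // Assumption: getSize() returns the file size in bytes.
    val size: Long = file.getSize()
    val stream = file.getInputStream()
    try {
      val text = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
      describe(fileName, size, text.length)
    } finally {
      stream.close()
    }
  }

  // Plain helper that formats the return value.
  def describe(fileName: String, sizeInBytes: Long, charCount: Int): String =
    s"$fileName: $sizeInBytes bytes, $charCount characters"
}
```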
Reading a dynamically-specified file with SnowflakeFile¶
Code in the following example has a handler function execute that takes a String and returns a String with the file’s contents. At run time, Snowflake initializes the handler’s fileName variable from the incoming file path in the procedure’s input variable. The handler code uses a SnowflakeFile instance to read the file.
CREATE OR REPLACE PROCEDURE file_reader_scala_proc_snowflakefile(input VARCHAR)
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  HANDLER = 'FileReader.execute'
  PACKAGES = ('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
import com.snowflake.snowpark_java.Session
object FileReader {
  def execute(session: Session, fileName: String): String = {
    // Open the staged file identified by the incoming path or URL.
    val input: InputStream = SnowflakeFile.newInstance(fileName).getInputStream()
    return new String(input.readAllBytes(), StandardCharsets.UTF_8)
  }
}
$$;
Code in the following CALL example creates a scoped file URL that points to the file. This is an encoded URL that permits temporary access to a staged file without granting privileges to the stage itself.
CALL file_reader_scala_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
Reading a dynamically-specified file with InputStream¶
Code in the following example has a handler function execute that takes an InputStream and returns a String with the file’s contents. At run time, Snowflake initializes the handler’s stream variable from the incoming file path in the procedure’s input variable. The handler code uses the InputStream to read the file.
CREATE OR REPLACE PROCEDURE file_reader_scala_proc_input(input VARCHAR)
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  HANDLER = 'FileReader.execute'
  PACKAGES = ('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.Session
object FileReader {
  def execute(session: Session, stream: InputStream): String = {
    // Read the whole stream and decode it as UTF-8 text.
    val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
    return contents
  }
}
$$;
Code in the following CALL example creates a scoped file URL that points to the file. This is an encoded URL that permits temporary access to a staged file without granting privileges to the stage itself.
CALL file_reader_scala_proc_input(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
Returning tabular data¶
You can write a procedure that returns data in tabular form. To write a procedure that returns tabular data, do the following:
Specify TABLE(...) as the procedure’s return type in your CREATE PROCEDURE statement.
As TABLE parameters, you can specify the returned data’s column names and types if you know them. If you don’t know the returned columns when defining the procedure (such as when they’re specified at run time), you can leave out the TABLE parameters. When you do, the procedure’s return value columns will be converted from the columns in the dataframe returned by its handler. Column data types will be converted to SQL according to the mapping specified in SQL-Scala Data Type Mappings.
Write the handler so that it returns the tabular result in a Snowpark dataframe.
For more information about dataframes, see Working with DataFrames in Snowpark Scala.
Note
A procedure will generate an error at runtime if either of the following is true:
It declares TABLE as its return type but its handler does not return a dataframe.
Its handler returns a dataframe but the procedure doesn’t declare TABLE as its return type.
Example¶
The examples in this section illustrate returning tabular values from a procedure that filters for rows where a column matches a string.
Defining the data¶
Code in the following example creates a table of employees.
CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Declaring a procedure to filter rows¶
The following two examples each create a stored procedure that takes the table name and role as arguments and returns the rows in the table whose role column value matches the role specified as an argument.
Specifying return column names and types¶
This example specifies column names and types in the RETURNS TABLE() clause.
CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
  RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._
object Filter {
  def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
    val table = session.table(tableName)
    val filteredRows = table.filter(col("role") === role)
    return filteredRows
  }
}
$$;
Note
Currently, in the RETURNS TABLE(...) clause, you can’t specify GEOGRAPHY as a column type. This applies whether you are creating a stored or anonymous procedure.
CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
RETURNS TABLE(g GEOGRAPHY)
...
WITH test_return_geography_table_1() AS PROCEDURE
RETURNS TABLE(g GEOGRAPHY)
...
CALL test_return_geography_table_1();
If you attempt to specify GEOGRAPHY as a column type, calling the stored procedure results in the error:
Stored procedure execution error: data type of returned table does not match expected returned table type
To work around this issue, you can omit the column arguments and types in RETURNS TABLE().
CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
RETURNS TABLE()
...
WITH test_return_geography_table_1() AS PROCEDURE
RETURNS TABLE()
...
CALL test_return_geography_table_1();
Omitting return column names and types¶
Code in the following example declares a procedure whose return value column names and types are inferred from the columns in the handler’s return value. It omits the column names and types from the RETURNS TABLE() clause.
CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
  RETURNS TABLE()
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._
object Filter {
  def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
    val table = session.table(tableName)
    val filteredRows = table.filter(col("role") === role)
    return filteredRows
  }
}
$$;
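The case where the returned columns are known only at run time can be sketched as follows. In this hypothetical handler, the caller passes a comma-separated list of column names and the returned DataFrame contains only those columns, so the procedure would be declared with RETURNS TABLE(). The object name, method names, and the comma-separated argument format are assumptions made for illustration.

```scala
import com.snowflake.snowpark.{DataFrame, Session}

object SelectColumns {

  // Handler: returns only the requested columns of the given table.
  def selectColumns(session: Session, tableName: String, columnList: String): DataFrame =
    session.table(tableName).select(parseColumns(columnList))

  // Splits "id, name" into Seq("id", "name"), ignoring blank entries.
  def parseColumns(columnList: String): Seq[String] =
    columnList.split(",").map(_.trim).filter(_.nonEmpty).toSeq
}
```

Because the column set depends on the argument, the declaration cannot list the columns in advance, which is the situation that the empty RETURNS TABLE() form is meant for.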
Calling the procedure¶
The following example calls the stored procedure:
CALL filter_by_role('employees', 'dev');
The procedure call produces the following output:
+----+-------+------+
| ID | NAME | ROLE |
+----+-------+------+
| 2 | Bob | dev |
| 3 | Cindy | dev |
+----+-------+------+
Preparing a stored procedure with a staged handler¶
If you plan to create a stored procedure whose handler will be compiled and copied to a stage (rather than kept in-line as source), you must compile and package your classes in a JAR file, and you must upload the JAR file to a stage.
Compile and package your handler code
To make it easier to set up your stored procedure, build a JAR file that contains all of the dependencies needed for your stored procedure. Later, you’ll need to upload the JAR file to a stage and point to the JAR file from your CREATE PROCEDURE statement. This process is simpler if you have fewer JAR files to upload and point to.
Use sbt to build a JAR file with dependencies.
If you are using SBT to build and package your code, you can use the sbt-assembly plugin (https://github.com/sbt/sbt-assembly/blob/develop/README.md) to create a JAR file containing all of the dependencies. For more information, see Packaging Scala Handler Code with sbt.
Use Maven to build a JAR file with dependencies.
If you are using Maven to build and package your code, you can use the Maven Assembly Plugin (https://maven.apache.org/plugins/maven-assembly-plugin/index.html) to create a JAR file that contains all of the dependencies. For more information, see Packaging Java or Scala Handler Code with Maven.
Use other tools to build a JAR file with dependencies.
If you are not using SBT or Maven, see the documentation for your build tool for instructions on building a JAR file with all of the dependencies.
For example, if you are using an IntelliJ IDEA project (not an SBT project in IntelliJ), see the instructions on setting up an artifact configuration (https://www.jetbrains.com/help/idea/compiling-applications.html#configure_artifact).
Upload files to a stage
To make your procedure’s logic (and other dependencies, if any) available to the procedure, you’ll need to upload the required files to a stage. For more information, see Making dependencies available to your code.
Examples¶
Using Snowpark APIs for asynchronous processing¶
The following examples illustrate how you can use Snowpark APIs to begin asynchronous child jobs, as well as how those jobs behave under different conditions.
In the following example, the asyncWait procedure executes an asynchronous child job that waits 10 seconds.
CREATE OR REPLACE PROCEDURE asyncWait()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    // Poll once per second until the child job completes.
    while(!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;
CALL asyncWait();
In the following example, the cancelJob procedure uses SQL to start a job that would take 10 seconds to finish. It then cancels the child job before it finishes.
CREATE OR REPLACE PROCEDURE cancelJob()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    // Cancel the child job while it is still waiting.
    asyncJob.cancel()
    "Done"
  }
}
$$;
In the following example, the checkStatus procedure executes an asynchronous child job that waits 10 seconds. The procedure then checks on the status of the job before it finishes, so the check reports that the job is not done.
CREATE OR REPLACE PROCEDURE checkStatus()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = '2.12'
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    // Start the child job through the Snowpark async API rather than
    // session.jdbcConnection, which is listed as unsupported under Limitations.
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    // The job has only just started, so isDone() is expected to be false.
    s"status: ${asyncJob.isDone()}"
  }
}
$$;