Writing stored procedures in Scala

You can write a stored procedure in Scala. You can use the Snowpark library within your stored procedure to perform queries, updates, and other work on tables in Snowflake.

This topic explains how to write a stored procedure’s logic. Once you have the logic, you can create and call the procedure using SQL. For more information, see Creating a stored procedure and Calling a stored procedure.

Introduction

You can build and run your data pipeline within Snowflake, using a Snowflake warehouse as the compute framework. For the code for your data pipeline, you use the Snowpark API for Scala to write stored procedures. To schedule the execution of these stored procedures, you use tasks.

You can capture log and trace data as your handler code executes. For more information, refer to Logging, tracing, and metrics.

Note

To both create and call an anonymous procedure, use CALL (with anonymous procedure). Creating and calling an anonymous procedure does not require a role with CREATE PROCEDURE schema privileges.
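For example, the following minimal sketch creates and calls an anonymous procedure in a single statement (the procedure and handler names here are hypothetical):

WITH greet() AS PROCEDURE
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'Greeter.greet'
  AS
  $$
  object Greeter {
    // Snowflake passes the Session automatically as the first argument.
    def greet(session: com.snowflake.snowpark.Session): String = "Hello from an anonymous procedure"
  }
  $$
CALL greet();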

Prerequisites

You must use version 1.1.0 or a more recent version of the Snowpark library.

If you are writing a stored procedure whose handler code will be copied to a stage, you must compile your classes to run in Java version 11.x.

Setting up your development environment for Snowpark

Set up your development environment to use the Snowpark library. See Setting Up Your Development Environment for Snowpark Scala.

Structuring and building handler code

You can keep handler source code in-line with the SQL that creates the procedure, or keep the compiled handler in a separate location and reference it from the SQL. For more information, see Keeping handler code in-line or on a stage.

For more on building handler source code for use with a procedure, see Packaging Handler Code.

Creating and calling a procedure

Once you’ve written a procedure’s handler, you can create and call it using SQL.
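For example, the following sketch creates a procedure from the copy-rows handler shown later in this topic, then calls it (the procedure name and arguments are illustrative):

CREATE OR REPLACE PROCEDURE copy_rows(from_table VARCHAR, to_table VARCHAR, count INT)
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark:latest')
  HANDLER = 'MyObject.myProcedure'
  AS
  $$
  object MyObject {
    // Copies up to `count` rows from one table to another.
    def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String = {
      session.table(fromTable).limit(count).write.saveAsTable(toTable)
      "Success"
    }
  }
  $$;

CALL copy_rows('source_table', 'target_table', 10);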

Limitations

Snowpark Stored Procedures have the following limitations:

  • Concurrency is not supported. For example, from within your code, you cannot submit queries from multiple threads. Code that concurrently issues multiple queries will produce an error.

  • If you are executing your stored procedure from a task, you must specify a warehouse when creating the task. (You cannot use serverless compute resources to run the task.) A sketch of such a task follows this list.

  • Keep in mind the following limitations when using some Snowpark APIs in your stored procedure.

    • When you use APIs that execute PUT and GET commands (including Session.sql("PUT ...") and Session.sql("GET ...")), you may write only to the /tmp directory in the memory-backed file system provided for the query calling the procedure.

    • Do not use APIs that create new sessions (for example, Session.builder().configs(...).create()).

    • Using session.jdbcConnection (and the connection returned from it) is not supported because it may result in unsafe behavior.

  • Creating named temp objects is not supported in an owner’s rights stored procedure. An owner’s rights stored procedure is a stored procedure that runs with the privileges of the stored procedure owner. For more information, refer to caller’s rights or owner’s rights.
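As an example of the task limitation noted above, the following sketch creates a task that names a warehouse to run a stored procedure on a schedule (the task, warehouse, and procedure names are hypothetical):

CREATE TASK run_my_proc
  WAREHOUSE = my_warehouse
  SCHEDULE = '60 MINUTE'
AS
  CALL my_proc();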

Writing the handler code for the stored procedure

For your procedure’s logic, you write handler code that executes when the procedure is called. This section describes the design of a handler.

You can include this code in-line with the SQL statement that creates the procedure, or copy the code to a stage and reference it there when you create the procedure. For more information, see Keeping handler code in-line or on a stage.

Planning to write your stored procedure

  • Limit the amount of memory consumed.

    Snowflake limits the amount of memory available to a handler method. For more information on how to avoid consuming too much memory, see Designing Handlers that Stay Within Snowflake-Imposed Constraints.

  • Write thread-safe code.

    Make sure that your handler method or function is thread safe.

  • Understand the security restrictions.

    Your handler code runs within a restricted engine, so be sure to follow the rules described in Security Practices for UDFs and Procedures.

  • Decide on using owner’s rights or caller’s rights.

    When planning to write your stored procedure, consider whether you want the stored procedure to run with caller’s rights or owner’s rights.

  • Keep in mind the timeout behavior for stored procedures.

    Stored procedure execution will time out unless the timer is reset by the code’s activity. In particular, the timeout timer is reset by the code’s interactions with data, including file operations, queries, and iterating through a result set.

Writing the class or object

The method or function that you define should be part of a class or object.

When writing the class or object, note the following:

  • The class (or object) and method must not be protected or private.

  • If the method is not static and you want to define a constructor, define a zero-argument constructor for the class. Snowflake invokes this zero-argument constructor at initialization time to create an instance of your class. (A sketch follows this list.)

  • You can define different methods for different stored procedures in the same class or object.
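For example, a non-static handler method and its zero-argument constructor might look like the following sketch (the class and member names are hypothetical):

class MyHandler() {
  // Snowflake invokes this zero-argument constructor at initialization
  // time to create the instance on which the handler method is called.
  private val label = "row count"

  // Instance handler method; the Session is still the first argument.
  def run(session: com.snowflake.snowpark.Session, tableName: String): String = {
    s"$label: ${session.table(tableName).count()}"
  }
}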

Writing the method or function

When writing the method or function for the stored procedure, note the following (a summary sketch follows this list):

  • Specify the Snowpark Session object as the first argument of your method or function.

    When you call your stored procedure, Snowflake automatically creates a Session object and passes it to your stored procedure. (You cannot create the Session object yourself.)

  • For the rest of the arguments and for the return value, use the Scala types that correspond to Snowflake data types.

  • Your method or function must return a value. For stored procedures in Scala, a return value is required.

  • Keep in mind the timeout behavior described under Planning to write your stored procedure: execution times out unless the code’s interactions with data (file operations, queries, and iterating through a result set) reset the timer.

  • When you run an asynchronous child job from within a procedure’s handler, “fire and forget” is not supported.

    In other words, if the handler issues a child query that is still running when the parent procedure job completes, the child job is canceled automatically.
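Taken together, these rules give a handler method the following general shape (a sketch; the names and argument types are illustrative):

object MyHandler {
  // The Session comes first; the remaining arguments and the return
  // type use Scala types that map to Snowflake data types.
  def run(session: com.snowflake.snowpark.Session, name: String, count: Int): String = {
    s"Received $name with count $count"
  }
}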

Making dependencies available to your code

If your handler code depends on code defined outside the handler itself (such as classes in a JAR file) or on resource files, you can make those dependencies available to your code by uploading them to a stage. When creating the procedure, you can reference these dependencies using the IMPORTS clause.

For more information, see Making dependencies available to your code.
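For example, the following sketch references a dependency JAR that has been uploaded to a stage (the stage, file, and handler names are hypothetical):

CREATE OR REPLACE PROCEDURE my_proc()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark:latest')
  IMPORTS = ('@my_stage/my_library.jar')
  HANDLER = 'MyObject.myProcedure'
  AS
  $$
  ...
  $$;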

Accessing data in a Snowflake procedure

To access data in Snowflake, use the Snowpark library APIs.

When handling a call to your Scala stored procedure, Snowflake creates a Snowpark Session object and passes the object to the method or function for your stored procedure.

As is the case with stored procedures in other languages, the context for the session (for example, the privileges and the current database and schema) is determined by whether the stored procedure runs with caller’s rights or owner’s rights. For details, see Accessing and setting the session state.

You can use this Session object to call APIs in the Snowpark library. For example, you can create a DataFrame for a table or execute a SQL statement.
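For example, the following sketch shows both patterns (the table name is hypothetical):

// Create a DataFrame for a table, then filter it.
import com.snowflake.snowpark.functions.col
val df = session.table("my_table").filter(col("id") > 1)

// Execute a SQL statement and collect the results.
val rows = session.sql("SELECT CURRENT_WAREHOUSE()").collect()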

See the Snowpark Developer Guide for Scala for more information.

Note

For information about limitations, including limitations on accessing data, see Limitations.

Data access example

The following is an example of a Scala method that copies a specified number of rows from one table to another table. The method takes the following arguments:

  • A Snowpark Session object

  • The name of the table to copy the rows from

  • The name of the table to save the rows to

  • The number of rows to copy

The method in this example returns a string.

object MyObject
{
  // Copies up to `count` rows from `fromTable` into `toTable` and
  // returns a status string.
  def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    "Success"
  }
}

The following example defines a function, rather than a method:

object MyObject
{
  // The handler can also be a function value; the return type is
  // inferred rather than annotated on the parameter list.
  val myProcedure = (session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int) =>
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    "Success"
  }
}

Reading a file with a Scala procedure

You can read the contents of a file with handler code. The file must be on a Snowflake stage that’s available to your handler. For example, you might want to read a file to process unstructured data in the handler.

To read the contents of staged files, your handler can call methods in either the SnowflakeFile class or the InputStream class. You might do this if you need to access a file that is specified dynamically at run time. For more information, see Reading a dynamically-specified file with SnowflakeFile or Reading a dynamically-specified file with InputStream in this topic.

SnowflakeFile provides features not available with InputStream, as described below.

  • SnowflakeFile

    Input: a scoped URL (reduces the risk of file injection attacks when the function’s caller is not also its owner), or a file URL or string path for files that the UDF owner has access to. The file must be located in a named internal stage or an external stage.

    Notes: easily access additional file attributes, such as file size.

  • InputStream

    Input: a scoped URL (reduces the risk of file injection attacks when the function’s caller is not also its owner). The file must be located in a named internal stage or an external stage.

Note

For an owner’s rights stored procedure, the procedure’s owner must have access to any files that are not scoped URLs. For caller’s rights procedures, the caller must have access to any files that are not scoped URLs. In either case, you can read the staged file by having the handler code call the SnowflakeFile.newInstance method with a boolean value for the requireScopedUrl parameter.

The following example uses SnowflakeFile.newInstance while specifying that a scoped URL is not required.

val filename = "@my_stage/filename.txt"
val sfFile = SnowflakeFile.newInstance(filename, false)

Reading a dynamically-specified file with SnowflakeFile

Code in the following example has a handler function execute that takes a String and returns a String with the file’s contents. At run time, Snowflake initializes the handler’s fileName variable from the incoming file path in the procedure’s input variable. The handler code uses a SnowflakeFile instance to read the file.

CREATE OR REPLACE PROCEDURE file_reader_scala_proc_snowflakefile(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
import com.snowflake.snowpark.Session

object FileReader {
  def execute(session: Session, fileName: String): String = {
    val input: InputStream = SnowflakeFile.newInstance(fileName).getInputStream()
    new String(input.readAllBytes(), StandardCharsets.UTF_8)
  }
}
$$;

Code in the following CALL example creates a scoped file URL that points to the file. This is an encoded URL that permits temporary access to a staged file without granting privileges to the stage itself.

CALL file_reader_scala_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));

Reading a dynamically-specified file with InputStream

Code in the following example has a handler function execute that takes an InputStream and returns a String with the file’s contents. At run time, Snowflake initializes the handler’s stream variable from the incoming file path in the procedure’s input variable. The handler code uses the InputStream to read the file.

CREATE OR REPLACE PROCEDURE file_reader_scala_proc_input(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark.Session

object FileReader {
  def execute(session: Session, stream: InputStream): String = {
    new String(stream.readAllBytes(), StandardCharsets.UTF_8)
  }
}
$$;

Code in the following CALL example creates a scoped file URL that points to the file. This is an encoded URL that permits temporary access to a staged file without granting privileges to the stage itself.

CALL file_reader_scala_proc_input(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));

Returning tabular data

You can write a procedure that returns data in tabular form. To write a procedure that returns tabular data, do the following:

  • Specify TABLE(...) as the procedure’s return type in your CREATE PROCEDURE statement.

    As TABLE parameters, you can specify the returned data’s column names and types if you know them. If you don’t know the returned columns when defining the procedure – such as when they’re specified at run time – you can leave out the TABLE parameters. When you do, the procedure’s return value columns will be converted from the columns in the dataframe returned by its handler. Column data types will be converted to SQL according to the mapping specified in SQL-Scala Data Type Mappings.

  • Write the handler so that it returns the tabular result in a Snowpark dataframe.

    For more information about dataframes, see Working with DataFrames in Snowpark Scala.

Note

A procedure will generate an error at runtime if either of the following is true:

  • It declares TABLE as its return type but its handler does not return a dataframe.

  • Its handler returns a dataframe but the procedure doesn’t declare TABLE as its return type.

Example

The examples in this section illustrate returning tabular values from a procedure that filters for rows where a column matches a string.

Defining the data

Code in the following example creates a table of employees.

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');

Declaring a procedure to filter rows

The following two examples create a stored procedure that takes a table name and a role as arguments, returning the rows in the table whose role column value matches the role specified as an argument.

Specifying return column names and types

This example specifies column names and types in the RETURNS TABLE() clause.

CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._

object Filter {
   def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
     val table = session.table(tableName)
     val filteredRows = table.filter(col("role") === role)
     filteredRows
   }
}
$$;

Note

Currently, in the RETURNS TABLE(...) clause, you can’t specify GEOGRAPHY as a column type. This applies whether you are creating a stored or anonymous procedure.

CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
  RETURNS TABLE(g GEOGRAPHY)
  ...

WITH test_return_geography_table_1() AS PROCEDURE
  RETURNS TABLE(g GEOGRAPHY)
  ...
CALL test_return_geography_table_1();

If you attempt to specify GEOGRAPHY as a column type, calling the stored procedure results in the error:

Stored procedure execution error: data type of returned table does not match expected returned table type

To work around this issue, you can omit the column arguments and types in RETURNS TABLE().

CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
  RETURNS TABLE()
  ...

WITH test_return_geography_table_1() AS PROCEDURE
  RETURNS TABLE()
  ...
CALL test_return_geography_table_1();

Omitting return column names and types

Code in the following example declares a procedure that allows the return value’s column names and types to be extrapolated from the columns in the handler’s return value. It omits the column names and types from the RETURNS TABLE() clause.

CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
   RETURNS TABLE()
   LANGUAGE SCALA
   RUNTIME_VERSION = '2.12'
   PACKAGES = ('com.snowflake:snowpark:latest')
   HANDLER = 'Filter.filterByRole'
   AS
   $$
   import com.snowflake.snowpark.functions._
   import com.snowflake.snowpark._

   object Filter {
      def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
         val table = session.table(tableName)
         val filteredRows = table.filter(col("role") === role)
         filteredRows
      }
   }
$$;

Calling the procedure

The following example calls the stored procedure:

CALL filter_by_role('employees', 'dev');

The procedure call produces the following output:

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

Preparing a stored procedure with a staged handler

If you plan to create a stored procedure whose handler will be compiled and copied to a stage (rather than kept in-line as source), you must compile and package your classes in a JAR file, and you must upload the JAR file to a stage.

  1. Compile and package your handler code

    To make it easier to set up your stored procedure, build a JAR file that contains all of the dependencies needed for your stored procedure. Later, you’ll need to upload the JAR file to a stage and point to the JAR file from your CREATE PROCEDURE statement. This process is simpler if you have fewer JAR files to upload and point to.

    • Use sbt to build a JAR file with dependencies.

      If you are using sbt to build and package your code, you can use the sbt-assembly plugin (https://github.com/sbt/sbt-assembly/blob/develop/README.md) to create a JAR file containing all of the dependencies. For more information, see Packaging Scala Handler Code with sbt.

    • Use Maven to build a JAR file with dependencies.

      If you are using Maven to build and package your code, you can use the Maven Assembly Plugin (https://maven.apache.org/plugins/maven-assembly-plugin/index.html) to create a JAR file that contains all of the dependencies. For more information, see Packaging Java or Scala Handler Code with Maven.

    • Use other tools to build a JAR file with dependencies.

      If you are not using sbt or Maven, see the documentation for your build tool for instructions on building a JAR file with all of the dependencies.

      For example, if you are using an IntelliJ IDEA project (not an sbt project in IntelliJ), see the instructions on setting up an artifact configuration (https://www.jetbrains.com/help/idea/compiling-applications.html#configure_artifact).

  2. Upload files to a stage

    To make your procedure’s logic (and other dependencies, if any) available to the procedure, upload the required files to a stage. For more information, see Making dependencies available to your code. A sketch of the upload and procedure definition follows these steps.
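For example, after building the JAR, the upload and the procedure definition might look like the following sketch (the stage, file, and handler names are hypothetical; the PUT command runs from a client such as SnowSQL):

PUT file:///path/to/my_handler.jar @my_stage AUTO_COMPRESS = FALSE;

CREATE OR REPLACE PROCEDURE my_proc()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark:latest')
  IMPORTS = ('@my_stage/my_handler.jar')
  HANDLER = 'MyObject.myProcedure';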

Examples

Using Snowpark APIs for asynchronous processing

The following examples illustrate how you can use Snowpark APIs to begin asynchronous child jobs, as well as how those jobs behave under different conditions.

In the following example, the asyncWait procedure executes an asynchronous child job that waits 10 seconds.

CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    while(!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;

CALL asyncWait();

In the following example, the cancelJob procedure uses SQL to start a job that would take 10 seconds to finish. It then cancels the child job before it finishes.

CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    asyncJob.cancel()
    "Done"
  }
}
$$;

In the following example, the checkStatus procedure executes an asynchronous child job that waits 10 seconds. The procedure then checks the status of the job before it finishes, so the returned status indicates that the job is still running.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val connection = session.jdbcConnection
    val stmt = connection.createStatement()
    val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
    val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
    s"""status:    ${status}"""
  }
}
$$;