After completing the Tutorial Common Setup, you are ready to create a job service. In this tutorial, you create a
simple job service that connects to Snowflake, executes a SQL SELECT query, and saves the result to a table.
Stage the service specification file, which gives Snowflake the container configuration information. In addition to the name of
the image to use to start a container, the specification file specifies three arguments: a SELECT query, a virtual warehouse to execute the query, and the name of the table to save the result to.
Execute the job service. Using the EXECUTE JOB SERVICE command, you execute the job service by providing the specification file and the
compute pool where Snowflake can run the container. Finally, verify the service results.
Build an image for the linux/amd64 platform that Snowpark Container Services supports, and then upload the image to the image
repository in your account (see Common Setup).
You will need information about the repository (the repository URL and the registry hostname) before you can build and upload the image. For more information, see
Registry and Repositories.
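As a rough illustration of how the two values relate, the registry hostname is the host portion of the repository URL. The sketch below splits a repository URL into the two parts. The URL shown is a made-up placeholder (myorg-myaccount is hypothetical), and split_repository_url is a helper name invented for this example, not part of any Snowflake tool.

```python
def split_repository_url(repository_url: str) -> tuple[str, str]:
    """Return (registry_hostname, repository_path) for a repository URL."""
    # Drop an optional scheme; repository URLs are usually shown without one.
    without_scheme = repository_url.split("://", 1)[-1]
    # Everything before the first slash is the registry hostname.
    hostname, _, path = without_scheme.partition("/")
    return hostname, path


# Hypothetical URL for illustration only; use the value reported for your account.
example_url = (
    "myorg-myaccount.registry.snowflakecomputing.com"
    "/tutorial_db/data_schema/tutorial_repository"
)
registry_hostname, repository_path = split_repository_url(example_url)
print(registry_hostname)
print(repository_path)
```

The hostname is what Docker authenticates against, while the full repository URL is what the image tag is built from.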
To build a Docker image, execute the following docker build command using the Docker CLI.
Note that the command specifies the current working directory (.)
as the PATH for the files to use for building the image.
Upload the image to the repository in your Snowflake account. In order for Docker to upload an image on your behalf to your repository,
you must first authenticate Docker with the registry.
We recommend using Snowflake CLI
to authenticate your local Docker instance with the image
registry for your Snowflake account. Make sure that you configured Snowflake CLI to connect to Snowflake. For more information,
see Configuring Snowflake CLI and connecting to Snowflake.
To authenticate, execute the following Snowflake CLI command:
FROM and SPEC provide the stage name and the name of the job service specification file. When the job service is executed, it runs the
SQL statement and saves the result to a table as specified in my_job_spec.yaml.
+----------------------------------------------------------------------+
| status                                                               |
+----------------------------------------------------------------------+
| Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE. |
+----------------------------------------------------------------------+
The job service runs a simple query and saves the result to the results table.
You can verify that the job service completed successfully by querying the results table:
To debug the execution of your job service, execute SHOW SERVICE CONTAINERS IN SERVICE to determine whether the job service is still running, whether it failed to start, and, if it failed, why. Also,
assuming your code writes useful logs to standard output or standard error, you can access the logs using SYSTEM$GET_SERVICE_LOGS.
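The two debugging steps can be sketched as SQL statements assembled in Python. This is only a sketch under stated assumptions: build_debug_statements is a hypothetical helper, the service name is taken from the status output, the container name main is taken from the specification file, and the instance ID of 0 assumes a single-instance job. Check the argument list of SYSTEM$GET_SERVICE_LOGS against the current function reference before relying on it.

```python
def build_debug_statements(service_name: str,
                           container_name: str = "main",
                           instance_id: int = 0,
                           num_lines: int = 100) -> list[str]:
    """Return SQL text for inspecting a job service's containers and logs."""
    # Lists the containers of the service along with their status.
    show_containers = f"SHOW SERVICE CONTAINERS IN SERVICE {service_name};"
    # Retrieves the most recent log lines from one container (assumed argument order).
    get_logs = (
        "SELECT SYSTEM$GET_SERVICE_LOGS("
        f"'{service_name}', '{instance_id}', '{container_name}', {num_lines});"
    )
    return [show_containers, get_logs]


for statement in build_debug_statements("tutorial_2_job_service"):
    print(statement)
```

The printed statements can be pasted into a worksheet or passed to a session's sql() method.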
Examine the provided files: Review the code files that implement the job service.
Build and test the image locally: This section explains how you can test the
Docker image locally before uploading it to a repository in your Snowflake account.
#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
    """
    Input argument list.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--query", required=True, help="query text to execute")
    parser.add_argument(
        "--result_table",
        required=True,
        help="name of the table to store result of query specified by flag --query")
    return parser


def get_logger():
    """
    Get a logger for local logging.
    """
    logger = logging.getLogger("job-tutorial")
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger


def get_login_token():
    """
    Read the login token supplied automatically by Snowflake. These tokens
    are short lived and should always be read right before creating any new connection.
    """
    with open("/snowflake/session/token", "r") as f:
        return f.read()


def get_connection_params():
    """
    Construct Snowflake connection params from environment variables.
    """
    if os.path.exists("/snowflake/session/token"):
        return {
            "account": SNOWFLAKE_ACCOUNT,
            "host": SNOWFLAKE_HOST,
            "authenticator": "oauth",
            "token": get_login_token(),
            "warehouse": SNOWFLAKE_WAREHOUSE,
            "database": SNOWFLAKE_DATABASE,
            "schema": SNOWFLAKE_SCHEMA
        }
    else:
        return {
            "account": SNOWFLAKE_ACCOUNT,
            "host": SNOWFLAKE_HOST,
            "user": SNOWFLAKE_USER,
            "password": SNOWFLAKE_PASSWORD,
            "role": SNOWFLAKE_ROLE,
            "warehouse": SNOWFLAKE_WAREHOUSE,
            "database": SNOWFLAKE_DATABASE,
            "schema": SNOWFLAKE_SCHEMA
        }


def run_job():
    """
    Main body of this job.
    """
    logger = get_logger()
    logger.info("Job started")

    # Parse input arguments
    args = get_arg_parser().parse_args()
    query = args.query
    result_table = args.result_table

    # Start a Snowflake session, run the query and write results to specified table
    with Session.builder.configs(get_connection_params()).create() as session:
        # Print out current session context information.
        database = session.get_current_database()
        schema = session.get_current_schema()
        warehouse = session.get_current_warehouse()
        role = session.get_current_role()
        logger.info(
            f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
        )

        # Execute query and persist results in a table.
        logger.info(
            f"Executing query [{query}] and writing result to table [{result_table}]"
        )
        res = session.sql(query)
        # If the table already exists, the query result must match the table schema.
        # If the table does not exist, this will create a new table.
        res.write.mode("append").save_as_table(result_table)

    logger.info("Job finished")


if __name__ == "__main__":
    run_job()
In the code:
Execution starts at the __main__ guard, which calls the run_job() function:
if __name__ == "__main__":
    run_job()
The code reads the environment variables and uses them to set default values for the connection parameters.
The run_job() function uses these parameters to connect to Snowflake. Note that:
You can override the parameter values used in the service with the containers.env and containers.args fields in the service specification. For more information, see Service specification reference.
When the image runs in Snowflake, Snowflake populates some of these parameters (see source code) automatically. However,
when testing the image locally, you need to explicitly provide these parameters (as shown in the next section,
Build and test the image locally).
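The choice between the two sets of connection parameters can be exercised without a Snowflake account. The sketch below re-implements the same decision as a pure function so that it can be checked locally. The function select_connection_params, its env and token_path arguments, and the sample values are assumptions made for this illustration and are not part of the tutorial code.

```python
import os
import tempfile


def select_connection_params(env: dict, token_path: str) -> dict:
    """Mirror the tutorial's logic: OAuth token if present, else user/password."""
    common = {
        "account": env.get("SNOWFLAKE_ACCOUNT"),
        "host": env.get("SNOWFLAKE_HOST"),
        "warehouse": env.get("SNOWFLAKE_WAREHOUSE"),
        "database": env.get("SNOWFLAKE_DATABASE"),
        "schema": env.get("SNOWFLAKE_SCHEMA"),
    }
    if os.path.exists(token_path):
        # Inside Snowflake: read the short-lived token right before connecting.
        with open(token_path, "r") as f:
            return {**common, "authenticator": "oauth", "token": f.read()}
    # Local testing: credentials must be supplied explicitly.
    return {
        **common,
        "user": env.get("SNOWFLAKE_USER"),
        "password": env.get("SNOWFLAKE_PASSWORD"),
        "role": env.get("SNOWFLAKE_ROLE"),
    }


# Made-up values for illustration only.
sample_env = {"SNOWFLAKE_ACCOUNT": "my_account", "SNOWFLAKE_USER": "test_user"}

# No token file: falls back to user/password parameters.
local_params = select_connection_params(sample_env, "/nonexistent/token")

# Token file present: uses OAuth with the token contents.
with tempfile.TemporaryDirectory() as tmp:
    token_file = os.path.join(tmp, "token")
    with open(token_file, "w") as f:
        f.write("dummy-token")
    container_params = select_connection_params(sample_env, token_file)

print(sorted(local_params))
print(container_params["authenticator"])
```

Passing the environment and token path as arguments, rather than reading module-level globals, is a design choice of this sketch that makes both branches easy to test.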
spec:
  containers:
  - name: main
    image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
    env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
    args:
    - "--query=select current_time() as time,'hello'"
    - "--result_table=results"
In addition to the container.name and container.image required fields (see Service specification reference),
the specification includes the optional container.args field to list the arguments:
--query provides the query to execute when the service runs.
--result_table identifies the table to save the query results.
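To see how the container.args entries reach the Python code, the sketch below feeds the same two strings from the specification into an argparse parser equivalent to the one in the job code. It only parses arguments and does not connect to Snowflake. Passing the list to parse_args() directly, instead of reading the command line, is a convenience for this illustration.

```python
import argparse

# Same two flags that the job code defines.
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument("--result_table", required=True,
                    help="name of the table to store the query result")

# The strings listed under args in the service specification.
spec_args = [
    "--query=select current_time() as time,'hello'",
    "--result_table=results",
]

args = parser.parse_args(spec_args)
print(args.query)
print(args.result_table)
```

Each entry is split at its first equals sign, so the query text may itself contain spaces and commas.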
When you run the container as a service, Snowflake provides the connection parameters (such as the account, host, database, and schema) to the container as environment variables. For
more information, see Configure Snowflake Client.
The job service executes the query (select current_time() as time,'hello') and writes the result to the table
(tutorial_db.data_schema.results). If the table does not exist, it is created. If the table exists, the job service appends the result as a new row.