Task Graph Examples

This topic provides practical examples of how to run a sequence of tasks, also known as a task graph.

Example: Start multiple tasks and report status

In the following example, the root task starts tasks to update three different tables. After those three tables are updated, another task starts to update an aggregate sales table. Finally, a finalizer task emails a status report that it builds with two user-defined functions.

Flowchart shows a root task that starts three child tasks, which start another child task. A finalizer task starts when the others are complete.
-- Create a notebook in the public schema
-- Use database <database name>;
-- Use schema <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
-- Configurations:
-- * The task graph is scheduled to run every minute.
-- * Each task in the task graph retries twice before the task is considered
--   to have failed.
-- * If any individual task fails three times, the task graph is suspended.
-- * A run of this task times out after 60 seconds. user_task_timeout_ms is
--   expressed in milliseconds, so 60 seconds is written as 60000.
-- * The config JSON holds values (environment, path) for the task graph.
create or replace task task_a
schedule = '1 minute'
task_auto_retry_attempts = 2
suspend_task_after_num_failures = 3
user_task_timeout_ms = 60000
config='{"environment": "production", "path": "/prod_directory/"}'
as
  begin
    -- Record a return value for this run of the root task.
    call system$set_return_value('task_a successful');
  end;
;


-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
--   Looks up the customer ID for 'Jane Doe' in ref_cust_table and inserts it
--   into customer_table.
--   user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_customer_table
user_task_timeout_ms = 60000
after task_a
as
  begin
    -- String literals use single quotes; double quotes denote identifiers.
    let value := (
      select customer_id
      from ref_cust_table
      where cust_name = 'Jane Doe'
    );
    insert into customer_table values('customer_id', :value);
  end;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
--   Looks up the product ID for 'widget' in ref_item_table and inserts it
--   into product_table.
--   user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_product_table
user_task_timeout_ms = 60000
after task_a
as
  begin
    -- String literals use single quotes; double quotes denote identifiers.
    let value := (
      select product_id
      from ref_item_table
      where product_name = 'widget'
    );
    insert into product_table values('product_id', :value);
  end;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
--   Reads the original scheduled timestamp of the current task graph run
--   and inserts it into date_time_table.
--   user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_date_time_table
user_task_timeout_ms = 60000
after task_a
as
  begin
    let value := (
      select system$task_runtime_info('current_task_graph_original_scheduled_timestamp')
    );
    insert into date_time_table values('order_date', :value);
  end;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
--   Reads a sales order ID from orders joined to customer_table and inserts
--   it into sales_table.
--   user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_sales_table
user_task_timeout_ms = 60000
after task_customer_table, task_product_table, task_date_time_table
as
  begin
    -- The join belongs inside the subquery; it is not a statement of its own.
    -- NOTE(review): the assignment expects the subquery to yield a single
    -- value; add a filter if orders can match more than one row.
    let value := (
      select orders.sales_order_id
      from orders
      inner join customer_table
        on orders.customer_id = customer_table.customer_id
    );
    insert into sales_table values('sales_order_id', :value);
  end;
;

-- notify_finalizer: Finalizer task for task_a. Sends a notification on the
--   task graph's status.
--   Runs after all other tasks either complete successfully,
--   fail, or time out.
--   Calls system$send_email to notify the admin through email. The message
--   body is built with the get_task_graph_run_summary and
--   html_from_json_task_runs functions.
--   user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task notify_finalizer
user_task_timeout_ms = 60000
finalize = task_a
as
  declare
      my_root_task_id string;
      my_start_time timestamp_ltz;
      summary_json string;
      summary_html string;
  begin
      --- Get root task ID
      my_root_task_id := (call system$task_runtime_info('current_root_task_uuid'));

      --- Get root task scheduled time
      my_start_time := (call system$task_runtime_info('current_task_graph_original_scheduled_timestamp'));

      --- Combine all task run infos into one JSON string
      summary_json := (select get_task_graph_run_summary(:my_root_task_id, :my_start_time));

      --- Convert JSON into HTML table
      summary_html := (select html_from_json_task_runs(:summary_json));

      --- Send HTML to email
      --- Arguments, in order: 'email_notification', the recipient address,
      --- the subject line, the HTML body, and the MIME type.
      call system$send_email(
          'email_notification',
          'admin@snowflake.com',
          'notification task run summary',
          :summary_html,
          'text/html');

      --- Set return value for finalizer
      call system$set_return_value('✅ Graph run summary sent.');
  end;
;

-- get_task_graph_run_summary: Returns a JSON array (as a string) with one
--   object per task run that belongs to the given task graph run.
--   Arguments:
--   * my_root_task_id: ID of the root task of the task graph.
--   * my_start_time: start of the scheduled-time range to search; the range
--     ends at the current timestamp.
--   Each object carries the keys task_name, run_status, return_value,
--   started, duration, and error_message. Objects are ordered by the
--   scheduled time of the task run.
create or replace function get_task_graph_run_summary(my_root_task_id string, my_start_time timestamp_ltz)
returns string
as
$$
    (select
        array_agg(object_construct(
            'task_name', name,
            'run_status', run_status,
            'return_value', return_value,
            'started', started_at,
            'duration', duration,
            'error_message', error_message
            )
        -- Order inside the aggregate; a subquery "order by" does not by
        -- itself fix the order of the aggregated array.
        ) within group (order by scheduled_time) as graph_run_summary
    from
        (select
            name,
            -- Aliases differ from the source column names so that every
            -- reference to state / query_start_time is unambiguous.
            case
                when state = 'SUCCEED' then '🟢 Succeeded'
                when state = 'FAILED' then '🔴 Failed'
                when state = 'SKIPPED' then '🔵 Skipped'
                when state = 'CANCELLED' then '🔘 Cancelled'
                -- Show any other state as-is instead of dropping it.
                else state
            end as run_status,
            return_value,
            to_varchar(query_start_time, 'YYYY-MM-DD HH24:MI:SS') as started_at,
            concat(timestampdiff('seconds', query_start_time, completed_time),
              ' s') as duration,
            error_message,
            scheduled_time
        from
            -- No database prefix: a fixed database name would tie the
            -- function to one specific database.
            table(information_schema.task_history(
                root_task_id => my_root_task_id ::string,
                scheduled_time_range_start => my_start_time,
                scheduled_time_range_end => current_timestamp()
                ))
        )
    )::string
$$
;


-- HTML_FROM_JSON_TASK_RUNS: Renders a JSON array of task-run objects as an
--   HTML table.
--   Argument:
--   * JSON_DATA: JSON array of objects. Recognized keys (matched without
--     regard to case): task_name, run_status, return_value, started,
--     duration, error_message.
--   Returns the HTML as a string. Missing keys produce empty cells, and a
--   NULL or empty argument produces a table with no rows.
-- NOTE(review): confirm that this runtime_version is still supported in your
--   account before you create the function.
create or replace function HTML_FROM_JSON_TASK_RUNS(JSON_DATA string)
returns string
language python
runtime_version = '3.8'
handler = 'GENERATE_HTML_TABLE'
as
$$
import html
import json

def GENERATE_HTML_TABLE(JSON_DATA):
    """Build an HTML summary table from a JSON array of task-run objects."""

    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]
    headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]

    # A NULL or empty argument yields a table with no rows rather than an error.
    DATA = json.loads(JSON_DATA) if JSON_DATA else []

    HTML = """
        <img src="https://example.com/logo.jpg"
        alt="Company logo" height="72">
        <p><strong>Task Graph Run Summary</strong>
        <br>Log in to Snowsight to see more details.</p>
        <table border="1" style="border-color:#DEE3EA"
        cellpadding="5" cellspacing="0">
          <thead>
            <tr>
    """
    for width, header in zip(column_widths, headers):
        # Adjacent string literals keep each line short; a single-quoted
        # f-string cannot contain a line break.
        HTML += (
            f'<th scope="col" style="text-align:left; '
            f'width: {width}">{header.capitalize()}</th>'
        )

    HTML += """
            </tr>
    </thead>
    <tbody>
    """

    for ROW_DATA in DATA:
        # Normalize keys so that "task_name" and "TASK_NAME" both match.
        row = {str(name).upper(): value for name, value in ROW_DATA.items()}
        HTML += "<tr>"
        for width, header in zip(column_widths, headers):
            key = header.replace(" ", "_").upper()
            value = row.get(key)
            # Escape the text so characters such as "<" in an error message
            # cannot break the markup.
            CELL_DATA = "" if value is None else html.escape(str(value))
            HTML += (
                f'<td style="text-align:left; '
                f'width: {width}">{CELL_DATA}</td>'
            )
        HTML += "</tr>"

    HTML += """
        </tbody>
    </table>
    """

    return HTML
$$
;
Copy

Example: Use a finalizer task to correct for errors

This example demonstrates how a finalizer task can correct for errors.

For demonstration purposes, the tasks are designed to fail during their first run. The finalizer task corrects the issue, and the tasks succeed on the following runs:

Diagram showing a task series. Task A is shown on the upper-left. An arrow points right from Task A to Task B, which points to Task C, which points to Task D. Below Task A, an arrow points to the finalizer task, Task F.
-- Configuration
-- By default, the notebook creates the objects in the public schema
-- Use database <database name>;
-- Use schema <schema name>;

-- 1. Set default configurations
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * The root task runs every minute, with a 60 second timeout
--      (user_task_timeout_ms is expressed in milliseconds: 60000).
--    * If a task fails, retry it twice. If it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively 3 times, suspend the task.
--    * Other environment values are set.
create or replace task task_a
schedule = '1 minute'
user_task_timeout_ms = 60000
task_auto_retry_attempts = 2
suspend_task_after_num_failures = 3
config='{"environment": "production", "path": "/prod_directory/"}'
as
  begin
    call system$set_return_value('task a successful');
  end;
;

-- 2. Use a runtime reflection variable
--    Creates a child task ("task_b") that reads its own task name with
--    system$task_runtime_info and writes it to "demo_table".
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
--    user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_b
user_task_timeout_ms = 60000
after task_a
as
  begin
    let value := (select system$task_runtime_info('current_task_name'));
    insert into demo_table values('task b name', :value);
  end;
;

-- 3. Get a task graph configuration value
--    Creates the child task ("task_c"), which reads the "path" value from
--    the task graph configuration and writes it to "demo_table".
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
--    user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_c
user_task_timeout_ms = 60000
after task_b
as
  begin
    -- Read the value once; a separate call whose result is discarded
    -- adds nothing.
    let value := (select system$get_task_graph_config('path'));
    insert into demo_table values('task c path', :value);
  end;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d"), which reads the return value set by
--    "task_a" and writes it to "demo_table".
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
--    user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
--    NOTE(review): "task_a" is the root task, not the immediate predecessor
--    ("task_c"); confirm that the return value of an ancestor can be read
--    this way.
create or replace task task_d
user_task_timeout_ms = 60000
after task_c
as
  begin
    let value := (select system$get_predecessor_return_value('task_a'));
    insert into demo_table values('task d: predecessor return value', :value);
  end;
;

-- 5. Create finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: task_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
--    user_task_timeout_ms is expressed in milliseconds (60000 = 60 seconds).
create or replace task task_f
user_task_timeout_ms = 60000
finalize = task_a
as
  begin
    -- "if not exists" keeps later runs of the finalizer from failing once
    -- the table is in place.
    create table if not exists demo_table(name varchar, value varchar);
  end;
;

-- 6. Resume the task graph. Upon creation, tasks start in a suspended state.
--    The first command resumes the finalizer task ("task_f").
--    The second command enables the tasks that depend on the root task
--    ("task_a").
alter task task_f resume;
select system$task_dependents_enable('task_a');

-- 7. Query task history
--    Shows the five most recently scheduled runs of "task_b". The explicit
--    "order by" makes it deterministic which five rows "limit" keeps.
select
    name,
    state,
    attempt_number,
    scheduled_from
from
    table(information_schema.task_history(task_name => 'task_b'))
order by
    scheduled_time desc
limit 5;

-- 8. Suspend task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    ("task_a"). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
--    The demo table is then removed; "if exists" keeps the command from
--    failing when the table is already gone.
alter task task_a suspend;
drop table if exists demo_table;

-- 9. Check tasks during execution (optional)
--    Query the demo table while the task graph is running to see which
--    tasks have written a row so far. The columns listed are the two
--    columns that the finalizer ("task_f") creates.
select
    name,
    value
from demo_table;

-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
--     "if exists" keeps the command from failing when the table has
--     already been removed.
drop table if exists demo_table;
Copy
Language: English