snowflake.core.task.dagv1.DAG

class snowflake.core.task.dagv1.DAG(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, task_auto_retry_attempts: int | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: Dict[str, Any] | None = None, session_parameters: Dict[str, Any] | None = None, stage_location: str | None = None, imports: List[str | Tuple[str, str]] | None = None, packages: List[str | ModuleType] | None = None, use_func_return_value: bool = False)

Bases: object

A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.

Snowflake doesn’t have a first-class task graph entity, so this is a client-side object that manages the relationships between Task objects. A root Task and its successors logically form a task graph, or DAG (directed acyclic graph). Refer to Task graphs.

When a task graph is deployed, all child tasks are created in Snowflake. A dummy Task is created as the root. A task’s predecessor is the dummy task if it’s added to the task graph with no other predecessors.
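The root-attachment rule described above can be pictured with a small standard-library sketch. This is only a toy model of the rule, not the library's implementation; the `effective_predecessors` helper and the `"ROOT"` label are hypothetical names introduced for illustration.

```python
from typing import Dict, List

ROOT = "ROOT"  # hypothetical label standing in for the dummy root task


def effective_predecessors(
    declared: Dict[str, List[str]], root: str = ROOT
) -> Dict[str, List[str]]:
    """Attach every task that declares no predecessors to the root."""
    return {
        task: list(preds) if preds else [root]
        for task, preds in declared.items()
    }


# Predecessors as declared by the user; COND declares none.
declared = {
    "COND": [],
    "TASK1": ["COND"],
    "TASK2": ["COND"],
}
resolved = effective_predecessors(declared)
print(resolved)
```

In this model, only `COND` ends up directly under the root, while `TASK1` and `TASK2` keep the predecessor they declared.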

Example

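>>> from datetime import timedelta
>>> from snowflake.snowpark import Session
>>> from snowflake.core.task.dagv1 import DAG, DAGOperation, DAGTask, DAGTaskBranch
>>> dag = DAG("TEST_DAG",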
...     schedule=timedelta(minutes=10),
...     use_func_return_value=True,
...     warehouse="TESTWH_DAG",
...     packages=["snowflake-snowpark-python"],
...     stage_location="@TESTDB_DAG.TESTSCHEMA_DAG.TEST_STAGE_DAG"
... )
>>> def task1(session: Session) -> None:
...     session.sql("select 'task1'").collect()
>>> def task2(session: Session) -> None:
...     session.sql("select 'task2'").collect()
>>> def cond(session: Session) -> str:
...     return 'TASK1'
>>> with dag:
...     task1 = DAGTask("TASK1", definition=task1, warehouse="TESTWH_DAG")
...     task2 = DAGTask("TASK2", definition=task2, warehouse="TESTWH_DAG")
...     condition = DAGTaskBranch("COND", definition=cond, warehouse="TESTWH_DAG")
...     condition >> [task1, task2]
>>> dag_op = DAGOperation(schema)  # schema is a schema resource defined elsewhere
>>> dag_op.deploy(dag, mode="orReplace")
>>> dag_op.run(dag)
Note:
    When defining a task branch handler, return the name of the task you want to jump to. The task name is
    case-sensitive and must match the name property of the DAGTask. For example, in the sample code above, return
    'TASK1'; 'TEST_DAG$TASK1', 'task1', and 'Task1' are not considered exact matches.

Refer to snowflake.core.task.Task for the details of each property.
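The exact-match rule from the note on branch handlers can be sketched as a plain, case-sensitive string comparison. This is a model of the rule as the note states it, not how Snowflake resolves branch targets internally; `resolve_branch` is a hypothetical helper.

```python
from typing import Dict, Iterable, Optional


def resolve_branch(returned: str, task_names: Iterable[str]) -> Optional[str]:
    """Return the task name only when it matches exactly (case-sensitive)."""
    return returned if returned in set(task_names) else None


# The name properties given to DAGTask in the example above.
names = ["TASK1", "TASK2"]

candidates = ["TASK1", "TEST_DAG$TASK1", "task1", "Task1"]
matches: Dict[str, Optional[str]] = {
    candidate: resolve_branch(candidate, names) for candidate in candidates
}
print(matches)
```

Only the first candidate matches in this model; the prefixed and differently cased variants do not.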

Attributes

tasks

Returns a list of the tasks in this task graph.

Methods

add_task(task: DAGTask) → None

Add a child task to this task graph.

Parameters:

task (DAGTask) – The child task to be added to this task graph.

Examples

Add a task to a previously created DAG:

>>> child_task = DAGTask(
...     "child_task",
...     "select 'child_task'",
...     warehouse="test_warehouse"
... )
>>> dag.add_task(child_task)
get_finalizer_task() → DAGTask | None

Get the finalizer task for this task graph.

Examples

Get the finalizer task from a previously created DAG:

>>> finalizer_task = dag.get_finalizer_task()
get_task(task_name: str) → DAGTask | None

Get a child task from this task graph based on task name.

Parameters:

task_name (str) – The name of the task to be retrieved from this task graph.

Examples

Get a task from a previously created DAG:

>>> task = dag.get_task("child_task")