snowflake.core.task.dagv1.DAG¶
- class snowflake.core.task.dagv1.DAG(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, task_auto_retry_attempts: int | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: Dict[str, Any] | None = None, session_parameters: Dict[str, Any] | None = None, stage_location: str | None = None, imports: List[str | Tuple[str, str]] | None = None, packages: List[str | ModuleType] | None = None, use_func_return_value: bool = False)¶
Bases:
object
A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.
Snowflake doesn’t have a first-class task graph entity, so this is a client-side object representation which manages Task relationship. A root
Task
and its successors logically form a task graph or DAG (Directed Acyclic Graph). Refer to Task graphs.When a task graph is deployed, all child tasks are created in Snowflake. A dummy Task is created as the root. A task’s predecessor is the dummy task if it’s added to the task graph with no other predecessors.
Example
>>> dag = DAG("TEST_DAG", ... schedule=timedelta(minutes=10), ... use_func_return_value=True, ... warehouse="TESTWH_DAG", ... packages=["snowflake-snowpark-python"], ... stage_location="@TESTDB_DAG.TESTSCHEMA_DAG.TEST_STAGE_DAG" ... ) >>> def task1(session: Session) -> None: ... session.sql("select 'task1'").collect() >>> def task2(session: Session) -> None: ... session.sql("select 'task2'").collect() >>> def cond(session: Session) -> str: ... return 'TASK1' >>> with dag: ... task1 = DAGTask("TASK1", definition=task1, warehouse="TESTWH_DAG") ... task2 = DAGTask("TASK2", definition=task2, warehouse="TESTWH_DAG") ... condition = DAGTaskBranch("COND", definition=cond, warehouse="TESTWH_DAG") ... condition >> [task1, task2] >>> dag_op = DAGOperation(schema) >>> dag_op.deploy(dag, mode="orReplace") >>> dag_op.run(dag) Note: When defining a task branch handler, simply return the task name you want to jump to. The task name is case-sensitive, and it has to match the name property in DAGTask. For exmaple, in above sample code, return 'TASK1' instead of 'TEST_DAG$TASK1', 'task1' or 'Task1' will not be considered as a exact match.
Refer to
snowflake.core.task.Task
for the details of each property.Attributes
- tasks¶
Returns a list of tasks this task graph has.
Methods
- add_task(task: DAGTask) None ¶
Add a child task to this task graph.
- Parameters:
task (DAGTask) – The child task to be added to this task graph.
Examples
Add a task to previously created DAG
>>> child_task = DagTask( ... "child_task", ... "select 'child_task'", ... warehouse="test_warehouse" ... ) >>> dag.add_task(child_task) )