Compose Module

Task, DAG, and Composer classes

This module defines the Task, DAG, and Composer classes. A Task holds task-level data. A DAG holds the relationships between Tasks. A Composer schedules Tasks in accordance with the DAG.

class alyeska.compose.Composer(dag: alyeska.compose.DAG)

Bases: object

The Composer handles all the scheduling computations.

dag

A copy of the originally supplied DAG. This attribute is trimmed while planning the schedule.

Type:DAG
original_dag

The originally supplied DAG. Used to refresh dag after the schedule is planned.

Type:DAG
classmethod from_yaml(p: pathlib.Path) → alyeska.compose.Composer

Create a Composer from a compose.yaml file

Returns:A Composer representation of compose.yaml
Return type:Composer
get_schedules() → Dict[int, Set[alyeska.compose.Task]]

Schedule tasks by priority level.

Returns:A dict mapping each priority level to the set of Tasks scheduled at that level.
Return type:dict of int to set of Task

Example

make_tea -> pour_tea -> drink_tea will give the dict: {1: {make_tea}, 2: {pour_tea}, 3: {drink_tea}}
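
The grouping into priority levels can be illustrated with a small standalone sketch. This is not alyeska's implementation: plan_levels is a hypothetical helper, tasks are plain strings rather than Task objects, and the input is assumed to be an upstream adjacency dict in which every task appears as a key.

```python
from typing import Dict, Set


def plan_levels(upstream: Dict[str, Set[str]]) -> Dict[int, Set[str]]:
    """Group tasks into priority levels; level 1 has no upstream tasks."""
    # Work on a copy so the caller's mapping is left untouched.
    remaining = {task: set(deps) for task, deps in upstream.items()}
    schedule: Dict[int, Set[str]] = {}
    level = 1
    while remaining:
        # A task is ready once all of its upstream tasks have been scheduled.
        ready = {task for task, deps in remaining.items() if not deps}
        if not ready:
            raise ValueError("cycle detected: no task is ready to run")
        schedule[level] = ready
        # Trim the scheduled tasks from the working copy.
        for task in ready:
            del remaining[task]
        for deps in remaining.values():
            deps -= ready
        level += 1
    return schedule


# Hypothetical input: each task maps to the tasks it depends on.
upstream = {
    "make_tea": set(),
    "pour_tea": {"make_tea"},
    "drink_tea": {"pour_tea"},
}
schedule = plan_levels(upstream)
print(schedule)
```

Tasks that share a level have no dependencies on one another, so under this scheme they could run in any order relative to each other.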
get_task_schedules() → Dict[alyeska.compose.Task, int]

Define schedule priority level for each task

Returns:A dict mapping each Task to its schedule priority level.
Return type:dict of Task to int

Example

make_tea -> pour_tea -> drink_tea will give the dict: {make_tea: 1, pour_tea: 2, drink_tea: 3}
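
The per-task view is the inverse of the per-level view. A minimal sketch of that inversion, assuming plain strings in place of Task objects (invert_schedule is a hypothetical helper, not part of alyeska):

```python
from typing import Dict, Set


def invert_schedule(schedule: Dict[int, Set[str]]) -> Dict[str, int]:
    """Turn {level: tasks} into {task: level}."""
    return {task: level for level, tasks in schedule.items() for task in tasks}


by_level = {1: {"make_tea"}, 2: {"pour_tea"}, 3: {"drink_tea"}}
by_task = invert_schedule(by_level)
print(by_task)
```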

refresh_dag() → None

Create a deepcopy of the original_dag.
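
The relationship between dag and original_dag can be sketched as follows. TinyComposer is a hypothetical stand-in that uses a plain dict as its graph; it only illustrates the copy-then-refresh pattern described here, not how Composer is actually written.

```python
import copy


class TinyComposer:
    """Hypothetical stand-in: keeps a pristine graph and a working copy."""

    def __init__(self, dag: dict) -> None:
        self.original_dag = dag
        # The working copy can be trimmed without touching the original.
        self.dag = copy.deepcopy(dag)

    def refresh_dag(self) -> None:
        # Discard the trimmed working copy and start again from the original.
        self.dag = copy.deepcopy(self.original_dag)


composer = TinyComposer({"make_tea": {"pour_tea"}, "pour_tea": set()})
composer.dag.pop("make_tea")  # simulate trimming while planning
composer.refresh_dag()
print(composer.dag == composer.original_dag)
```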

class alyeska.compose.DAG(*, tasks: set = {}, upstream_dependencies: dict = {}, downstream_dependencies: dict = {})

Bases: object

Define a DAG and relationships between tasks.

A DAG is a directed acyclic graph with tasks and dependencies as nodes and directed edges respectively. You have the option to define all the tasks and dependencies at once if you prefer that syntax.

Note

A DAG may contain more than one disconnected graph. Tasks that first appear in an edge are automatically added to the set of tasks.

tasks

The set of all tasks. Composer will try to run every task in this attribute.

Type:set of Task
_edges

Maps tasks to their downstream dependencies, e.g. A -> B -> C, not C -> B -> A. Access directly at your own peril.

Type:dict of Task to set of Task

add_dependencies(d: Dict[alyeska.compose.Task, Set[alyeska.compose.Task]]) → None

Add multiple dependencies to DAG

Parameters:d (dict of Task to set of Task) – An adjacency dict mapping downstream Tasks to possibly many upstream tasks.

Note

Any tasks that do not yet exist in the DAG are automatically added to it.

Examples

>>> from alyeska.compose import Task, DAG
>>> boil_water = Task('boil_water.py')
>>> prep_infuser = Task('prep_infuser.py')
>>> steep_tea = Task('steep_tea.py')
>>> dag = DAG()
>>> dag.add_dependencies({steep_tea: {boil_water, prep_infuser}})
add_dependency(task: alyeska.compose.Task, depends_on: alyeska.compose.Task) → None

Add dependency to DAG.

Parameters:
  • task (Task) – The downstream task.
  • depends_on (Task) – The upstream task.

Note

If either task does not yet exist in the DAG, it is automatically added.

Examples

>>> from alyeska.compose import Task, DAG
>>> boil_water = Task('boil_water.py')
>>> steep_tea = Task('steep_tea.py')
>>> dag = DAG()
>>> dag.add_dependency(steep_tea, depends_on=boil_water)
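
The auto-adding behaviour noted for add_dependency and add_dependencies can be pictured with a toy graph. TinyDAG below is a hypothetical sketch with string tasks; its attribute names mirror the documented ones, but the real class may store and validate things differently.

```python
from collections import defaultdict


class TinyDAG:
    """Hypothetical sketch of a graph that registers tasks seen in edges."""

    def __init__(self) -> None:
        self.tasks = set()
        # Maps an upstream task to the set of its downstream tasks.
        self._edges = defaultdict(set)

    def add_dependency(self, task, depends_on) -> None:
        # Both ends of the edge become known tasks, even if never added before.
        self.tasks.update({task, depends_on})
        self._edges[depends_on].add(task)

    def add_dependencies(self, d) -> None:
        # d maps each downstream task to the set of its upstream tasks.
        for task, upstream_tasks in d.items():
            for upstream_task in upstream_tasks:
                self.add_dependency(task, depends_on=upstream_task)


dag = TinyDAG()
dag.add_dependencies({"steep_tea": {"boil_water", "prep_infuser"}})
print(sorted(dag.tasks))
```

Note the direction: the argument is keyed by the downstream task, while the stored edges in this sketch run from upstream to downstream.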
add_task(task: alyeska.compose.Task) → None

Add a task to the set of tasks

Parameters:task (Task) – A Task object.
add_tasks(tasks: set) → None

Add multiple tasks to the set of tasks.

Parameters:tasks (set of Task) – Tasks to be added to the DAG.
classmethod from_yaml(p: pathlib.Path) → alyeska.compose.DAG

Create a DAG from a compose.yaml file

Returns:Directed Acyclic Graph representation of compose.yaml
Return type:DAG
get_downstream() → dict

Return adjacency dict of downstream Tasks.

Returns:An adjacency dict mapping each Task to the set of its downstream Tasks.
Return type:dict of Task to set of Task
get_sinks() → set

Return the set of sink Tasks (Tasks with no downstream dependencies)

Returns:set of Task
get_sources() → set

Return the set of source Tasks (Tasks with no upstream dependencies)

Returns:set of Task
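
Sources and sinks can both be read off a downstream adjacency dict. The sketch below uses string tasks and two hypothetical helpers, find_sources and find_sinks; it is an illustration of the definitions, not the library's code.

```python
from typing import Dict, Set


def find_sources(tasks: Set[str], downstream: Dict[str, Set[str]]) -> Set[str]:
    """Tasks that never appear as anyone's downstream task."""
    has_upstream = set().union(*downstream.values())
    return tasks - has_upstream


def find_sinks(tasks: Set[str], downstream: Dict[str, Set[str]]) -> Set[str]:
    """Tasks with no downstream tasks of their own."""
    return {task for task in tasks if not downstream.get(task)}


tasks = {"boil_water", "prep_infuser", "steep_tea", "wash_cup"}
downstream = {"boil_water": {"steep_tea"}, "prep_infuser": {"steep_tea"}}
sources = find_sources(tasks, downstream)
sinks = find_sinks(tasks, downstream)
print(sorted(sources), sorted(sinks))
```

In this sketch an isolated task such as wash_cup has no edges at all, so it counts as both a source and a sink.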
get_upstream() → dict

Return adjacency dict of upstream Tasks

Returns:An adjacency dict mapping each Task to the set of its upstream Tasks.
Return type:dict of Task to set of Task
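
The upstream and downstream views describe the same edges in opposite directions. A minimal sketch of converting one into the other, assuming string tasks and a hypothetical to_upstream helper:

```python
from typing import Dict, Set


def to_upstream(downstream: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Reverse every edge of a downstream adjacency dict."""
    upstream: Dict[str, Set[str]] = {task: set() for task in downstream}
    for task, children in downstream.items():
        for child in children:
            # child depends on task, so task is upstream of child.
            upstream.setdefault(child, set()).add(task)
    return upstream


downstream = {"A": {"B"}, "B": {"C"}, "C": set()}
upstream = to_upstream(downstream)
print(upstream)
```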
is_cyclic() → bool

Detect if the DAG is cyclic.

Returns:True if cycle detected. False otherwise.
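
One common way to detect a cycle is Kahn's algorithm: repeatedly remove tasks with no remaining upstream tasks and check whether any tasks are left over. The sketch below applies it to a downstream adjacency dict of strings. has_cycle is a hypothetical helper, and the source does not say which algorithm is_cyclic actually uses.

```python
from collections import deque
from typing import Dict, Set


def has_cycle(downstream: Dict[str, Set[str]]) -> bool:
    """Return True if the graph described by downstream contains a cycle."""
    # Collect every task, including ones that only appear as a target.
    tasks = set(downstream) | set().union(*downstream.values())
    indegree = {task: 0 for task in tasks}
    for children in downstream.values():
        for child in children:
            indegree[child] += 1
    # Start from tasks with no upstream tasks.
    queue = deque(task for task, count in indegree.items() if count == 0)
    visited = 0
    while queue:
        task = queue.popleft()
        visited += 1
        for child in downstream.get(task, ()):
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    # Any task never visited is part of, or downstream of, a cycle.
    return visited != len(tasks)


acyclic = has_cycle({"A": {"B"}, "B": {"C"}})
cyclic = has_cycle({"A": {"B"}, "B": {"A"}})
print(acyclic, cyclic)
```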
remove_task(task: alyeska.compose.Task) → None

Remove task from the set of tasks and remove any related edges

Parameters:task (Task) – A task to be removed from the DAG.
remove_tasks(tasks: set) → None

Remove multiple tasks from the set of tasks and any related edges

Parameters:tasks (set of Task) – Tasks to be removed from the DAG.
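
Removing a task means dropping it from the task set and from both ends of any edge that touches it. A minimal sketch with string tasks and a hypothetical drop_task function that mutates its arguments in place:

```python
from typing import Dict, Set


def drop_task(tasks: Set[str], downstream: Dict[str, Set[str]], task: str) -> None:
    """Remove task from tasks and delete every edge that touches it."""
    tasks.discard(task)
    # Edges leaving the task.
    downstream.pop(task, None)
    # Edges arriving at the task.
    for children in downstream.values():
        children.discard(task)


tasks = {"A", "B", "C"}
downstream = {"A": {"B"}, "B": {"C"}}
drop_task(tasks, downstream, "B")
print(sorted(tasks), downstream)
```

After the removal in this example, A and C are no longer connected, which is one way a DAG can end up holding more than one graph.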
static validate_dependency(d)
class alyeska.compose.Task(loc: pathlib.Path, env: str = '/home/docs/checkouts/readthedocs.org/user_builds/alyeska/envs/latest/bin/python', validate_loc: bool = False)

Bases: object

Define a Task and its relevant attributes.

Note

Tasks with the same loc and env are equal.
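
Equality by loc and env, together with a matching hash, is what lets duplicate Tasks collapse inside a set. The sketch below shows one way to get that behaviour with a frozen dataclass. TinyTask and its "python" default are hypothetical, and the real Task may implement equality differently.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class TinyTask:
    """Hypothetical stand-in: equal and hashable by (loc, env)."""

    loc: Path
    env: str = "python"  # assumed placeholder default, not alyeska's


a = TinyTask(Path("boil_water.py"))
b = TinyTask(Path("boil_water.py"))
c = TinyTask(Path("boil_water.py"), env="python3")
unique = {a, b, c}
print(a == b, a == c, len(unique))
```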

loc

Location of the Python script that runs the task.

Type:pathlib.Path
env

The environment in which to run the task.

Type:str, optional