.. _dootle.control.fsm.task: ======================= dootle.control.fsm.task ======================= .. py:module:: dootle.control.fsm.task .. autoapi-nested-parse:: An FSM b acked Task and job Classes ------- .. autoapisummary:: dootle.control.fsm.task._Predicates_m dootle.control.fsm.task._Callbacks_m dootle.control.fsm.task.FSMTask dootle.control.fsm.task.FSMJob Module Contents =============== .. dootle.control.fsm.task._Predicates_m: .. py:class:: _Predicates_m .. py:attribute:: spec :type: doot.workflow.TaskSpec .. py:attribute:: name :type: doot.workflow._interface.TaskName_p .. py:attribute:: priority :type: int .. py:method:: spec_missing(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> bool cancels the task if the spec is not registered .. py:method:: should_disable(source: statemachine.State, *, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> bool cancels the task if the spec is disabled .. py:method:: should_timeout() -> bool Cancel if you've waited too long .. py:method:: should_wait(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> bool if any dependencies have not run, delay this task .. py:method:: should_skip(source: statemachine.State) -> bool run a task's depends_on group, coercing to a bool returns False if the runner should skip the rest of the task .. py:method:: should_halt(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> bool .. py:method:: should_fail() -> bool .. py:method:: state_is_needed(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> bool delays exit from teardown _internal_state until it is safe to do so .. dootle.control.fsm.task._Callbacks_m: .. py:class:: _Callbacks_m .. py:attribute:: _internal_state :type: dict .. py:attribute:: name :type: doot.workflow._interface.TaskName_p .. py:attribute:: spec :type: doot.workflow.TaskSpec .. py:attribute:: _state_history :type: list .. py:method:: on_exit_state(*, source: Any) -> None Keep track of the progression of the task .. py:method:: on_enter_INIT(*, tracker: doot.control.tracker._interface.WorkflowTracker_p, parent: jgdv.Maybe[doot.workflow._interface.TaskName_p] = None) -> None initialise _internal_state, possibly run injections? .. py:method:: on_enter_RUNNING(*, step: int, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None .. py:method:: on_exit_RUNNING(*, step: int, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None .. py:method:: on_exit_TEARDOWN(*, source: Any, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None .. py:method:: on_enter_SUCCESS(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None .. py:method:: on_enter_FAILED(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None .. py:method:: on_enter_HALTED(*, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None .. py:method:: on_enter_SKIPPED() -> None .. py:method:: _get_parent_data(tracker: doot.control.tracker._interface.WorkflowTracker_p, parent: jgdv.Maybe[doot.workflow._interface.TaskName_p]) -> jgdv.Maybe[dict] .. py:method:: _get_cli_data(tracker: doot.control.tracker._interface.WorkflowTracker_p) -> jgdv.Maybe[dict] .. py:method:: _get_spec_data() -> dict .. py:method:: _get_inject_data(tracker: doot.control.tracker._interface.WorkflowTracker_p) -> jgdv.Maybe[dict] .. _dootle.control.fsm.task.FSMTask: .. py:class:: FSMTask(spec: doot.workflow.TaskSpec) The implementation of a task, as the domain model for a TaskMachine .. py:attribute:: _default_flags :type: ClassVar[set] .. py:attribute:: step :type: int .. py:attribute:: spec :type: doot.workflow.TaskSpec .. py:attribute:: status :type: doot.workflow._interface.TaskStatus_e .. py:attribute:: priority :type: int .. py:attribute:: records :type: list[Any] .. py:attribute:: _internal_state :type: dict .. py:attribute:: _state_history :type: list[doot.workflow._interface.TaskStatus_e] .. py:property:: name :type: doot.workflow.TaskName .. py:property:: internal_state :type: dict .. py:method:: _execute_action_group(*, group: str, lock_state: bool = False) -> tuple[int, doot.workflow._interface.ActionResponse_e] Execute a group of actions, possibly queue any task specs they produced, and return a count of the actions run + the result .. py:method:: _execute_action(count: int, action: doot.workflow.ActionSpec, *, group: jgdv.Maybe[str] = None, lock_state: bool = False) -> doot.workflow._interface.ActionResponse_e | bool | list[doot.workflow.TaskSpec] Run the given action of a specific task. returns either a list of specs to (potentially) queue, or an ActRE describing the action result. .. py:method:: get_action_group(group_name: str) -> list[doot.workflow.ActionSpec] .. py:method:: param_specs() -> list .. py:method:: log(msg: str, level: int = logmod.DEBUG, prefix: jgdv.Maybe[str] = None) -> None .. _dootle.control.fsm.task.FSMJob: .. py:class:: FSMJob(spec: doot.workflow.TaskSpec) Bases: :py:obj:`FSMTask` Extends an FSMTask for running a job .. py:method:: on_enter_RUNNING(step: int, tracker: doot.control.tracker._interface.WorkflowTracker_p) -> None Modifies how the object runs, requires the main action group return a list of tasks/specs .. py:method:: _execute_expansion_group(*, group: str) -> tuple[int, list[doot.workflow.TaskSpec]] Execute a group of actions, possibly queue any task specs they produced, and return a count of the actions run + the result