Engines

Overview

Engines are what actually run your atoms.

An engine takes a flow structure (described by patterns) and uses it to decide which atom to run and when.

TaskFlow provides different implementations of engines. Some may be easier to use (i.e., require no additional infrastructure setup) and understand; others might require more complicated setup but provide better scalability. The idea and ideal is that deployers or developers of a service that uses TaskFlow can select the engine that best suits their setup without modifying the code of that service.

Note

Engines usually have different capabilities and configuration, but all of them must implement the same interface and preserve the semantics of patterns (e.g. parts of a linear_flow.Flow are run one after another, in order, even if the selected engine is capable of running tasks in parallel).

Why they exist

An engine being the core component that actually makes your flows progress is likely a new concept for many programmers, so let’s describe in more depth how it operates and some of the reasoning behind why it exists. This should make the value engines add for TaskFlow users clearer.

First, let us discuss something most are already familiar with: the difference between the declarative and imperative programming models. The imperative model involves writing statements that accomplish a program’s actions (typically using conditionals and other such language features). This kind of program embeds how to accomplish a goal while also defining what the goal actually is (and the state of this is maintained in memory or on the stack while these statements execute).

In contrast, the declarative model, instead of combining how to accomplish a goal with what is to be accomplished, splits the two and only declares what the intended goal is, not how to reach it. In TaskFlow terminology the what is the structure of your flows and the tasks and other atoms inside those flows, but the how is not defined (the line becomes blurred since tasks themselves contain imperative code, but for now consider a task as more of a pure function that executes, reverts and may require inputs and provide outputs). This is where engines get involved: they execute the what, as defined via atoms, tasks, flows and the relationships defined therein, in a well-defined manner (and the engine, rather than your code, is responsible for any state manipulation).
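To make the split between the what and the how concrete, here is a toy sketch in plain Python (it is not TaskFlow's API): the what is a list of hypothetical `Atom` declarations that only name what they require and provide, and a tiny `toy_engine` function supplies the how by running each atom once its inputs exist, keeping all intermediate state itself.

```python
from typing import Callable, Dict, List, NamedTuple, Tuple


class Atom(NamedTuple):
    """Hypothetical declaration: what an atom needs and what it produces."""
    name: str
    requires: Tuple[str, ...]
    provides: str
    execute: Callable[..., object]


def toy_engine(atoms: List[Atom], store: Dict[str, object]) -> Dict[str, object]:
    """Run declared atoms in dependency order; the engine owns all state."""
    results = dict(store)          # state lives here, not inside the atoms
    pending = list(atoms)
    while pending:
        ready = [a for a in pending if all(r in results for r in a.requires)]
        if not ready:
            missing = {r for a in pending for r in a.requires if r not in results}
            raise RuntimeError("unsatisfiable requirements: %s" % sorted(missing))
        for atom in ready:
            args = [results[r] for r in atom.requires]
            results[atom.provides] = atom.execute(*args)
            pending.remove(atom)
    return results


# The "what": declarations only, deliberately listed out of order.
atoms = [
    Atom("total", ("subtotal", "tax"), "total", lambda s, t: s + t),
    Atom("subtotal", ("prices",), "subtotal", sum),
    Atom("tax", ("subtotal",), "tax", lambda s: round(s * 0.1, 2)),
]
print(toy_engine(atoms, {"prices": [10, 20]})["total"])  # prints 33.0
```

Because ordering and state live in the engine, the declarations could be handed to a different engine (for example one that runs ready atoms concurrently) without changing them.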

This mix of imperative and declarative (with a stronger emphasis on the declarative model) allows for the following functionality to become possible:

  • Enhancing reliability: Decoupling state alterations from what should be accomplished provides a natural way of resuming: the engine tracks the current state, knows which point a workflow has reached, and knows how to get back into that state when resumption occurs.

  • Enhancing scalability: When an engine is responsible for executing your desired work, it becomes possible to alter the how in the future by creating new types of execution backends (for example the worker model, which does not execute locally). Without decoupling the what from the how, such a feature is very hard (if not impossible) to provide, because the coupling itself ties what is done to one way of doing it.

  • Enhancing consistency: Since the engine is responsible for executing atoms and the associated workflow, it can be one of the primary (if not the only) entities working to keep the execution model in a consistent state. Coupled with atoms that should be immutable and have limited (if any) internal state, the ability to reason about and obtain consistency can be vastly improved.

    • With future features around locking (using tooz to help), engines can also help ensure that resources accessed by tasks are reliably obtained and mutated. This will help ensure that other processes, threads, or other types of entities are not also executing tasks that manipulate those same resources (further increasing consistency).

Of course, these kinds of features can come with some drawbacks:

  • The downside of decoupling the how and the what is that programmers must start shifting away from the imperative model, where functions control and manipulate state (likely a mindset change for those used to that model). We have worked to make this less of a concern by creating, and encouraging the use of, persistence, which makes it possible to keep state and transfer it via an argument input and output mechanism.

  • Depending on how much imperative code exists (and how much state is inside that code), converting or refactoring it to these new concepts may involve significant rework. We have tried to help here by allowing tasks to internally use regular Python code (which can be written in an imperative style) and by providing examples that show how to use these concepts.

  • Another one of the downsides of decoupling the what from the how is that it may become harder to use traditional techniques to debug failures (especially if remote workers are involved). We try to help here by making it easy to track, monitor and introspect the actions & state changes that are occurring inside an engine (see notifications for how to use some of these capabilities).

Creating

All engines are mere classes that implement the same interface, and of course it is possible to import them and create instances just like any other class in Python. But the easier (and recommended) way to create an engine is to use the engine helper functions. All of these functions are imported into the taskflow.engines module namespace, so typical usage might look like:

from taskflow import engines

...
flow = make_flow()
eng = engines.load(flow, engine='serial', backend=my_persistence_conf)
eng.run()
...

taskflow.engines.helpers.load(flow, store=None, flow_detail=None, book=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)

Load a flow into an engine.

This function creates and prepares an engine to run the provided flow. All that is left after this returns is to run the engine with the engine’s run() method.

Which engine to load is specified via the engine parameter. It can be a string that names the engine type to use, or a string that is a URI with a scheme that names the engine type to use and further options contained in the URI’s host, port, and query parameters…

Which storage backend to use is defined by the backend parameter. It can be a backend itself, or a dictionary that is passed to fetch() to obtain a viable backend.

Parameters:
  • flow – flow to load

  • store – dict – data to put into storage to satisfy flow requirements

  • flow_detail – FlowDetail that holds the state of the flow (if one is not provided then one will be created for you in the provided backend)

  • book – LogBook to create flow detail in if flow_detail is None

  • backend – storage backend to use or configuration that defines it

  • namespace – driver namespace for stevedore (or empty for default)

  • engine – string engine type or URI string with scheme that contains the engine type and any URI specific components that will become part of the engine options.

  • kwargs – arbitrary keyword arguments passed as options (merged with any options extracted from the engine URI), typically used for any engine specific options that do not fit as any of the existing arguments.

Returns:

engine
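The engine parameter may carry options in URI form. The following is a hypothetical helper (`split_engine_spec` is not a TaskFlow function) showing one way such a string could be split into an engine type plus an options dictionary using only the standard library. It assumes host, port and query parameters become options and that explicit keyword arguments override URI values; TaskFlow's actual parsing and merge rules may differ.

```python
from urllib.parse import parse_qsl, urlsplit


def split_engine_spec(engine, **kwargs):
    """Split 'type' or 'type://host:port?k=v' into (engine_type, options)."""
    if "://" not in engine:
        return engine, dict(kwargs)        # plain engine name, no URI parts
    parts = urlsplit(engine)
    options = dict(parse_qsl(parts.query))  # query values stay strings
    if parts.hostname:
        options["host"] = parts.hostname
    if parts.port is not None:
        options["port"] = parts.port
    options.update(kwargs)                  # assumption: kwargs take precedence
    return parts.scheme, options


print(split_engine_spec("parallel://?max_workers=4", executor="threads"))
# ('parallel', {'max_workers': '4', 'executor': 'threads'})
```

Note that options taken from the query string arrive as strings, so an engine consuming them would still need to convert values such as max_workers.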

taskflow.engines.helpers.run(flow, store=None, flow_detail=None, book=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)

Run the flow.

This function loads the flow into an engine (with the load() function) and runs the engine.

The arguments are interpreted as for load().

Returns:

dictionary of all named results (see fetch_all())

taskflow.engines.helpers.save_factory_details(flow_detail, flow_factory, factory_args, factory_kwargs, backend=None)

Saves the given factory’s reimportable attributes into the flow detail.

This function saves the factory name, arguments, and keyword arguments into the given flow details object and if a backend is provided it will also ensure that the backend saves the flow details after being updated.

Parameters:
  • flow_detail – FlowDetail that holds state of the flow to load

  • flow_factory – function or string: function that creates the flow

  • factory_args – list or tuple of factory positional arguments

  • factory_kwargs – dict of factory keyword arguments

  • backend – storage backend to use or configuration

taskflow.engines.helpers.load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, store=None, book=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)

Loads a flow from a factory function into an engine.

Takes a flow factory function (or its name) and creates the flow with it. The flow is then loaded into an engine with the load() function, and the factory function’s fully qualified name is saved to the flow metadata so that the flow can later be resumed.

Parameters:
  • flow_factory – function or string: function that creates the flow

  • factory_args – list or tuple of factory positional arguments

  • factory_kwargs – dict of factory keyword arguments

Further arguments are interpreted as for load().

Returns:

engine

taskflow.engines.helpers.flow_from_detail(flow_detail)

Reloads a flow previously saved.

Gets the flow factory’s name and any arguments and keyword arguments from the flow detail’s metadata, and then calls that factory to recreate the flow.

Parameters:

flow_detail – FlowDetail that holds state of the flow to load

taskflow.engines.helpers.load_from_detail(flow_detail, store=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)

Reloads an engine previously saved.

This reloads the flow using the flow_from_detail() function and then calls into the load() function to create an engine from that flow.

Parameters:

flow_detail – FlowDetail that holds state of the flow to load

Further arguments are interpreted as for load().

Returns:

engine

Usage

To select which engine to use, pass the engine parameter that every engine helper function accepts; pass any engine specific options through the kwargs parameter.

Types

Serial

Engine type: 'serial'

Runs all tasks on a single thread – the same thread run() is called from.

Note

This engine is used by default.

Tip

If eventlet is used, then this engine will not block other threads from running, as eventlet automatically creates an implicit coroutine system (using greenthreads and monkey patching). See eventlet and greenlet for more details.

Parallel

Engine type: 'parallel'

A parallel engine schedules tasks onto different threads/processes to allow for running non-dependent tasks simultaneously. See the documentation of ParallelActionEngine for supported arguments that can be used to construct a parallel engine that runs using your desired execution model.

Tip

Sharing an executor between engine instances provides better scalability by reducing thread/process creation and teardown as well as by reusing existing pools (which is a good practice in general).

Warning

Running tasks with a process pool executor is experimentally supported. This is mainly because the futures backport and the multiprocessing module that exist in older versions of Python are not as up to date (with important fixes such as 4892, 6721, 9205, 16284, 22393 and others…) as the most recent Python versions (which themselves have a variety of ongoing/recent bugs).

Workers

Engine type: 'worker-based' or 'workers'

Note

Since this engine is significantly more complicated (and different) than the others, we thought it appropriate to devote a whole documentation section to it.

How they run

To provide a peek into the general process that an engine goes through when running, let’s break it apart a little and describe what one of the engine types does while executing (for this we will look into the ActionEngine engine type).

Creation

The first thing that occurs is that the user creates an engine for a given flow, providing a flow detail (where results will be saved into a provided persistence backend). This is typically accomplished via the methods described above in creating engines. At this point the engine has references to your flow and backends, and other internal variables are set up.

Compiling

During this stage (see compile()) the flow will be converted into an internal graph representation using a compiler (the default implementation for patterns is the PatternCompiler). This class compiles/converts the flow objects and contained atoms into a networkx directed graph (and tree structure) that contains the equivalent atoms defined in the flow and any nested flows & atoms, as well as the constraints created by the application of the different flow patterns. This graph (and tree) are what will be analyzed & traversed during the engine’s execution. At this point a few helper objects are also created and saved to internal engine variables (these objects help in executing atoms, analyzing the graph and performing other internal engine activities). At the end of this stage a Runtime object is created, which contains references to all needed runtime components, and its compile() is called to compile a cache of frequently used execution helper objects.
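A toy version of the pattern-to-graph step follows. It assumes flows are written as nested tuples such as `("linear", [...])` or `("unordered", [...])` with atom names as strings; this representation and `compile_flow` are illustrative only (the PatternCompiler works on real pattern objects and builds a networkx graph). A linear pattern adds an edge from every exit atom of one child to every entry atom of the next; an unordered pattern adds none.

```python
def compile_flow(flow, graph=None):
    """Return (graph, entry_atoms, exit_atoms); graph maps atom -> successors."""
    if graph is None:
        graph = {}
    if isinstance(flow, str):                  # a single atom
        graph.setdefault(flow, set())
        return graph, [flow], [flow]
    kind, children = flow
    # Compile every child into the shared graph, keeping (entries, exits).
    compiled = [compile_flow(child, graph)[1:] for child in children]
    if kind == "linear":
        # Each child must finish before the next one may start.
        for (_, prev_exits), (next_entries, _) in zip(compiled, compiled[1:]):
            for a in prev_exits:
                for b in next_entries:
                    graph[a].add(b)
        entries = compiled[0][0] if compiled else []
        exits = compiled[-1][1] if compiled else []
    elif kind == "unordered":
        # No ordering constraints between children.
        entries = [a for e, _ in compiled for a in e]
        exits = [a for _, x in compiled for a in x]
    else:
        raise ValueError("unknown pattern %r" % kind)
    return graph, entries, exits


g, _, _ = compile_flow(("linear", ["a", ("unordered", ["b", "c"]), "d"]))
print(sorted((u, v) for u, vs in g.items() for v in vs))
# [('a', 'b'), ('a', 'c'), ('b', 'd'), ('c', 'd')]
```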

Preparation

This stage (see prepare()) starts by setting up the storage needed for all atoms in the compiled graph, ensuring that corresponding AtomDetail (or subclass of) objects are created for each node in the graph.

Validation

This stage (see validate()) performs any final validation of the compiled (and now storage prepared) engine. It compares the requirements that are needed to start execution and what is currently provided or will be produced in the future. If there are any atom requirements that are not satisfied (no known current provider or future producer is found) then execution will not be allowed to continue.
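The core of that check can be sketched with plain sets. In this simplified model (an assumption, not TaskFlow's validate() implementation) each atom is a dictionary of hypothetical `requires`/`provides` names and injected values come from a `store` dictionary; unlike a real engine, it ignores whether a producer is actually ordered before the atom that needs its result.

```python
def find_unsatisfied(atoms, store):
    """Return {atom: [missing names]} for requirements nobody can supply."""
    available = set(store)                      # injected values
    for spec in atoms.values():
        available.update(spec["provides"])      # current or future producers
    missing = {}
    for name, spec in atoms.items():
        unmet = sorted(set(spec["requires"]) - available)
        if unmet:
            missing[name] = unmet
    return missing


def validate(atoms, store):
    """Refuse to continue when any requirement is unsatisfiable."""
    missing = find_unsatisfied(atoms, store)
    if missing:
        raise ValueError("unsatisfied requirements: %r" % missing)


atoms = {
    "fetch": {"requires": ["url"], "provides": ["page"]},
    "parse": {"requires": ["page", "encoding"], "provides": ["doc"]},
}
print(find_unsatisfied(atoms, {"url": "http://example.com"}))
# {'parse': ['encoding']}
```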

Execution

The graph (and helper objects) previously created are now used for guiding further execution (see run()). The flow is put into the RUNNING state and a MachineBuilder state machine object and runner object are built (using the automaton library). That machine and associated runner then take over and begin going through the stages listed below (for a more visual diagram/representation see the engine state diagram).

Note

The engine will respect the constraints imposed by the flow. For example, if an engine is executing a linear_flow.Flow then it is constrained by the dependency graph, which is linear in this case, and hence using a parallel engine may not yield any benefits if one is looking for concurrency.

Resumption

One of the first stages is to analyze the state of the tasks in the graph, determining which ones have failed and which ones were previously running, and determining what the intention of each task should now be (typically an intention can be that it should REVERT, that it should EXECUTE, or that it should be IGNORED). This intention is determined by analyzing the current state of the task, which is found by looking at the state in the task detail object for that task and by analyzing edges of the graph for things like retry atoms, which can influence what a task’s intention should be (this is aided by the Selector helper object, which was designed to provide helper methods for this analysis). Once these intentions are determined and associated with each task (the intention is also stored in the AtomDetail object), the scheduling stage starts.

Scheduling

This stage selects which atoms are eligible to run by using a Scheduler implementation (the default implementation looks at their intention, checks whether predecessor atoms have run, and so on, using a Selector helper object as needed) and submits those atoms to a previously provided compatible executor for asynchronous execution. This Scheduler will return a future object for each atom scheduled, all of which are collected into a list of not done futures. This ends the initial round of scheduling, and at this point the engine enters the waiting stage.

Waiting

In this stage the engine waits for any of the future objects previously submitted to complete. Once one of the future objects completes (or fails), that atom’s result will be examined and finalized using a Completer implementation. It typically persists results to a provided persistence backend (saved into the corresponding AtomDetail and FlowDetail objects via the Storage helper) and reflects the new state of the atom. At this point what typically happens falls into two categories: one for if that atom failed and one for if it did not. If the atom failed, it may be set to a new intention such as RETRY or REVERT (other atoms that were predecessors of this failing atom may also have their intention altered). Once this intention adjustment has happened, a new round of scheduling occurs, and this process repeats until the engine succeeds or fails (if the process running the engine dies, the above stages will be restarted and resuming will occur).
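The schedule, wait and complete cycle can be approximated with `concurrent.futures`. This is a simplified sketch, not TaskFlow's Scheduler or Completer: the hypothetical `run_graph` submits every atom whose predecessors have finished, waits for the first future to complete, records its result and schedules again. It has no intentions, reverts or retries (the first failure simply propagates), and passing an executor with `max_workers=1` gives serial-style behavior.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_graph(graph, funcs, executor):
    """Run a DAG of atoms; graph maps atom -> set of successor atoms.

    funcs maps atom -> zero-argument callable. Returns atom -> result.
    """
    preds = {atom: set() for atom in graph}
    for atom, successors in graph.items():
        for succ in successors:
            preds[succ].add(atom)
    results, not_done = {}, {}            # not_done maps future -> atom
    while len(results) < len(graph):
        # Scheduling: submit every atom whose predecessors have all finished.
        for atom in graph:
            if (atom not in results and atom not in not_done.values()
                    and preds[atom].issubset(results)):
                not_done[executor.submit(funcs[atom])] = atom
        if not not_done:
            raise RuntimeError("nothing can be scheduled (cycle?)")
        # Waiting: block until at least one submitted atom finishes.
        done, _ = wait(list(not_done), return_when=FIRST_COMPLETED)
        for fut in done:
            atom = not_done.pop(fut)
            # Completion: record the result; fut.result() re-raises failures.
            results[atom] = fut.result()
    return results


graph = {"a": {"b", "c"}, "b": {"d"}, "c": {"d"}, "d": set()}
funcs = {name: (lambda n=name: n.upper()) for name in graph}
with ThreadPoolExecutor(max_workers=2) as pool:
    print(sorted(run_graph(graph, funcs, pool).items()))
# [('a', 'A'), ('b', 'B'), ('c', 'C'), ('d', 'D')]
```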

Note

If the engine is suspended while going through the above stages, no further scheduling stages will occur and all currently executing work will be allowed to finish (see suspension).

Finishing

At this point the machine (and runner) that was built using the MachineBuilder class has finished successfully, failed, or had its execution suspended. Depending on which of these occurred, the flow enters a new state (typically one of FAILURE, SUSPENDED, SUCCESS or REVERTED). Notifications will be sent out about this final state change (other state changes also send out notifications), and any failures that occurred will be reraised (the failure objects are wrapped exceptions). If no failures have occurred, then the engine has finished and, if so desired, the persistence can be used to clean up any details that were saved for this execution.

Special cases

Suspension

Each engine implements a suspend() method that can be used to externally (or in the future internally) request that the engine stop scheduling new work. By default this transitions the flow state from RUNNING into a SUSPENDING state (which will later transition into a SUSPENDED state). Since an engine may be executing atoms remotely (or locally) and there is currently no preemption, the engine’s MachineBuilder state machine detects that this transition into SUSPENDING has occurred and avoids scheduling new work (it will, though, let active work continue). After the current work has finished, the engine transitions from SUSPENDING into SUSPENDED and returns from its run() method.

Note

When run() returns, there may (but need not, depending on what was active when suspend() was called) be unfinished work in the flow, which can be resumed at a later point in time.
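A toy illustration of cooperative suspension and later resumption follows, using a `threading.Event` as a hypothetical suspend request. The loop checks the flag only before starting new work, so an atom already in progress always finishes, and the set of completed atoms is what a later run resumes from. TaskFlow records this progress in AtomDetail objects; the in-memory `done` set is only a stand-in.

```python
import threading


def run_serial(atoms, done, suspend_requested):
    """Run (name, func) pairs in order, skipping finished ones; honor suspend."""
    for name, func in atoms:
        if name in done:
            continue                      # finished in an earlier run
        if suspend_requested.is_set():
            return "SUSPENDED"            # no new work is started
        func()                            # active work is never preempted
        done.add(name)
    return "SUCCESS"


suspend = threading.Event()
done = set()
atoms = [
    ("a", lambda: None),
    ("b", suspend.set),                   # simulates suspend() arriving mid-run
    ("c", lambda: None),
]
print(run_serial(atoms, done, suspend), sorted(done))  # SUSPENDED ['a', 'b']
suspend.clear()
print(run_serial(atoms, done, suspend), sorted(done))  # SUCCESS ['a', 'b', 'c']
```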

Scoping

During creation of flows it is also important to understand the lookup strategy (also typically known as scope resolution) that the engine you are using will use internally. For example, when a task A provides result ‘a’, a task B after A provides a different result ‘a’, and a task C after both A and B requires ‘a’ to run, which one will be selected?

Default strategy

When an engine is executing, it internally interacts with the Storage class, which in turn interacts with a ScopeWalker instance; the Storage class uses the following lookup order to find (or fail) an atom’s requirement lookup/request:

  1. Transient injected atom specific arguments.

  2. Non-transient injected atom specific arguments.

  3. Transient injected arguments (flow specific).

  4. Non-transient injected arguments (flow specific).

  5. First scope visited provider that produces the named result; note that if multiple providers are found in the same scope, the first (the scope walker’s yielded ordering defines what first means) that produced that result and can be extracted without raising an error is selected as the provider of the requested requirement.

  6. Fails with NotFound if unresolved at this point (the cause attribute of this exception may have more details on why the lookup failed).
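The lookup order above can be modelled in a few lines. In this sketch the four injected-argument layers are plain dictionaries and `scopes` is a list of scopes in the order a hypothetical scope walker would yield them (closest first), each scope a list of `(provider, results)` pairs; `NotFound` is a local class, not TaskFlow's exception. For the A/B/C example, the outcome depends on the walker ordering, which is assumed here to place B before A.

```python
class NotFound(LookupError):
    """Raised when no layer or provider can satisfy a requirement."""


def lookup(name, atom_transient, atom_args, flow_transient, flow_args, scopes):
    # Steps 1-4: injected arguments, checked in the documented order.
    for layer in (atom_transient, atom_args, flow_transient, flow_args):
        if name in layer:
            return layer[name]
    # Step 5: first scope holding a provider of `name`; within a scope,
    # the first provider in walker order wins.
    for scope in scopes:
        for _provider, results in scope:
            if name in results:
                return results[name]
    # Step 6: nothing matched.
    raise NotFound("no injected value or provider for %r" % name)


# C runs after A and B, which both provide 'a' (walker order assumed: B, A).
scopes = [[("B", {"a": "from B"}), ("A", {"a": "from A"})]]
print(lookup("a", {}, {}, {}, {}, scopes))                    # from B
print(lookup("a", {}, {"a": "injected"}, {}, {}, scopes))     # injected
```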

Note

To examine this information when debugging it is recommended to enable the BLATHER logging level (level 5). At this level the storage and scope code/layers will log what is being searched for and what is being found.
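Level 5 sits below the standard `logging.DEBUG` (10), so both the logger and its handler have to be lowered to see those records. A minimal standard-library sketch; configuring the parent `"taskflow"` logger assumes TaskFlow's loggers are named after its modules (for example `taskflow.storage`), so they inherit the level and propagate to this handler.

```python
import logging

BLATHER = 5  # the numeric level mentioned above (below logging.DEBUG == 10)
logging.addLevelName(BLATHER, "BLATHER")

handler = logging.StreamHandler()
handler.setLevel(BLATHER)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

tf_logger = logging.getLogger("taskflow")
tf_logger.setLevel(BLATHER)
tf_logger.addHandler(handler)

# Child loggers such as "taskflow.storage" now emit level-5 records.
logging.getLogger("taskflow.storage").log(BLATHER, "looking up %r", "a")
```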

Interfaces

class taskflow.engines.base.Engine(flow, flow_detail, backend, options)

Bases: object

Base for all engine implementations.

Variables:
  • Engine.notifier – A notification object that will dispatch events that occur related to the flow the engine contains.

  • atom_notifier – A notification object that will dispatch events that occur related to the atoms the engine contains.

property notifier

The flow notifier.

property atom_notifier

The atom notifier.

property options

The options that were passed to this engine on construction.

abstract property storage

The storage unit for this engine.

abstract property statistics

A dictionary of runtime statistics this engine has gathered.

This dictionary will be empty when the engine has never been run. When it is running or has run previously, it should have (but may not have) useful and/or informational keys and values when running is underway and/or completed.

Warning

The keys in this dictionary should be somewhat stable (not changing), but their existence may change between major releases as new statistics are gathered or removed, so before accessing keys ensure that they actually exist and handle the case when they do not.

abstract compile()

Compiles the contained flow into an internal representation.

This internal representation is what the engine will actually use to run. If this compilation cannot be accomplished, then an exception is expected to be raised with a message indicating why the compilation could not be achieved.

abstract reset()

Reset back to the PENDING state.

If a flow had previously ended up (from a prior engine run()) in the FAILURE, SUCCESS or REVERTED states (or for some reason it ended up in an intermediary state) it can be desirable to make it possible to run it again. Calling this method enables that to occur (without causing a state transition failure, which would typically occur if run() is called directly without doing a reset).

abstract prepare()

Performs any pre-run, but post-compilation actions.

NOTE(harlowja): During preparation it is currently assumed that the underlying storage will be initialized, the atoms will be reset and the engine will enter the PENDING state.

abstract validate()

Performs any pre-run, post-prepare validation actions.

NOTE(harlowja): During validation all final dependencies will be verified and ensured. This will by default check that all atoms have satisfiable requirements (satisfied by some other provider).

abstract run()

Runs the flow in the engine to completion (or die trying).

abstract suspend()

Attempts to suspend the engine.

If the engine is currently running atoms, then this will attempt to prevent future work from being started (currently active atoms cannot be preempted) and move the engine into a suspended state which can later be resumed from.

Implementations

class taskflow.engines.action_engine.engine.ActionEngine(flow, flow_detail, backend, options)

Bases: Engine

Generic action-based engine.

This engine compiles the flow (and any subflows) into a compilation unit which contains the full runtime definition to be executed and then uses this compilation unit in combination with the executor, runtime, machine builder and storage classes to attempt to run your flow (and any subflows & contained atoms) to completion.

NOTE(harlowja): during this process it is permissible and valid to have a task or multiple tasks in the execution graph fail (even at the same time), which will cause the process of reversion or retrying to commence. See the valid states in the states module to learn more about what other states the tasks and flow being run can go through.

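Engine options:

+------------------+---------------------------------------------+------+---------+
| Name/key         | Description                                 | Type | Default |
+------------------+---------------------------------------------+------+---------+
| defer_reverts    | Lets you safely nest flows with retries     | bool | False   |
|                  | inside flows without retries while still    |      |         |
|                  | behaving as a user would expect (for        |      |         |
|                  | example, if the retry is exhausted it       |      |         |
|                  | reverts the outer flow unless the outer     |      |         |
|                  | flow has a separate retry behavior).        |      |         |
+------------------+---------------------------------------------+------+---------+
| never_resolve    | When true, the engine does not revert or    | bool | False   |
|                  | retry to resolve an atom failure; it        |      |         |
|                  | aborts instead.                             |      |         |
+------------------+---------------------------------------------+------+---------+
| inject_transient | When true, values that are local to each    | bool | True    |
|                  | atom's scope are injected into storage in   |      |         |
|                  | a transient location (typically a local     |      |         |
|                  | dictionary); when false, those values are   |      |         |
|                  | instead persisted into atom details (and    |      |         |
|                  | saved in a non-transient manner).           |      |         |
+------------------+---------------------------------------------+------+---------+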

NO_RERAISING_STATES = frozenset({'SUCCESS', 'SUSPENDED'})

States that, if the engine stops in them, will not cause any potential failures to be reraised. States not in this list will cause any failure/s that were captured (if any) to get reraised.
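In effect, the end-of-run decision looks roughly like the hypothetical `finish` function below. It is a sketch under simplifying assumptions: TaskFlow captures failures as Failure objects rather than plain exceptions, and re-raising only the first captured failure ignores how several simultaneous failures would be reported.

```python
NO_RERAISING_STATES = frozenset({"SUCCESS", "SUSPENDED"})


def finish(final_state, failures):
    """Return the final state, re-raising a captured failure when required."""
    if failures and final_state not in NO_RERAISING_STATES:
        raise failures[0]          # simplification: only the first failure
    return final_state


print(finish("SUSPENDED", [ValueError("boom")]))   # SUSPENDED
try:
    finish("REVERTED", [ValueError("boom")])
except ValueError as exc:
    print("reraised:", exc)                        # reraised: boom
```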

IGNORABLE_STATES = frozenset({'ANALYZING', 'GAME_OVER', 'RESUMING', 'SCHEDULING', 'UNDEFINED', 'WAITING'})

Informational states this engine’s internal machine yields back while running; they are not useful for the engine to record, but are useful to provide to end-users when doing execution iterations via run_iter().

MAX_MACHINE_STATES_RETAINED = 10

During run_iter() the last X state machine transitions will be recorded (typically only useful on failure).

suspend()

Attempts to suspend the engine.

If the engine is currently running atoms, then this will attempt to prevent future work from being started (currently active atoms cannot be preempted) and move the engine into a suspended state which can later be resumed from.

property statistics

A dictionary of runtime statistics this engine has gathered.

This dictionary will be empty when the engine has never been run. When it is running or has run previously, it should have (but may not have) useful and/or informational keys and values when running is underway and/or completed.

Warning

The keys in this dictionary should be somewhat stable (not changing), but their existence may change between major releases as new statistics are gathered or removed, so before accessing keys ensure that they actually exist and handle the case when they do not.

property compilation

The compilation result.

NOTE(harlowja): Only accessible after compilation has completed (None will be returned when this property is accessed before compilation has completed successfully).

storage

The storage unit for this engine.

NOTE(harlowja): the atom argument lookup strategy will change for this storage unit after compile() has completed (since only after compilation is the actual structure known). Before compile() has completed, the atom argument lookup strategy will be restricted to injected arguments only (this does not reflect the actual runtime lookup strategy, which typically will be, but is not always, different).

run(timeout=None)

Runs the engine (or die trying).

Parameters:

timeout – timeout to wait for any atoms to complete (this timeout will be used during the waiting period that occurs when unfinished atoms are being waited on).

run_iter(timeout=None)

Runs the engine using iteration (or die trying).

Parameters:

timeout – timeout to wait for any atoms to complete (this timeout will be used during the waiting period that occurs after the waiting state is yielded when unfinished atoms are being waited on).

Instead of running to completion in a blocking manner, this will return a generator which will yield back the various states that the engine is going through (and can be used to run multiple engines at once using a generator per engine). The iterator returned also responds to the send() method from PEP 0342 and will attempt to suspend itself if a truthy value is sent in (the suspend may be delayed until all active atoms have finished).

NOTE(harlowja): using the run_iter method will not retain the engine lock while executing so the user should ensure that there is only one entity using a returned engine iterator (one per engine) at a given time.
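The generator protocol described above (yield states, accept `send()`) can be illustrated with a toy stand-in. `toy_run_iter` is hypothetical and far simpler than ActionEngine.run_iter(): it yields a few of the state names mentioned in this document while working through a list of callables, and a truthy value sent in stops any further work from being started (work already run is not undone).

```python
def toy_run_iter(work):
    """Yield engine-like state names; a truthy send() requests suspension."""
    suspended = False
    yield "RESUMING"
    for func in work:
        # A truthy value sent while paused here stops new work from starting.
        if (yield "SCHEDULING"):
            suspended = True
            break
        func()                      # this unit of work always runs to the end
        if (yield "WAITING"):
            suspended = True
            break
    yield "SUSPENDED" if suspended else "SUCCESS"


calls = []
it = toy_run_iter([lambda: calls.append(1), lambda: calls.append(2)])
print(next(it), next(it))    # RESUMING SCHEDULING
print(it.send(True), calls)  # SUSPENDED []
```

Driving several such generators by calling next() on each in turn is the kind of interleaving that "one generator per engine" makes possible.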

validate()

Performs any pre-run, post-prepare validation actions.

NOTE(harlowja): During validation all final dependencies will be verified and ensured. This will by default check that all atoms have satisfiable requirements (satisfied by some other provider).

prepare()

Performs any pre-run, but post-compilation actions.

NOTE(harlowja): During preparation it is currently assumed that the underlying storage will be initialized, the atoms will be reset and the engine will enter the PENDING state.

reset()

Reset back to the PENDING state.

If a flow had previously ended up (from a prior engine run()) in the FAILURE, SUCCESS or REVERTED states (or for some reason it ended up in an intermediary state) it can be desirable to make it possible to run it again. Calling this method enables that to occur (without causing a state transition failure, which would typically occur if run() is called directly without doing a reset).

compile()

Compiles the contained flow into an internal representation.

This internal representation is what the engine will actually use to run. If this compilation cannot be accomplished, then an exception is expected to be raised with a message indicating why the compilation could not be achieved.

class taskflow.engines.action_engine.engine.SerialActionEngine(flow, flow_detail, backend, options)

Bases: ActionEngine

Engine that runs tasks in a serial manner.

class taskflow.engines.action_engine.engine.ParallelActionEngine(flow, flow_detail, backend, options)

Bases: ActionEngine

Engine that runs tasks in a parallel manner.

Additional engine options:

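  • executor: an object that implements a PEP 3148 compatible executor interface; it will be used for scheduling tasks. The following types are applicable (other unknown types passed will cause a type error to be raised).

+------------------------------------------------+-----------------------------+
| Type provided                                  | Executor used               |
+------------------------------------------------+-----------------------------+
| concurrent.futures.thread.ThreadPoolExecutor   | ParallelThreadTaskExecutor  |
| concurrent.futures.process.ProcessPoolExecutor | ParallelProcessTaskExecutor |
| concurrent.futures._base.Executor              | ParallelThreadTaskExecutor  |
+------------------------------------------------+-----------------------------+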

  • executor: a string that will be used to select a PEP 3148 compatible executor; it will be used for scheduling tasks. The following strings are applicable (other unknown strings passed will cause a value error to be raised).

+---------------------------+----------------------------------------------+
| String (case insensitive) | Executor used                                |
+---------------------------+----------------------------------------------+
| process                   | ParallelProcessTaskExecutor                  |
| processes                 | ParallelProcessTaskExecutor                  |
| thread                    | ParallelThreadTaskExecutor                   |
| threaded                  | ParallelThreadTaskExecutor                   |
| threads                   | ParallelThreadTaskExecutor                   |
| greenthread               | ParallelThreadTaskExecutor (greened version) |
| greedthreaded             | ParallelThreadTaskExecutor (greened version) |
| greenthreads              | ParallelThreadTaskExecutor (greened version) |
+---------------------------+----------------------------------------------+

  • max_workers: an integer that will affect the number of parallel workers that are used to dispatch tasks into (this number is bounded by the maximum parallelization your workflow can support).

  • wait_timeout: a float (in seconds) that will affect the parallel process task executor (and therefore is only applicable when the executor provided above is of the process variant). This number affects how much time the process task executor waits for messages from child processes (typically indicating they have finished or failed). A lower number gives finer granularity but currently involves more polling, while a higher number involves less polling but makes the engine slower to notice that a task has completed.
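The selection rules in the two tables can be sketched as a small dispatch function. `pick_executor_kind` is hypothetical and returns descriptive labels rather than TaskFlow's executor classes, but it follows the tables: process pools map to the process executor, any other PEP 3148 executor maps to the thread executor, strings are matched case-insensitively, and unknown inputs raise TypeError or ValueError.

```python
from concurrent.futures import Executor, ProcessPoolExecutor

# String aliases from the table above (matched case-insensitively).
_STRING_KINDS = {
    "process": "process", "processes": "process",
    "thread": "thread", "threaded": "thread", "threads": "thread",
    # Spelled exactly as listed in the table, including "greedthreaded".
    "greenthread": "green thread", "greedthreaded": "green thread",
    "greenthreads": "green thread",
}


def pick_executor_kind(executor):
    """Map an executor object or string to the kind of task executor used."""
    if isinstance(executor, str):
        try:
            return _STRING_KINDS[executor.lower()]
        except KeyError:
            raise ValueError("unknown executor %r" % executor) from None
    # ProcessPoolExecutor subclasses Executor, so it must be checked first.
    if isinstance(executor, ProcessPoolExecutor):
        return "process"
    if isinstance(executor, Executor):     # ThreadPoolExecutor or any other
        return "thread"
    raise TypeError("unsupported executor type %s" % type(executor).__name__)


print(pick_executor_kind("Threads"), pick_executor_kind(Executor()))
# thread thread
```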

Components

Warning

External usage of internal engine functions, components and modules should be kept to a minimum as they may be altered, refactored or moved to other locations without notice (and without the typical deprecation cycle).

class taskflow.engines.action_engine.builder.MachineMemory[source]

Bases: object

State machine memory.

cancel_futures()[source]

Attempts to cancel any futures that are not yet done.

class taskflow.engines.action_engine.builder.MachineBuilder(runtime, waiter)[source]

Bases: object

State machine builder that powers the engine components.

NOTE(harlowja): the machine (states and events that will trigger transitions) that this builds is represented by the following table:

+--------------+------------------+------------+----------+---------+
|    Start     |      Event       |    End     | On Enter | On Exit |
+--------------+------------------+------------+----------+---------+
|  ANALYZING   |    completed     | GAME_OVER  |    .     |    .    |
|  ANALYZING   |  schedule_next   | SCHEDULING |    .     |    .    |
|  ANALYZING   |  wait_finished   |  WAITING   |    .     |    .    |
|  FAILURE[$]  |        .         |     .      |    .     |    .    |
|  GAME_OVER   |      failed      |  FAILURE   |    .     |    .    |
|  GAME_OVER   |     reverted     |  REVERTED  |    .     |    .    |
|  GAME_OVER   |     success      |  SUCCESS   |    .     |    .    |
|  GAME_OVER   |    suspended     | SUSPENDED  |    .     |    .    |
|   RESUMING   |  schedule_next   | SCHEDULING |    .     |    .    |
| REVERTED[$]  |        .         |     .      |    .     |    .    |
|  SCHEDULING  |  wait_finished   |  WAITING   |    .     |    .    |
|  SUCCESS[$]  |        .         |     .      |    .     |    .    |
| SUSPENDED[$] |        .         |     .      |    .     |    .    |
| UNDEFINED[^] |      start       |  RESUMING  |    .     |    .    |
|   WAITING    | examine_finished | ANALYZING  |    .     |    .    |
+--------------+------------------+------------+----------+---------+

In the table, [^] marks the start state and [$] marks the terminal states. Between any of these yielded states (except GAME_OVER and UNDEFINED), if the engine has been suspended or has failed (due to a non-resolvable task failure or a scheduling failure), the machine will stop executing new tasks (currently running tasks will be allowed to complete) and this machine's run loop will be broken.

NOTE(harlowja): If the runtime's scheduler component is able to schedule tasks in parallel, this enables parallel running and/or reversion.
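The transition table above can be read as a plain lookup from (state, event) to the next state. The following sketch drives such a lookup through one successful run; it is illustrative only (TinyMachine and run_events are hypothetical names, not part of TaskFlow) and it omits the on-enter/on-exit hooks and the suspension checks described above.

```python
# (start state, event) -> end state, copied from the MachineBuilder table.
TRANSITIONS = {
    ("UNDEFINED", "start"): "RESUMING",
    ("RESUMING", "schedule_next"): "SCHEDULING",
    ("SCHEDULING", "wait_finished"): "WAITING",
    ("WAITING", "examine_finished"): "ANALYZING",
    ("ANALYZING", "schedule_next"): "SCHEDULING",
    ("ANALYZING", "wait_finished"): "WAITING",
    ("ANALYZING", "completed"): "GAME_OVER",
    ("GAME_OVER", "success"): "SUCCESS",
    ("GAME_OVER", "failed"): "FAILURE",
    ("GAME_OVER", "reverted"): "REVERTED",
    ("GAME_OVER", "suspended"): "SUSPENDED",
}
TERMINAL_STATES = frozenset(["FAILURE", "REVERTED", "SUCCESS", "SUSPENDED"])


class TinyMachine:
    """Hypothetical dict-driven finite state machine (illustration only)."""

    def __init__(self, transitions, start_state="UNDEFINED"):
        self._transitions = transitions
        self.state = start_state

    def process_event(self, event):
        if self.state in TERMINAL_STATES:
            raise RuntimeError("machine already finished in %s" % self.state)
        key = (self.state, event)
        if key not in self._transitions:
            raise ValueError("event %r is not valid in state %s"
                             % (event, self.state))
        self.state = self._transitions[key]
        return self.state


def run_events(events):
    """Feed events in order and return the list of states visited."""
    machine = TinyMachine(TRANSITIONS)
    return [machine.process_event(event) for event in events]


# One pass: schedule work once, wait for it, analyze, then finish successfully.
print(run_events(["start", "schedule_next", "wait_finished",
                  "examine_finished", "completed", "success"]))
```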

build(statistics, timeout=None, gather_statistics=True)[source]

Builds the state machine that is used while the engine runs.

class taskflow.engines.action_engine.compiler.Terminator(flow)[source]

Bases: object

Flow terminator class.

property flow

The flow which this terminator signifies/marks the end of.

property name

Useful name this end terminator has (derived from flow name).

class taskflow.engines.action_engine.compiler.Compilation(execution_graph, hierarchy)[source]

Bases: object

The result of a compiler's compile() is this immutable object.

TASK = 'task'

Task nodes will have a kind metadata key with this value.

RETRY = 'retry'

Retry nodes will have a kind metadata key with this value.

FLOW = 'flow'

Flow entry nodes will have a kind metadata key with this value.

FLOW_END = 'flow_end'

Flow exit nodes will have a kind metadata key with this value (only applicable for compilation execution graph, not currently used in tree hierarchy).

property execution_graph

The execution ordering of atoms (as a graph structure).

property hierarchy

The hierarchy of patterns (as a tree structure).

class taskflow.engines.action_engine.compiler.TaskCompiler[source]

Bases: object

Non-recursive compiler of tasks.

class taskflow.engines.action_engine.compiler.FlowCompiler(deep_compiler_func)[source]

Bases: object

Recursive compiler of flows.

compile(flow, parent=None)[source]

Decomposes a flow into a graph and scope tree hierarchy.

class taskflow.engines.action_engine.compiler.PatternCompiler(root, freeze=True)[source]

Bases: object

Compiles a flow pattern (or task) into a compilation unit.

Let’s dive into the basic idea for how this works:

The compiler here is provided a ‘root’ object via its __init__ method; this object could be a task or a flow (one of the supported patterns). The end goal is to produce a Compilation object, with the needed components, as the result. If this is not possible a CompilationFailure will be raised. If an unknown type is requested to be compiled a TypeError will be raised, and when a duplicate object (one that has already been compiled) is encountered a ValueError is raised.

The complexity comes into play when the ‘root’ is a flow that itself contains other nested flows (and so on). To compile this object and its contained objects into a graph that preserves the constraints the pattern mandates, we have to go through a recursive algorithm that creates subgraphs for each nesting level. On the way back up through the recursion (now with a decomposed mapping from contained patterns or atoms to their corresponding subgraphs) we then have to connect the subgraphs (and the atom(s) therein) that were decomposed for a pattern correctly into a new graph, ensuring that the pattern-mandated constraints are retained. Finally we return to the caller, which does the same thing, and so on up to the root node; by that point one graph has been created with all contained atoms in the ordering mandated by the pattern and its nested patterns.

Also maintained in the Compilation object is a hierarchy of the nesting of items (which is also built up during the recursion mentioned above, via a much simpler algorithm). It is typically used later to determine the prior atoms of a given atom when looking up values that can be provided to that atom for execution (see the scopes.py file for how this works). Although you might think that the graph itself could be used for this (which in some limited ways it can), the hierarchy retains the nested structure, which is useful for scoping analysis and lookup: it can provide back an iterator that gives the scopes visible at each level (the graph no longer has this information once flattened).

Let’s take an example:

Given the pattern f(a(b, c), d), where f is a linear_flow.Flow with items a(b, c) and task d, and a is a linear_flow.Flow composed of tasks (b, c).

The algorithm that will be performed (mirroring the above described logic) will go through the following steps (the tree hierarchy building is left out as that is more obvious):

Compiling f
  - Decomposing flow f with no parent (must be the root)
  - Compiling a
      - Decomposing flow a with parent f
      - Compiling b
          - Decomposing task b with parent a
          - Decomposed b into:
            Name: b
            Nodes: 1
              - b
            Edges: 0
      - Compiling c
          - Decomposing task c with parent a
          - Decomposed c into:
            Name: c
            Nodes: 1
              - c
            Edges: 0
      - Relinking decomposed b -> decomposed c
      - Decomposed a into:
        Name: a
        Nodes: 2
          - b
          - c
        Edges: 1
          b -> c ({'invariant': True})
  - Compiling d
      - Decomposing task d with parent f
      - Decomposed d into:
        Name: d
        Nodes: 1
          - d
        Edges: 0
  - Relinking decomposed a -> decomposed d
  - Decomposed f into:
    Name: f
    Nodes: 3
      - c
      - b
      - d
    Edges: 2
      c -> d ({'invariant': True})
      b -> c ({'invariant': True})
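The recursive decomposition traced above can be sketched for the linear case in a few lines. This is a simplified model that assumes only tasks and linear flows: Task, LinearFlow, decompose and compile_flow are hypothetical names, and the real compiler additionally records flow entry/exit nodes, retries and the tree hierarchy, all of which are left out here.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    name: str


@dataclass
class LinearFlow:
    name: str
    items: list = field(default_factory=list)


def decompose(item):
    """Return (nodes, edges, entry_atoms, exit_atoms) for a task or flow."""
    if isinstance(item, Task):
        return [item.name], set(), [item.name], [item.name]
    if not isinstance(item, LinearFlow):
        raise TypeError("unknown type requested to compile: %r" % (item,))
    nodes, edges = [], set()
    entry, previous_exit = [], []
    for child in item.items:
        # Recurse first, then relink on the way back up.
        child_nodes, child_edges, child_entry, child_exit = decompose(child)
        if not child_nodes:
            continue  # an empty subflow adds no atoms and no ordering
        nodes.extend(child_nodes)
        edges |= child_edges
        # Relink: the previous child's exit atoms must run before this
        # child's entry atoms (the linear-flow ordering constraint).
        edges |= {(u, v) for u in previous_exit for v in child_entry}
        if not entry:
            entry = child_entry
        previous_exit = child_exit
    return nodes, edges, entry, previous_exit


def compile_flow(root):
    nodes, edges, _, _ = decompose(root)
    if len(nodes) != len(set(nodes)):
        raise ValueError("duplicate atom encountered")
    return nodes, edges


f = LinearFlow("f", [LinearFlow("a", [Task("b"), Task("c")]), Task("d")])
nodes, edges = compile_flow(f)
print(nodes, sorted(edges))  # ['b', 'c', 'd'] [('b', 'c'), ('c', 'd')]
```

Compiling f(a(b, c), d) this way produces the same two edges as the trace, b -> c and c -> d, because only the exit atom of a (c) is linked to the entry atom of d.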

compile()[source]

Compiles the contained item into a compiled equivalent.

class taskflow.engines.action_engine.completer.Strategy(runtime)[source]

Bases: object

Failure resolution strategy base class.

abstract apply()[source]

Applies some algorithm to resolve some detected failure.

class taskflow.engines.action_engine.completer.RevertAndRetry(runtime, retry)[source]

Bases: Strategy

Sets the associated subflow to be reverted so that it can later be retried.

apply()[source]

Applies some algorithm to resolve some detected failure.

class taskflow.engines.action_engine.completer.RevertAll(runtime)[source]

Bases: Strategy

Sets all nodes/atoms to the REVERT intention.

apply()[source]

Applies some algorithm to resolve some detected failure.

class taskflow.engines.action_engine.completer.Revert(runtime, atom)[source]

Bases: Strategy

Sets atom and associated nodes to the REVERT intention.

apply()[source]

Applies some algorithm to resolve some detected failure.

class taskflow.engines.action_engine.completer.Completer(runtime)[source]

Bases: object

Completes atoms using actions to complete them.

resume()[source]

Resumes atoms in the contained graph.

This is done to allow any previously completed or failed atoms to be analyzed, their results processed, and any potentially affected atoms to be adjusted as needed.

This should return a set of atoms which should be the initial set of atoms that were previously not finished (due to a RUNNING or REVERTING attempt not previously finishing).

complete_failure(node, outcome, failure)[source]

Performs post-execution completion of a node's failure.

Returns whether the result should be saved into an accumulator of failures or whether this should not be done.

complete(node, outcome, result)[source]

Performs post-execution completion of a node result.

class taskflow.engines.action_engine.deciders.Decider[source]

Bases: object

Base class for deciders.

Provides interface to be implemented by sub-classes.

Deciders check whether the next atom in a flow should be executed or not.

abstract tally(runtime)[source]

Tally edge deciders on whether this decider should allow running.

The returned value is a list of edge deciders that voted ‘nay’ (do not allow running).

abstract affect(runtime, nay_voters)[source]

Affects associated atoms due to at least one ‘nay’ edge decider.

This will alter the associated atom + some set of successor atoms by setting their state and intention to IGNORE so that they are ignored in future runtime activities.

check_and_affect(runtime)[source]

Handles tally() + affect() in right order.

NOTE(harlowja): If there are zero ‘nay’ edge deciders then it is assumed this decider should allow running.

Returns boolean of whether this decider allows for running (or not).
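The tally/affect split can be sketched as follows. The real methods receive the engine runtime; here a plain history dictionary (the input to the votes) and a states dictionary stand in for it, and EdgeDecider and SketchIgnoreDecider are hypothetical names used only for illustration.

```python
IGNORE = "IGNORE"


class EdgeDecider:
    """Hypothetical edge decider: a named vote on whether to run."""

    def __init__(self, name, vote):
        self.name = name
        self.vote = vote  # callable(history) -> bool; False is a 'nay'


class SketchIgnoreDecider:
    """Hypothetical IgnoreDecider-like class (illustration only)."""

    def __init__(self, atom, edge_deciders, successors=()):
        self.atom = atom
        self.edge_deciders = list(edge_deciders)
        self.successors = list(successors)  # atoms ignored along with atom

    def tally(self, history):
        # Collect the deciders that voted 'nay'.
        return [d for d in self.edge_deciders if not d.vote(history)]

    def affect(self, states, nay_voters):
        # Mark the atom and its successors so later activities skip them.
        for name in [self.atom] + self.successors:
            states[name] = (IGNORE, IGNORE)  # (state, intention)

    def check_and_affect(self, history, states):
        nay_voters = self.tally(history)
        if not nay_voters:
            return True  # zero 'nay' votes: the atom may run
        self.affect(states, nay_voters)
        return False


states = {"c": ("PENDING", "EXECUTE"), "d": ("PENDING", "EXECUTE")}
decider = SketchIgnoreDecider(
    "c", [EdgeDecider("b->c", lambda history: history["b"] > 0)],
    successors=["d"])
print(decider.check_and_affect({"b": 0}, states), states["d"])  # False ('IGNORE', 'IGNORE')
```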

class taskflow.engines.action_engine.deciders.IgnoreDecider(atom, edge_deciders)[source]

Bases: Decider

Checks any provided edge-deciders and determines if ok to run.

tally(runtime)[source]

Tally edge deciders on whether this decider should allow running.

The returned value is a list of edge deciders that voted ‘nay’ (do not allow running).

affect(runtime, nay_voters)[source]

Affects associated atoms due to at least one ‘nay’ edge decider.

This will alter the associated atom + some set of successor atoms by setting their state and intention to IGNORE so that they are ignored in future runtime activities.

class taskflow.engines.action_engine.deciders.NoOpDecider[source]

Bases: Decider

No-op decider that says it is always ok to run & has no effect(s).

tally(runtime)[source]

Always good to go.

affect(runtime, nay_voters)[source]

Does nothing.

class taskflow.engines.action_engine.executor.SerialRetryExecutor[source]

Bases: object

Executes and reverts retries.

start()[source]

Prepare to execute retries.

stop()[source]

Finalize retry executor.

execute_retry(retry, arguments)[source]

Schedules retry execution.

revert_retry(retry, arguments)[source]

Schedules retry reversion.

class taskflow.engines.action_engine.executor.TaskExecutor[source]

Bases: object

Executes and reverts tasks.

This class takes a task and its arguments and executes or reverts it. It encapsulates the knowledge of how a task should be executed or reverted: right now, on a separate thread, on another machine, etc.

abstract execute_task(task, task_uuid, arguments, progress_callback=None)[source]

Schedules task execution.

abstract revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

Schedules task reversion.

start()[source]

Prepare to execute tasks.

stop()[source]

Finalize task executor.

class taskflow.engines.action_engine.executor.SerialTaskExecutor[source]

Bases: TaskExecutor

Executes tasks one after another.

start()[source]

Prepare to execute tasks.

stop()[source]

Finalize task executor.

execute_task(task, task_uuid, arguments, progress_callback=None)[source]

Schedules task execution.

revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

Schedules task reversion.

class taskflow.engines.action_engine.executor.ParallelTaskExecutor(executor=None, max_workers=None)[source]

Bases: TaskExecutor

Executes tasks in parallel.

Submits tasks to an executor which should provide an interface similar to concurrent.futures.Executor.

constructor_options = [('max_workers', <function ParallelTaskExecutor.<lambda>>)]

Optional constructor keyword arguments this executor supports. These will typically be passed via engine options (by an engine user) and converted into the correct type before being sent into this class's __init__ method.

execute_task(task, task_uuid, arguments, progress_callback=None)[source]

Schedules task execution.

revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

Schedules task reversion.

start()[source]

Prepare to execute tasks.

stop()[source]

Finalize task executor.
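A rough sketch of the idea behind ParallelTaskExecutor: scheduling a task means submitting it to a concurrent.futures executor and handing back the future. The class name is hypothetical, progress callbacks are omitted, and passing result and flow_failures keyword arguments to revert is an assumption about the task interface made only for this sketch.

```python
import concurrent.futures as futures


class SketchParallelTaskExecutor:
    """Hypothetical parallel task executor built on concurrent.futures."""

    def __init__(self, executor=None, max_workers=None):
        self._executor = executor
        self._owns_executor = executor is None  # only shut down what we create
        self._max_workers = max_workers

    def start(self):
        if self._owns_executor:
            self._executor = futures.ThreadPoolExecutor(
                max_workers=self._max_workers)

    def stop(self):
        if self._owns_executor and self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None

    def execute_task(self, task, task_uuid, arguments):
        # Only schedules the work; the caller waits on the returned future.
        return self._executor.submit(task.execute, **arguments)

    def revert_task(self, task, task_uuid, arguments, result, failures):
        return self._executor.submit(task.revert, result=result,
                                     flow_failures=failures, **arguments)


class Adder:
    """Toy task used only for this example."""

    def execute(self, x, y):
        return x + y

    def revert(self, x, y, result, flow_failures):
        return ("undo", result)


executor = SketchParallelTaskExecutor(max_workers=2)
executor.start()
future = executor.execute_task(Adder(), "uuid-1", {"x": 1, "y": 2})
print(future.result())  # 3
executor.stop()
```

When an executor is passed in, the sketch leaves its lifecycle to the caller, which is why stop() only shuts down a pool it created itself.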

class taskflow.engines.action_engine.executor.ParallelThreadTaskExecutor(executor=None, max_workers=None)[source]

Bases: ParallelTaskExecutor

Executes tasks in parallel using a thread pool executor.

class taskflow.engines.action_engine.executor.ParallelGreenThreadTaskExecutor(executor=None, max_workers=None)[source]

Bases: ParallelThreadTaskExecutor

Executes tasks in parallel using a greenthread pool executor.

DEFAULT_WORKERS = 1000

Default number of workers when None is passed. Because greenthreads do not map to native threads or processors very well, this is more of a guess (somewhat arbitrary), but it does match the default eventlet greenpool size (so at least it is consistent with what eventlet does).

exception taskflow.engines.action_engine.process_executor.UnknownSender[source]

Bases: Exception

Exception raised when a message from an unknown sender is received.

exception taskflow.engines.action_engine.process_executor.ChallengeIgnored[source]

Bases: Exception

Exception raised when a challenge has not been responded to.

class taskflow.engines.action_engine.process_executor.Reader(auth_key, dispatch_func, msg_limit=-1)[source]

Bases: object

Reader machine that streams & parses messages that it then dispatches.

TODO(harlowja): Use python-suitcase in the future when the following are addressed/resolved and released:

The binary format is the following (no newlines in the actual format):

<magic-header> (4 bytes)
<mac-header-length> (4 bytes)
<mac> (1 or more variable bytes)
<identity-header-length> (4 bytes)
<identity> (1 or more variable bytes)
<msg-header-length> (4 bytes)
<msg> (1 or more variable bytes)
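The framing above can be sketched with the struct and hmac modules. Several details are assumptions rather than TaskFlow's actual choices: the magic value b"TFPX", 4-byte big-endian unsigned lengths, and an HMAC-SHA256 computed over the message body only. The sketch also decodes one complete buffer, whereas the documented Reader parses a stream incrementally.

```python
import hashlib
import hmac
import struct

MAGIC = b"TFPX"               # assumed 4-byte magic header value
LENGTH = struct.Struct("!I")  # assumed 4-byte big-endian unsigned length


class BadHmacValueError(ValueError):
    """Local stand-in for the exception of the same name documented below."""


def _mac(auth_key, msg):
    return hmac.new(auth_key, msg, hashlib.sha256).digest()


def encode_frame(auth_key, identity, msg):
    parts = [MAGIC]
    for chunk in (_mac(auth_key, msg), identity, msg):
        parts.append(LENGTH.pack(len(chunk)))  # <...-header-length>
        parts.append(chunk)                    # the variable-length bytes
    return b"".join(parts)


def decode_frame(auth_key, data):
    if data[:len(MAGIC)] != MAGIC:
        raise ValueError("bad magic header")
    pos = len(MAGIC)
    fields = []
    for _ in range(3):  # mac, identity, msg, in that order
        (length,) = LENGTH.unpack_from(data, pos)
        pos += LENGTH.size
        if pos + length > len(data):
            raise ValueError("truncated frame")
        fields.append(data[pos:pos + length])
        pos += length
    mac, identity, msg = fields
    # Constant-time comparison avoids leaking how much of the mac matched.
    if not hmac.compare_digest(mac, _mac(auth_key, msg)):
        raise BadHmacValueError("invalid hmac")
    return identity, msg


frame = encode_frame(b"secret", b"worker-1", b"hello")
print(decode_frame(b"secret", frame))  # (b'worker-1', b'hello')
```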

exception taskflow.engines.action_engine.process_executor.BadHmacValueError[source]

Bases: ValueError

Value error raised when an invalid HMAC is discovered.

class taskflow.engines.action_engine.process_executor.Channel(port, identity, auth_key)[source]

Bases: object

Object that workers use to communicate back to their creator.

class taskflow.engines.action_engine.process_executor.EventSender(channel)[source]

Bases: object

Sends event information from a child worker process to its creator.

class taskflow.engines.action_engine.process_executor.DispatcherHandler(sock, addr, dispatcher)[source]

Bases: dispatcher

Dispatches from a single connection into a target.

CHUNK_SIZE = 8192

Read/write chunk size.

class taskflow.engines.action_engine.process_executor.Dispatcher(map, auth_key, identity)[source]

Bases: dispatcher

Accepts messages received from child worker processes.

MAX_BACKLOG = 5

See https://docs.python.org/2/library/socket.html#socket.socket.listen

class taskflow.engines.action_engine.process_executor.ParallelProcessTaskExecutor(executor=None, max_workers=None, wait_timeout=None)[source]

Bases: ParallelTaskExecutor

Executes tasks in parallel using a process pool executor.

NOTE(harlowja): this executor executes tasks in external processes. This implies that tasks sent to an external process must be pickleable, since that is how multiprocessing works (sending pickled objects back and forth), and that the bound handlers (for progress updating in particular) must be proxied correctly from the external process to the parent process, so that callbacks registered in the parent are executed on events in the child.

WAIT_TIMEOUT = 0.01

Default timeout used by asyncore io loop (and eventually select/poll).

constructor_options = [('max_workers', <function ParallelProcessTaskExecutor.<lambda>>), ('wait_timeout', <function ParallelProcessTaskExecutor.<lambda>>)]

Optional constructor keyword arguments this executor supports. These will typically be passed via engine options (by an engine user) and converted into the correct type before being sent into this class's __init__ method.
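The constructor_options convention (pairs of option name and converter) can be illustrated with a small stand-in class. The converters shown (int and float, each passing None through) are assumptions, since the documented values are opaque lambdas; SketchProcessExecutor and build_from_options are hypothetical names. The default wait_timeout of 0.01 mirrors WAIT_TIMEOUT above.

```python
class SketchProcessExecutor:
    """Hypothetical executor exposing constructor_options like the real one."""

    WAIT_TIMEOUT = 0.01

    # (option name, converter) pairs; these converters are assumptions.
    constructor_options = [
        ("max_workers", lambda v: None if v is None else int(v)),
        ("wait_timeout", lambda v: None if v is None else float(v)),
    ]

    def __init__(self, executor=None, max_workers=None, wait_timeout=None):
        self.executor = executor
        self.max_workers = max_workers
        if wait_timeout is None:
            wait_timeout = self.WAIT_TIMEOUT
        self.wait_timeout = wait_timeout


def build_from_options(executor_cls, options):
    """Pass only the supported, converted options into __init__."""
    kwargs = {}
    for name, converter in executor_cls.constructor_options:
        if name in options:
            kwargs[name] = converter(options[name])
    return executor_cls(**kwargs)


# Engine options often arrive as strings (e.g. from configuration files).
ex = build_from_options(SketchProcessExecutor,
                        {"max_workers": "4", "wait_timeout": "0.5", "other": 1})
print(ex.max_workers, ex.wait_timeout)  # 4 0.5
```

Options the executor does not declare (such as "other" above) are simply not forwarded, so unrelated engine options never reach __init__.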

start()[source]

Prepare to execute tasks.

stop()[source]

Finalize task executor.

class taskflow.engines.action_engine.runtime.Runtime(compilation, storage, atom_notifier, task_executor, retry_executor, options=None)[source]

Bases: object

An aggregate of runtime objects, properties, … used during execution.

This object contains various utility methods and properties that represent the collection of runtime components and functionality needed for an action engine to run to completion.

compile()[source]

Compiles & caches frequently used execution helper objects.

Builds out a cache of commonly used items that are associated with the contained atoms (by name) and are useful to have for quick lookup (for example, the change state handler function for each atom, the scope walker object for each atom, the task or retry specific scheduler, and so on).

check_atom_transition(atom, current_state, target_state)[source]

Checks if the atom can transition to the provided target state.

fetch_edge_deciders(atom)[source]

Fetches the edge deciders for the given atom.

fetch_scheduler(atom)[source]

Fetches the cached specific scheduler for the given atom.

fetch_action(atom)[source]

Fetches the cached action handler for the given atom.

fetch_scopes_for(atom_name)[source]

Fetches a walker of the visible scopes for the given atom.

iterate_retries(state=None)[source]

Iterates retry atoms that match the provided state.

If no state is provided it will yield back all retry atoms.

iterate_nodes(allowed_kinds)[source]

Yields back all nodes of specified kinds in the execution graph.

is_success()[source]

Checks if all atoms in the execution graph are in ‘happy’ state.

find_retry(node)[source]

Returns the retry atom associated with the given node (or None).

reset_atoms(atoms, state='PENDING', intention='EXECUTE')[source]

Resets all the provided atoms to the given state and intention.

reset_all(state='PENDING', intention='EXECUTE')[source]

Resets all atoms to the given state and intention.

reset_subgraph(atom, state='PENDING', intention='EXECUTE')[source]

Resets an atom's subgraph to the given state and intention.

The subgraph consists of all of the atom's successors.

retry_subflow(retry)[source]

Prepares a retry and its subgraph for execution.

This sets the retry's intention to EXECUTE and resets all of its subgraph (its successors) to the PENDING state with an EXECUTE intention.
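The reset semantics of reset_subgraph and retry_subflow can be sketched over a plain adjacency dictionary. This is a simplified model: records maps atom names to their state and intention, and successors_of and the dictionary layout are hypothetical.

```python
from collections import deque


def successors_of(graph, atom):
    """All transitive successors of atom in an adjacency-dict DAG."""
    seen, queue = set(), deque(graph.get(atom, ()))
    while queue:
        name = queue.popleft()
        if name not in seen:
            seen.add(name)
            queue.extend(graph.get(name, ()))
    return seen


def reset_subgraph(graph, records, atom, state="PENDING", intention="EXECUTE"):
    # Only the successors are reset; the atom itself is left alone.
    for name in successors_of(graph, atom):
        records[name] = {"state": state, "intention": intention}


def retry_subflow(graph, records, retry):
    records[retry]["intention"] = "EXECUTE"
    reset_subgraph(graph, records, retry)


graph = {"r": ["t1"], "t1": ["t2"], "t2": [], "x": []}
records = {name: {"state": "REVERTED", "intention": "REVERT"} for name in graph}
retry_subflow(graph, records, "r")
print(records["r"], records["t2"])
```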

class taskflow.engines.action_engine.scheduler.RetryScheduler(runtime)[source]

Bases: object

Schedules retry atoms.

schedule(retry)[source]

Schedules the given retry atom for future completion.

Depending on the atom's stored intention this may schedule the retry atom for reversion or execution.

class taskflow.engines.action_engine.scheduler.TaskScheduler(runtime)[source]

Bases: object

Schedules task atoms.

schedule(task)[source]

Schedules the given task atom for future completion.

Depending on the atom's stored intention this may schedule the task atom for reversion or execution.

class taskflow.engines.action_engine.scheduler.Scheduler(runtime)[source]

Bases: object

Safely schedules atoms using a runtime fetch_scheduler routine.

schedule(atoms)[source]

Schedules the provided atoms for future completion.

This method should schedule a future for each atom provided and return a set of those futures to be waited on (or used for other similar purposes). It should also return any failure objects that represented scheduling failures that may have occurred during this scheduling process.
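A sketch of the schedule contract (a set of futures plus any scheduling failures). The fetch_scheduler callable stands in for the runtime routine of the same name and is assumed here to return a function that schedules one atom; stopping at the first scheduling failure is one reasonable policy chosen for this sketch, so that already-scheduled work can still be waited on.

```python
import concurrent.futures as futures


def schedule(atoms, fetch_scheduler):
    """Schedule each atom; return (set_of_futures, list_of_failures)."""
    scheduled = set()
    for atom in atoms:
        schedule_one = fetch_scheduler(atom)
        try:
            scheduled.add(schedule_one(atom))
        except Exception as exc:
            # Stop early; the caller can still wait on what was scheduled.
            return scheduled, [exc]
    return scheduled, []


with futures.ThreadPoolExecutor(max_workers=2) as pool:
    def fetch_scheduler(atom):
        # Every atom gets the same toy scheduler: submit str.upper(atom).
        return lambda a: pool.submit(str.upper, a)

    scheduled, failures = schedule(["b", "c"], fetch_scheduler)
    print(sorted(f.result() for f in scheduled), failures)  # ['B', 'C'] []
```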

class taskflow.engines.action_engine.selector.Selector(runtime)[source]

Bases: object

Selector that uses a compilation and aids in execution processes.

Its primary purpose is to get the next atoms for execution or reversion by utilizing the compilation's underlying structures (graphs, nodes and edge relations…) and using this information along with the atom state/states stored in storage to provide other useful functionality to the rest of the runtime system.

iter_next_atoms(atom=None)[source]

Iterate next atoms to run (originating from atom or all atoms).

class taskflow.engines.action_engine.scopes.ScopeWalker(compilation, atom, names_only=False)[source]

Bases: object

Walks through the scopes of an atom using an engine's compilation.

NOTE(harlowja): for internal usage only.

This will walk the visible scopes that are accessible for the given atom, which can be used by some external entity in some meaningful way, for example to find dependent values…

__iter__()[source]

Iterates over the visible scopes.

How this works is the following:

We first grab all the predecessors of the given atom (let's call it Y) by using the Compilation execution graph (doing a reverse breadth-first expansion to gather its predecessors). This is useful since we know they will always exist (and execute) before this atom, but it does not tell us the corresponding scope level (flow, nested flow…) that each predecessor was created in, so we need to find this information.

For that information we consult the location of the atom Y in the Compilation hierarchy/tree. We look up, in reverse order, the parent X of Y and traverse backwards from the index in the parent where Y exists to all siblings (and children of those siblings) in X that we encounter in this backwards search (if a sibling is a flow itself, its atom(s) will be recursively expanded and included). This collection will then be assumed to be at the same scope. This is what is called a potential single scope; to make an actual scope we remove the items from the potential scope that are not predecessors of Y, forming the actual scope which we then yield back.

Then for additional scopes we continue up the tree, by finding the parent of X (let's call it Z) and performing the same operation, going through the children in reverse from the index in parent Z where X was located. This forms another potential scope, which we provide back as an actual scope after reducing the potential set to only include the predecessors previously gathered. We then repeat this process until we no longer have any parent nodes (i.e. we have reached the top of the tree) or we run out of predecessors.
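The scope walk described above can be sketched with a small tree and an edge set. This is an illustrative model, not TaskFlow's ScopeWalker: Node, reverse_atoms, predecessors and walk_scopes are hypothetical names, and the sketch yields atom names only (most recent first within each scope).

```python
from collections import deque


class Node:
    """Hypothetical hierarchy node: a flow (with children) or an atom."""

    def __init__(self, name, children=(), is_flow=False):
        self.name = name
        self.children = list(children)
        self.is_flow = is_flow
        self.parent = None
        for child in self.children:
            child.parent = self


def reverse_atoms(node):
    """Yield atom names under node, last first, expanding nested flows."""
    if not node.is_flow:
        yield node.name
        return
    for child in reversed(node.children):
        yield from reverse_atoms(child)


def predecessors(edges, atom_name):
    """Reverse breadth-first expansion over (u, v) execution edges."""
    preds_of = {}
    for u, v in edges:
        preds_of.setdefault(v, []).append(u)
    seen, queue = set(), deque(preds_of.get(atom_name, ()))
    while queue:
        name = queue.popleft()
        if name not in seen:
            seen.add(name)
            queue.extend(preds_of.get(name, ()))
    return seen


def walk_scopes(edges, atom):
    """Yield one list of visible atom names per enclosing flow level."""
    remaining = predecessors(edges, atom.name)
    child, parent = atom, atom.parent
    while parent is not None and remaining:
        index = parent.children.index(child)
        potential = []  # the potential scope at this level
        for sibling in reversed(parent.children[:index]):
            potential.extend(reverse_atoms(sibling))
        # Actual scope = potential scope reduced to real predecessors.
        scope = [name for name in potential if name in remaining]
        remaining.difference_update(scope)
        yield scope
        child, parent = parent, parent.parent


b, c, d = Node("b"), Node("c"), Node("d")
f = Node("f", [Node("a", [b, c], is_flow=True), d], is_flow=True)
print(list(walk_scopes({("b", "c"), ("c", "d")}, d)))  # [['c', 'b']]
```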

class taskflow.engines.action_engine.traversal.Direction(value)[source]

Bases: Enum

Traversal direction enum.

FORWARD = 1

Go through successors.

BACKWARD = 2

Go through predecessors.

taskflow.engines.action_engine.traversal.breadth_first_iterate(execution_graph, starting_node, direction, through_flows=True, through_retries=True, through_tasks=True)[source]

Iterates connected nodes in execution graph (from starting node).

Does so in a breadth first manner.

Jumps over nodes with noop attribute (does not yield them back).

taskflow.engines.action_engine.traversal.depth_first_iterate(execution_graph, starting_node, direction, through_flows=True, through_retries=True, through_tasks=True)[source]

Iterates connected nodes in execution graph (from starting node).

Does so in a depth first manner.

Jumps over nodes with noop attribute (does not yield them back).

taskflow.engines.action_engine.traversal.depth_first_reverse_iterate(node, start_from_idx=-1)[source]

Iterates connected (in reverse) tree nodes (from starting node).

Jumps through nodes with noop attribute (does not yield them back).
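Both graph traversal helpers can be sketched over a successor dictionary. In this hypothetical version, nodes listed in noop are traversed through but not yielded (mirroring the behaviour described above), the starting node itself is not yielded, and the through_flows/through_retries/through_tasks filters are omitted.

```python
import enum
from collections import deque


class Direction(enum.Enum):
    FORWARD = 1   # go through successors
    BACKWARD = 2  # go through predecessors


def _adjacency(successors, direction):
    if direction is Direction.FORWARD:
        return successors
    predecessors = {}
    for node, targets in successors.items():
        for target in targets:
            predecessors.setdefault(target, []).append(node)
    return predecessors


def breadth_first_iterate(successors, start, direction, noop=frozenset()):
    adjacency = _adjacency(successors, direction)
    seen = {start}
    queue = deque(adjacency.get(start, ()))
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        if node not in noop:  # traverse through noop nodes, never yield them
            yield node
        queue.extend(adjacency.get(node, ()))


def depth_first_iterate(successors, start, direction, noop=frozenset()):
    adjacency = _adjacency(successors, direction)
    seen = {start}
    # Reverse pushes so that neighbours are visited in their listed order.
    stack = list(reversed(adjacency.get(start, ())))
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        if node not in noop:
            yield node
        stack.extend(reversed(adjacency.get(node, ())))


graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
print(list(breadth_first_iterate(graph, "a", Direction.FORWARD)))  # ['b', 'c', 'd']
print(list(depth_first_iterate(graph, "a", Direction.FORWARD)))  # ['b', 'd', 'c']
```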

Hierarchy

Inheritance diagram of taskflow.engines.action_engine.engine.ActionEngine, taskflow.engines.base.Engine, taskflow.engines.worker_based.engine.WorkerBasedActionEngine