Source code for taskflow.patterns.graph_flow

# -*- coding: utf-8 -*-

#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections

import six

from taskflow import deciders as de
from taskflow import exceptions as exc
from taskflow import flow
from taskflow.types import graph as gr


def _unsatisfied_requires(node, graph, *additional_provided):
    requires = set(node.requires)
    if not requires:
        return requires
    for provided in additional_provided:
        # This is using the difference() method vs the -
        # operator since the latter doesn't work with frozen
        # or regular sets (when used in combination with ordered
        # sets).
        #
        # If this is not done the following happens...
        #
        # TypeError: unsupported operand type(s)
        # for -: 'set' and 'OrderedSet'
        requires = requires.difference(provided)
        if not requires:
            return requires
    for pred in graph.bfs_predecessors_iter(node):
        requires = requires.difference(pred.provides)
        if not requires:
            return requires
    return requires


[docs]class Flow(flow.Flow): """Graph flow pattern. Contained *flows/tasks* will be executed according to their dependencies which will be resolved by using the *flows/tasks* provides and requires mappings or by following manually created dependency links. From dependencies a `directed graph`_ is built. If it has edge ``A -> B``, this means ``B`` depends on ``A`` (and that the execution of ``B`` must wait until ``A`` has finished executing, on reverting this means that the reverting of ``A`` must wait until ``B`` has finished reverting). Note: `cyclic`_ dependencies are not allowed. .. _directed graph: https://en.wikipedia.org/wiki/Directed_graph .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph """ def __init__(self, name, retry=None): super(Flow, self).__init__(name, retry) self._graph = gr.DiGraph(name=name) self._graph.freeze() #: Extracts the unsatisified symbol requirements of a single node. _unsatisfied_requires = staticmethod(_unsatisfied_requires) def _link(self, u, v, graph=None, reason=None, manual=False, decider=None, decider_depth=None): mutable_graph = True if graph is None: graph = self._graph mutable_graph = False # NOTE(harlowja): Add an edge to a temporary copy and only if that # copy is valid then do we swap with the underlying graph. attrs = graph.get_edge_data(u, v) if not attrs: attrs = {} if decider is not None: attrs[flow.LINK_DECIDER] = decider try: # Remove existing decider depth, if one existed. del attrs[flow.LINK_DECIDER_DEPTH] except KeyError: pass if decider_depth is not None: if decider is None: raise ValueError("Decider depth requires a decider to be" " provided along with it") else: decider_depth = de.Depth.translate(decider_depth) attrs[flow.LINK_DECIDER_DEPTH] = decider_depth if manual: attrs[flow.LINK_MANUAL] = True if reason is not None: if flow.LINK_REASONS not in attrs: attrs[flow.LINK_REASONS] = set() attrs[flow.LINK_REASONS].add(reason) if not mutable_graph: graph = gr.DiGraph(graph) graph.add_edge(u, v, **attrs) return graph def _swap(self, graph): """Validates the replacement graph and then swaps the underlying graph. After swapping occurs the underlying graph will be frozen so that the immutability invariant is maintained (we may be able to relax this constraint in the future since our exposed public api does not allow direct access to the underlying graph). """ if not graph.is_directed_acyclic(): raise exc.DependencyFailure("No path through the node(s) in the" " graph produces an ordering that" " will allow for logical" " edge traversal") self._graph = graph.freeze()
[docs] def add(self, *nodes, **kwargs): """Adds a given task/tasks/flow/flows to this flow. Note that if the addition of these nodes (and any edges) creates a `cyclic`_ graph then a :class:`~taskflow.exceptions.DependencyFailure` will be raised and the applied changes will be discarded. :param nodes: node(s) to add to the flow :param kwargs: keyword arguments, the two keyword arguments currently processed are: * ``resolve_requires`` a boolean that when true (the default) implies that when node(s) are added their symbol requirements will be matched to existing node(s) and links will be automatically made to those providers. If multiple possible providers exist then a :class:`~taskflow.exceptions.AmbiguousDependency` exception will be raised and the provided additions will be discarded. * ``resolve_existing``, a boolean that when true (the default) implies that on addition of a new node that existing node(s) will have their requirements scanned for symbols that this newly added node can provide. If a match is found a link is automatically created from the newly added node to the requiree. .. _cyclic: https://en.wikipedia.org/wiki/Cycle_graph """ # Let's try to avoid doing any work if we can; since the below code # after this filter can create more temporary graphs that aren't needed # if the nodes already exist... nodes = [i for i in nodes if not self._graph.has_node(i)] if not nodes: return self # This syntax will *hopefully* be better in future versions of python. # # See: http://legacy.python.org/dev/peps/pep-3102/ (python 3.0+) resolve_requires = bool(kwargs.get('resolve_requires', True)) resolve_existing = bool(kwargs.get('resolve_existing', True)) # Figure out what the existing nodes *still* require and what they # provide so we can do this lookup later when inferring. required = collections.defaultdict(list) provided = collections.defaultdict(list) retry_provides = set() if self._retry is not None: for value in self._retry.requires: required[value].append(self._retry) for value in self._retry.provides: retry_provides.add(value) provided[value].append(self._retry) for node in self._graph.nodes_iter(): for value in self._unsatisfied_requires(node, self._graph, retry_provides): required[value].append(node) for value in node.provides: provided[value].append(node) # NOTE(harlowja): Add node(s) and edge(s) to a temporary copy of the # underlying graph and only if that is successful added to do we then # swap with the underlying graph. tmp_graph = gr.DiGraph(self._graph) for node in nodes: tmp_graph.add_node(node) # Try to find a valid provider. if resolve_requires: for value in self._unsatisfied_requires(node, tmp_graph, retry_provides): if value in provided: providers = provided[value] if len(providers) > 1: provider_names = [n.name for n in providers] raise exc.AmbiguousDependency( "Resolution error detected when" " adding '%(node)s', multiple" " providers %(providers)s found for" " required symbol '%(value)s'" % dict(node=node.name, providers=sorted(provider_names), value=value)) else: self._link(providers[0], node, graph=tmp_graph, reason=value) else: required[value].append(node) for value in node.provides: provided[value].append(node) # See if what we provide fulfills any existing requiree. if resolve_existing: for value in node.provides: if value in required: for requiree in list(required[value]): if requiree is not node: self._link(node, requiree, graph=tmp_graph, reason=value) required[value].remove(requiree) self._swap(tmp_graph) return self
def _get_subgraph(self): """Get the active subgraph of _graph. Descendants may override this to make only part of self._graph visible. """ return self._graph def __len__(self): return self._get_subgraph().number_of_nodes() def __iter__(self): for n, _n_data in self.iter_nodes(): yield n def iter_links(self): return self._get_subgraph().edges_iter(data=True) def iter_nodes(self): g = self._get_subgraph() for n in g.topological_sort(): yield n, g.node[n] @property def requires(self): requires = set() retry_provides = set() if self._retry is not None: requires.update(self._retry.requires) retry_provides.update(self._retry.provides) g = self._get_subgraph() for node in g.nodes_iter(): requires.update(self._unsatisfied_requires(node, g, retry_provides)) return frozenset(requires)
def _reset_cached_subgraph(func): """Resets cached subgraph after execution, in case it was affected.""" @six.wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) self._subgraph = None return result return wrapper
[docs]class TargetedFlow(Flow): """Graph flow with a target. Adds possibility to execute a flow up to certain graph node (task or subflow). """ def __init__(self, *args, **kwargs): super(TargetedFlow, self).__init__(*args, **kwargs) self._subgraph = None self._target = None
[docs] def set_target(self, target_node): """Set target for the flow. Any node(s) (tasks or subflows) not needed for the target node will not be executed. """ if not self._graph.has_node(target_node): raise ValueError("Node '%s' not found" % target_node) self._target = target_node self._subgraph = None
[docs] def reset_target(self): """Reset target for the flow. All node(s) of the flow will be executed. """ self._target = None self._subgraph = None
add = _reset_cached_subgraph(Flow.add) link = _reset_cached_subgraph(Flow.link) def _get_subgraph(self): if self._subgraph is not None: return self._subgraph if self._target is None: return self._graph nodes = [self._target] nodes.extend(self._graph.bfs_predecessors_iter(self._target)) self._subgraph = self._graph.subgraph(nodes) self._subgraph.freeze() return self._subgraph