Examples

While developing TaskFlow, the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get started (ordered by perceived complexity):

To explore more of these examples, please check out the examples directory in the TaskFlow source tree.

Note

If the examples provided are not satisfactory (or up to your standards), contributions to improve them are welcome and very much appreciated. The higher the quality and the clearer the examples, the more useful they are for everyone.

Hello world

Note

Full source located at hello_world.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task


# INTRO: This is the de facto hello world equivalent for taskflow; it shows how
# an overly simplistic workflow can be created and then run using different
# engines with different styles of execution (all of which can run in parallel
# if the workflow provided is parallelizable).

class PrinterTask(task.Task):
    def __init__(self, name, show_name=True, inject=None):
        super(PrinterTask, self).__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        if self._show_name:
            print("%s: %s" % (self.name, output))
        else:
            print(output)


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows, when run, can be run in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
for (name, hello, world) in [('bob', 'hello', 'world'),
                             ('joe', 'hellooo', 'worllllld'),
                             ('sue', "helloooooo!", 'wooorllld!')]:
    hi_chorus.add(PrinterTask("%s@hello" % name,
                              # This will be passed to the execute() method of
                              # the task as the argument named 'output' (which
                              # will allow us to print the phrase we want).
                              inject={'output': hello}))
    world_chorus.add(PrinterTask("%s@world" % name,
                                 inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
song.add(PrinterTask("conductor@begin",
                     show_name=False, inject={'output': "*ding*"}),
         hi_chorus,
         world_chorus,
         PrinterTask("conductor@end",
                     show_name=False, inject={'output': "*dong*"}))

# Run in parallel using eventlet green threads...
try:
    import eventlet as _eventlet  # noqa
except ImportError:
    # No eventlet currently active, skip running with it...
    pass
else:
    print("-- Running in parallel using eventlet --")
    e = engines.load(song, executor='greenthreaded', engine='parallel',
                     max_workers=1)
    e.run()


# Run in parallel using real threads...
print("-- Running in parallel using threads --")
e = engines.load(song, executor='threaded', engine='parallel',
                 max_workers=1)
e.run()


# Run in parallel using external processes...
print("-- Running in parallel using processes --")
e = engines.load(song, executor='processes', engine='parallel',
                 max_workers=1)
e.run()


# Run serially (aka, if the workflow could have been run in parallel, it will
# not be when run in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
print("-- Statistics gathered --")
print(e.statistics)
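
For quick experiments the separate load()/run() steps shown above are not
required; the taskflow.engines.run() helper (used by several of the later
examples) loads a flow and runs it in a single call. A minimal sketch, reusing
the 'song' flow defined above:

from taskflow import engines

# Load and run the flow in one step using the default engine settings.
engines.run(song)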

Passing values from and to tasks

Note

Full source located at simple_linear_pass.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can then be consumed/used by a downstream task.


class TaskA(task.Task):
    default_provides = 'a'

    def execute(self):
        print("Executing '%s'" % (self.name))
        return 'a'


class TaskB(task.Task):
    def execute(self, a):
        print("Executing '%s'" % (self.name))
        print("Got input '%s'" % (a))


print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")
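
Once the run completes, the value produced by TaskA is recorded in the
engine's storage and can be inspected after the fact. A minimal sketch,
reusing the engine 'e' from above and the same storage lookup pattern used in
the later table-multiplier example:

# Fetch the recorded result of the task named 'a' (TaskA's return value).
print(e.storage.get('a'))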

Using listeners

Note

Full source located at echo_listener.

import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed.
# This example dumps that output to stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

# INTRO: In this example we create two tasks (this time as functions instead
# of task subclasses as in the simple_linear.py example), each of which ~calls~
# a given ~phone~ number (provided as a function input) in a linear fashion
# (one after the other).
#
# For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data, to be run in a reliable manner.
#
# This example shows a basic usage of the taskflow structures without involving
# the complexity of persistence. Using the structures that taskflow provides
# via tasks and flows makes it possible for you to easily hook in a persistence
# layer at a later time (and then gain the functionality that offers) when you
# decide the complexity of adding that layer in is 'worth it' for your
# application's usage pattern (which some applications may not need).
#
# It **also** adds on to the simple_linear.py example by adding a set of
# callback functions which the engine will call when a flow state transition
# or task state transition occurs. These types of functions are useful for
# updating task or flow progress, for debugging, for sending notifications to
# external systems, or for other yet unknown future usage that you may create!


def call_jim(context):
    print("Calling jim.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def call_joe(context):
    print("Calling joe.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task %s => %s' % (details.get('task_name'), state))


# Wrap your functions into a task type that knows how to treat your functions
# as tasks. There was previous work done to just allow a function to be
# directly passed, but in Python 3 there is no easy way to capture an
# instance method, so this wrapping approach was decided upon instead, which
# can attach to instance methods (if that's desired).
flow = lf.Flow("Call-them")
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))

# Now load (but do not run) the flow using the provided initial data.
engine = taskflow.engines.load(flow, store={
    'context': {
        "joe_number": 444,
        "jim_number": 555,
    }
})

# This is where we attach our callback functions to the 2 different
# notification objects that an engine exposes. The usage of ANY (kleene star)
# here means that we want to be notified on all state changes; if you want to
# restrict to a specific state change, just register that instead.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

# And now run!
engine.run()
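
As noted in the comment above, a callback can also be registered for a single
state instead of ANY. A minimal sketch, assuming the flow-level SUCCESS state
constant from taskflow.states:

from taskflow import states

# Only notify when the flow transitions into the SUCCESS state.
engine.notifier.register(states.SUCCESS, flow_watch)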

Dumping an in-memory backend

Note

Full source located at dump_memory_backend.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using an in-memory backend; pre/post run we dump out the contents
# of the in-memory backend's tree structure (which can be quite useful to
# look at for debugging or other analysis).


class PrintTask(task.Task):
    def execute(self):
        print("Running '%s'" % self.name)

# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
    f.add(PrintTask(alpha))

e = engines.load(f)
e.compile()
e.prepare()

# After prepare the storage layer + backend can now be accessed safely...
backend = e.storage.backend

print("----------")
print("Before run")
print("----------")
print(backend.memory.pformat())
print("----------")

e.run()

print("---------")
print("After run")
print("---------")
for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
    value = backend.memory[path]
    if value:
        print("%s -> %s" % (path, value))
    else:
        print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data, to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily
# hook in a persistence layer at a later time (and then gain the functionality
# that offers) when you decide the complexity of adding that layer in
# is 'worth it' for your application's usage pattern (which certain
# applications may not need).


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe()
)

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store=dict(joe_number=444,
                                      jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create three tasks, each of which ~calls~ a given
# number (provided as a function input); one of those tasks *fails* calling a
# given number (the suzzie call). This causes the workflow to enter the
# reverting process, which activates the revert methods of the previous two
# phone ~calls~.
#
# This simulated calling makes it appear like all three calls occur or all
# three don't occur (transaction-like capabilities). No persistence layer is
# used here so reverting and executing will *not* be tolerant of process
# failure.


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)

    def revert(self, jim_number, *args, **kwargs):
        print("Calling %s and apologizing." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)

    def revert(self, joe_number, *args, **kwargs):
        print("Calling %s and apologizing." % joe_number)


class CallSuzzie(task.Task):
    def execute(self, suzzie_number, *args, **kwargs):
        raise IOError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe(),
    CallSuzzie()
)

try:
    # Now run that flow using the provided initial data (store below).
    taskflow.engines.run(flow, store=dict(joe_number=444,
                                          jim_number=555,
                                          suzzie_number=666))
except Exception as e:
    # NOTE(harlowja): This exception will be the exception that came out of the
    # 'CallSuzzie' task instead of a different exception; this is useful since
    # typically surrounding code wants to handle the original exception and not
    # a wrapped or altered one.
    #
    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
    # exceptions then the above exception would be wrapped into a combined
    # exception (the object has methods to iterate over the contained
    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
    # how to deal with multiple tasks failing while running.
    #
    # You will also note that this is not a problem in this case since no
    # parallelism is involved; this is ensured by the usage of a linear flow
    # and the default engine type which is 'serial' vs being 'parallel'.
    print("Flow failed: %s" % e)
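
The warning in the comment above mentions the combined failure case; a minimal
sketch of how that could be handled for a multi-threaded run, assuming (as the
comment describes) that taskflow.exceptions.WrappedFailure can be iterated to
get the contained failures:

from taskflow import exceptions as exc

try:
    taskflow.engines.run(flow, engine='parallel',
                         store=dict(joe_number=444,
                                    jim_number=555,
                                    suzzie_number=666))
except exc.WrappedFailure as ex:
    # More than one task failed; inspect each contained failure.
    for failure in ex:
        print("Flow failed: %s" % failure)
except Exception as e:
    # Only a single task failed; the original exception comes through as-is.
    print("Flow failed: %s" % e)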

Building a car

Note

Full source located at build_a_car.

import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa


# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you want to easily
# plug in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indicate success; in the real world they would
# do more than just that.

def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs; they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task %s => %s' % (details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency-based ordering; the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed); another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce; in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the resulting product; a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered,
# since the car that gets built (by the build_doors function) will only have
# 2 doors (not 5); this will cause the verification task to mark the car
# that is produced as not matching the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup.

import fractions
import functools
import logging
import os
import string
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions
from taskflow.patterns import linear_flow
from taskflow import task


# In this example we show how a simple linear set of tasks can be executed
# using local processes (and not threads or remote workers) with minimal (if
# any) modification to those tasks to make them safe to run in this mode.
#
# This is useful since it allows further scaling up your workflows when thread
# execution starts to become a bottleneck (which it can start to be due to the
# GIL in python). It also offers an intermediary scalable runner that can be
# used when the scale and/or setup of remote workers is not desirable.


def progress_printer(task, event_type, details):
    # This callback, attached to each task, will be called in the local
    # process (not the child processes)...
    progress = details.pop('progress')
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))


class AlphabetTask(task.Task):
    # Delay (in seconds) between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here).
    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]

    def execute(self):
        for p in self._PROGRESS_PARTS:
            self.update_progress(p)
            time.sleep(self._DELAY)


print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    abc = AlphabetTask(letter)
    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
                          functools.partial(progress_printer, abc))
    soup.add(abc)
try:
    print("Loading...")
    e = engines.load(soup, engine='parallel', executor='processes')
    print("Compiling...")
    e.compile()
    print("Preparing...")
    e.prepare()
    print("Running...")
    e.run()
    print("Done: %s" % e.statistics)
except exceptions.NotImplementedError as e:
    print(e)

Watching execution timing

Note

Full source located at timing_listener.

import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we will attach a listener to an engine, run a set of
# tasks with variable run times, and show how the listener will print out how
# long those tasks took (when they started and when they finished).
#
# This shows how timing metrics can be gathered (or attached onto an engine)
# after a workflow has been constructed, making it easy to gather metrics
# dynamically for situations where this kind of information is applicable (or
# even adding this information on at a later point in the future when your
# application starts to slow down).


class VariableTask(task.Task):
    def __init__(self, name):
        super(VariableTask, self).__init__(name)
        self._sleepy_time = random.random()

    def execute(self):
        time.sleep(self._sleepy_time)


f = lf.Flow('root')
f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
e = engines.load(f)
with timing.PrintingDurationListener(e):
    e.run()

Distance calculator

Note

Full source located at distance_calculator.

import collections
import math
import os
import sys

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This shows how to use a task's/atom's ability to take requirements
# from its execute function's default parameters, and shows how to provide
# those parameters via different methods when needed, in this case to
# calculate the distance between two points in 2D space.

# A 2D point.
Point = collections.namedtuple("Point", "x,y")


def is_near(val, expected, tolerance=0.001):
    # Floats don't really provide equality...
    if val > (expected + tolerance):
        return False
    if val < (expected - tolerance):
        return False
    return True


class DistanceTask(task.Task):
    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space

    default_provides = 'distance'

    def execute(self, a=Point(0, 0), b=Point(0, 0)):
        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))


if __name__ == '__main__':
    # For these we rely on the execute() method's points by default being
    # at the origin (and we override them with store values when we want) at
    # execution time (which then influences what is calculated).
    any_distance = linear_flow.Flow("origin").add(DistanceTask())
    results = engines.run(any_distance)
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           0.0,
                                           is_near(results['distance'], 0.0)))

    results = engines.run(any_distance, store={'a': Point(1, 1)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           1.4142,
                                           is_near(results['distance'],
                                                   1.4142)))

    results = engines.run(any_distance, store={'a': Point(10, 10)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           14.14199,
                                           is_near(results['distance'],
                                                   14.14199)))

    results = engines.run(any_distance,
                          store={'a': Point(5, 5), 'b': Point(10, 10)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           7.07106,
                                           is_near(results['distance'],
                                                   7.07106)))

    # For this we use the ability to override, at task creation time, the
    # optional arguments so that we don't need to continue to send them
    # in via the 'store' argument like in the above (and we fix the new
    # starting point 'a' at (10, 10) instead of (0, 0))...

    ten_distance = linear_flow.Flow("ten")
    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
    results = engines.run(ten_distance, store={'b': Point(10, 10)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           0.0,
                                           is_near(results['distance'], 0.0)))

    results = engines.run(ten_distance)
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           14.14199,
                                           is_near(results['distance'],
                                                   14.14199)))

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply.

import csv
import logging
import os
import random
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
# green thread (if eventlet is available), in parallel and then the result
# is reformed into a new table and some verifications are performed on it
# to ensure everything went as expected.


MULTIPLER = 10


class RowMultiplier(task.Task):
    """Performs a modification of an input row, creating an output row."""

    def __init__(self, name, index, row, multiplier):
        super(RowMultiplier, self).__init__(name=name)
        self.index = index
        self.multiplier = multiplier
        self.row = row

    def execute(self):
        return [r * self.multiplier for r in self.row]


def make_flow(table):
    # This creation will allow for parallel computation (since the flow here
    # is specifically unordered; and when things are unordered they have
    # no dependencies and when things have no dependencies they can just be
    # run at the same time, limited in concurrency by the executor or max
    # workers of that executor...)
    f = uf.Flow("root")
    for i, row in enumerate(table):
        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
    # NOTE(harlowja): at this point nothing has run; the above is just
    # defining what should be done (but not actually doing it) and associating
    # the ordering dependencies that should be enforced (the flow pattern used
    # forces this). The engine in the later main() function will actually
    # perform this work...
    return f


def main():
    if len(sys.argv) == 2:
        tbl = []
        # Open in text mode so the csv module can parse the file correctly
        # under python 3.
        with open(sys.argv[1], 'r', newline='') as fh:
            reader = csv.reader(fh)
            for row in reader:
                tbl.append([float(r) if r else 0.0 for r in row])
    else:
        # Make some random table out of thin air...
        tbl = []
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        for _i in range(0, rows):
            row = []
            for _j in range(0, cols):
                row.append(random.random())
            tbl.append(row)

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        executor.shutdown()

    # Find the old rows and put them into place...
    #
    # TODO(harlowja): probably easier just to sort instead of search...
    computed_tbl = []
    for i in range(0, len(tbl)):
        for t in f:
            if t.index == i:
                computed_tbl.append(e.storage.get(t.name))

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    else:
        return 0


if __name__ == "__main__":
    sys.exit(main())

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


# INTRO: In this example a linear flow is used to group four tasks to calculate
# a value. A single task class is added twice, showing how this can be done
# (each instance takes in different bound values). In the first case it uses
# default parameters ('x' and 'y') and in the second case arguments are bound
# to the ('z', 'd') keys from the engine's internal storage mechanism.
#
# A multiplier task uses a binding that another task also provides, but this
# example explicitly shows that the 'z' parameter is bound to the 'a' key.
# This shows that if a task depends on a key named the same as a key provided
# by another task, the name can be remapped to take the desired key from a
# different origin.


# This task provides some values as a result of execution, which can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that just provides those values on engine running by prepopulating the
# storage backend before your tasks are run (which accomplishes a similar goal
# in a more uniform manner).
class Provider(task.Task):

    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


# This task multiplies an input variable by a multiplier and returns the
# result.
#
# Note that since this task does not have a revert() function (since
# multiplication is a stateless operation) there are no side-effects that
# this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    def __init__(self, name, multiplier, provides=None, rebind=None):
        super(Multiplier, self).__init__(name=name, provides=provides,
                                         rebind=rebind)
        self._multiplier = multiplier

    def execute(self, z):
        return z * self._multiplier


# Note here that the ordering is established so that the correct sequence
# of operations occurs where the adding and multiplying is done according
# to the expected and typical mathematical model. A graph flow could also be
# used here to automatically infer & ensure the correct ordering.
flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x = 2, y = 3, d = 5
    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # Calculate 'r = a*3 = 30'
    #
    # Note here that the 'z' argument of the execute() function will not be
    # bound to the 'z' variable provided from the above 'provider' object but
    # instead the 'z' argument will be taken from the 'a' variable provided
    # by the 'add-2' task listed above.
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)

# The result here will be all results (from all tasks), which are stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
results = taskflow.engines.run(flow)
print(results)

Linear equation solver (inferred dependencies)

Source: graph_flow.py

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task


# In this example there are complex *inferred* dependencies between tasks that
# are used to perform a simple set of linear equations.
#
# As you will see below the tasks just define what they require as input
# and produce as output (named values). The user then doesn't need to care
# about ordering the tasks (in this case the tasks calculate pieces of the
# overall equation).
#
# As you will notice a graph flow resolves dependencies automatically using the
# tasks' symbol requirements and provided symbol values, so no ordering
# dependency has to be manually created.
#
# Also notice that flows of any type can be nested into a graph flow, showing
# that subflow dependencies (and associated ordering) will be inferred too.


class Adder(task.Task):

    def execute(self, x, y):
        return x + y


flow = gf.Flow('root').add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

# Provide the initial variable inputs using a storage dictionary.
store = {
    "y1": 1,
    "y2": 3,
    "y3": 5,
    "y4": 7,
    "y5": 9,
}

# These are the expected values that should be created.
unexpected = 0
expected = [
    ('x1', 4),
    ('x2', 12),
    ('x3', 16),
    ('x4', 21),
    ('x5', 20),
    ('x6', 41),
    ('x7', 82),
]

result = taskflow.engines.run(
    flow, engine='serial', store=store)

print("Single threaded engine result %s" % result)
for (name, value) in expected:
    actual = result.get(name)
    if actual != value:
        sys.stderr.write("%s != %s\n" % (actual, value))
        unexpected += 1

result = taskflow.engines.run(
    flow, engine='parallel', store=store)

print("Multi threaded engine result %s" % result)
for (name, value) in expected:
    actual = result.get(name)
    if actual != value:
        sys.stderr.write("%s != %s\n" % (actual, value))
        unexpected += 1

if unexpected:
    sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how a linear flow and an unordered flow can be
# used together to execute calculations in parallel and then use the
# result for the next task(s). The adder task is used for all calculations
# and argument bindings are used to set correct parameters for each task.


# This task provides some values as a result of execution, which can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that provides those values on engine running by prepopulating the storage
# backend before your tasks are run (which accomplishes a similar goal in a
# more uniform manner).
class Provider(task.Task):
    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result of that addition.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # Note here that we define the flow that contains the 2 adders to be an
    # unordered flow since the order in which these execute does not matter;
    # another way to solve this would be to use a graph_flow pattern, which
    # also can run in parallel (since they have no ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be referred to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))


# The result here will be all results (from all tasks), which are stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
result = taskflow.engines.run(flow, engine='parallel')
print(result)

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume.

import contextlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import reflection

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how unordered_flow can be used to create a large
# number of fake volumes in parallel (or serially, depending on a constant that
# can be easily changed).


@contextlib.contextmanager
def show_time(name):
    start = time.time()
    yield
    end = time.time()
    print(" -- %s took %0.3f seconds" % (name, end - start))


# This affects how many volumes to create and how much time to *simulate*
# passing for that volume to be created.
MAX_CREATE_TIME = 3
VOLUME_COUNT = 5

# This will be used to determine if all the volumes are created in parallel
# or whether the volumes are created serially (in an undefined order, since
# an unordered flow is used). Note that there is a disconnection between the
# ordering and the concept of parallelism (since unordered items can still be
# run in a serial ordering). A typical use-case for offering both is to allow
# for debugging using a serial approach, while when running at a larger scale
# one would likely want to use the parallel approach.
#
# If you switch this flag from serial to parallel you can see the overall
# time difference that this causes.
SERIAL = False
if SERIAL:
    engine = 'serial'
else:
    engine = 'parallel'


class VolumeCreator(task.Task):
    def __init__(self, volume_id):
        # Note here that the volume name is composed of the name of the class
        # along with the volume id that is being created; since the name of a
        # task uniquely identifies that task in storage it is important that
        # the name be relevant and identifiable if the task is recreated for
        # subsequent resumption (if applicable).
        #
        # UUIDs are *not* used as they can not be tied back to a previous
        # task's state on resumption (since they are unique and will vary for
        # each task that is created). A name based off the volume id that is to
        # be created is more easily tied back to the original task so that the
        # volume create can be resumed/reverted, and is much easier to use for
        # audit and tracking purposes.
        base_name = reflection.get_callable_name(self)
        super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
                                                            volume_id))
        self._volume_id = volume_id

    def execute(self):
        print("Making volume %s" % (self._volume_id))
        time.sleep(random.random() * MAX_CREATE_TIME)
        print("Finished making volume %s" % (self._volume_id))


# Assume there is no ordering dependency between volumes.
flow = uf.Flow("volume-maker")
for i in range(0, VOLUME_COUNT):
    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))


# Show how much time the overall engine loading and running takes.
with show_time(name=flow.name.title()):
    eng = engines.load(flow, engine=engine)
    # This context manager automatically adds (and automatically removes) a
    # helpful set of state transition notification printing helper utilities
    # that show you exactly what transitions the engine is going through
    # while running the various volume create tasks.
    with printing.PrintingListener(eng):
        eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

# INTRO: These examples show a simplistic map/reduce implementation where
# a set of mapper(s) will sum a series of input numbers (in parallel) and
# return their individual summed result. A reducer will then use those
# produced values and perform a final summation and this result will then be
# printed (and verified to ensure the calculation was as expected).

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from taskflow import task


class SumMapper(task.Task):
    def execute(self, inputs):
        # Sums some set of provided inputs.
        return sum(inputs)


class TotalReducer(task.Task):
    def execute(self, *args, **kwargs):
        # Reduces all mapped summed outputs into a single value.
        total = 0
        for (k, v) in kwargs.items():
            # If any other kwargs were passed in, we don't want to use those
            # in the calculation of the total...
            if k.startswith('reduction_'):
                total += v
        return total


def chunk_iter(chunk_size, upperbound):
    """Yields back chunk size pieces from zero to upperbound - 1."""
    chunk = []
    for i in range(0, upperbound):
        chunk.append(i)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []


# Upper bound of numbers to sum for example purposes...
UPPER_BOUND = 10000

# How many mappers we want to have.
SPLIT = 10

# How big of a chunk we want to give each mapper.
CHUNK_SIZE = UPPER_BOUND // SPLIT

# This will be the workflow we will compose and run.
w = linear_flow.Flow("root")

# The mappers will run in parallel.
store = {}
provided = []
mappers = unordered_flow.Flow('map')
for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
    mapper_name = 'mapper_%s' % i
    # Give that mapper some information to compute.
    store[mapper_name] = chunk
    # The reducer uses all of the outputs of the mappers, so it needs
    # to be recorded that it needs access to them (under a specific name).
    provided.append("reduction_%s" % i)
    mappers.add(SumMapper(name=mapper_name,
                          rebind={'inputs': mapper_name},
                          provides=provided[-1]))
w.add(mappers)

# The reducer will run last (after all the mappers).
w.add(TotalReducer('reducer', requires=provided))

# Now go!
e = engines.load(w, engine='parallel', store=store, max_workers=4)
print("Running a parallel engine with options: %s" % e.options)
e.run()

# Now get the result the reducer created.
total = e.storage.get('reducer')
print("Calculated result = %s" % total)

# Calculate it manually to verify that it worked...
calc_total = sum(range(0, UPPER_BOUND))
if calc_total != total:
    sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread.

 1
 2import logging
 3import os
 4import random
 5import sys
 6import time
 7
 8logging.basicConfig(level=logging.ERROR)
 9
10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11                                       os.pardir,
12                                       os.pardir))
13sys.path.insert(0, top_dir)
14
15import futurist
16
17from taskflow import engines
18from taskflow.patterns import unordered_flow as uf
19from taskflow import task
20from taskflow.utils import threading_utils as tu
21
22# INTRO: in this example we create 2 dummy flow(s) with a 2 dummy task(s), and
23# run it using a shared thread pool executor to show how a single executor can
24# be used with more than one engine (sharing the execution thread pool between
25# them); this allows for saving resources and reusing threads in situations
26# where this is benefical.
27
28
29class DelayedTask(task.Task):
30    def __init__(self, name):
31        super(DelayedTask, self).__init__(name=name)
32        self._wait_for = random.random()
33
34    def execute(self):
35        print("Running '%s' in thread '%s'" % (self.name, tu.get_ident()))
36        time.sleep(self._wait_for)
37
38
39f1 = uf.Flow("f1")
40f1.add(DelayedTask("f1-1"))
41f1.add(DelayedTask("f1-2"))
42
43f2 = uf.Flow("f2")
44f2.add(DelayedTask("f2-1"))
45f2.add(DelayedTask("f2-2"))
46
47# Run them all using the same futures (thread-pool based) executor...
48with futurist.ThreadPoolExecutor() as ex:
49    e1 = engines.load(f1, engine='parallel', executor=ex)
50    e2 = engines.load(f2, engine='parallel', executor=ex)
51    iters = [e1.run_iter(), e2.run_iter()]
52    # Iterate over a copy (so we can remove from the source list).
53    cloned_iters = list(iters)
54    while iters:
55        # Run a single 'step' of each iterator, forcing each engine to perform
56        # some work, then yield, and repeat until each iterator is consumed
57        # and there is no more engine work to be done.
58        for it in cloned_iters:
59            try:
60                next(it)
61            except StopIteration:
62                try:
63                    iters.remove(it)
64                except ValueError:
65                    pass
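
The executor-sharing idea also works without the iterator interleaving shown above: every engine loaded with engine='parallel' and the shared executor simply reuses the pool's threads when it runs. A minimal sketch (the flow and task names here are made up for illustration):

import futurist

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Hello(task.Task):
    def execute(self):
        print("hello from %s" % self.name)


f1 = lf.Flow("first").add(Hello("first-1"), Hello("first-2"))
f2 = lf.Flow("second").add(Hello("second-1"), Hello("second-2"))

# Both engines are handed the same thread pool; neither creates its own.
with futurist.ThreadPoolExecutor(max_workers=2) as ex:
    for f in (f1, f2):
        engines.load(f, engine='parallel', executor=ex).run()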

Storing & emitting a bill

Note

Full source located at fake_billing

  1
  2import json
  3import logging
  4import os
  5import sys
  6import time
  7
  8logging.basicConfig(level=logging.ERROR)
  9
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14
 15from oslo_utils import uuidutils
 16
 17from taskflow import engines
 18from taskflow.listeners import printing
 19from taskflow.patterns import graph_flow as gf
 20from taskflow.patterns import linear_flow as lf
 21from taskflow import task
 22from taskflow.utils import misc
 23
 24# INTRO: This example walks through a miniature workflow which simulates
 25# the reception of an API request, creation of a database entry, driver
 26# activation (which invokes a 'fake' webservice) and final completion.
 27#
 28# This example also shows how a function/object (in this case the url sending)
 29# that occurs during driver activation can update the progress of a task
 30# without being aware of the internals of how to do this by associating a
 31# callback that the url sending can update as the sending progresses from 0.0%
 32# complete to 100% complete.
 33
 34
 35class DB(object):
 36    def query(self, sql):
 37        print("Querying with: %s" % (sql))
 38
 39
 40class UrlCaller(object):
 41    def __init__(self):
 42        self._send_time = 0.5
 43        self._chunks = 25
 44
 45    def send(self, url, data, status_cb=None):
 46        sleep_time = float(self._send_time) / self._chunks
 47        for i in range(0, len(data)):
 48            time.sleep(sleep_time)
 49            # As we send the data, each chunk we 'fake' send will advance
 50            # the sending progress that much further toward 100%.
 51            if status_cb:
 52                status_cb(float(i) / len(data))
 53
 54
 55# Since engines save the output of tasks to an optional persistent storage
 56# backend, resources have to be dealt with in a slightly different manner since
 57# resources are transient and can *not* be persisted (or serialized). For tasks
 58# that require access to a set of resources it is a common pattern to provide
 59# an object (in this case this object) on construction of those tasks via the
 60# task constructor.
 61class ResourceFetcher(object):
 62    def __init__(self):
 63        self._db_handle = None
 64        self._url_handle = None
 65
 66    @property
 67    def db_handle(self):
 68        if self._db_handle is None:
 69            self._db_handle = DB()
 70        return self._db_handle
 71
 72    @property
 73    def url_handle(self):
 74        if self._url_handle is None:
 75            self._url_handle = UrlCaller()
 76        return self._url_handle
 77
 78
 79class ExtractInputRequest(task.Task):
 80    def __init__(self, resources):
 81        super(ExtractInputRequest, self).__init__(provides="parsed_request")
 82        self._resources = resources
 83
 84    def execute(self, request):
 85        return {
 86            'user': request.user,
 87            'user_id': misc.as_int(request.id),
 88            'request_id': uuidutils.generate_uuid(),
 89        }
 90
 91
 92class MakeDBEntry(task.Task):
 93    def __init__(self, resources):
 94        super(MakeDBEntry, self).__init__()
 95        self._resources = resources
 96
 97    def execute(self, parsed_request):
 98        db_handle = self._resources.db_handle
 99        db_handle.query("INSERT %s INTO mydb" % (parsed_request))
100
101    def revert(self, result, parsed_request):
102        db_handle = self._resources.db_handle
103        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
104
105
106class ActivateDriver(task.Task):
107    def __init__(self, resources):
108        super(ActivateDriver, self).__init__(provides='sent_to')
109        self._resources = resources
110        self._url = "http://blahblah.com"
111
112    def execute(self, parsed_request):
113        print("Sending billing data to %s" % (self._url))
114        url_sender = self._resources.url_handle
115        # Note that here we attach our update_progress function (which is a
116        # function that the engine also 'binds' to) to the progress function
117        # that the url sending helper class uses. This allows the task progress
118        # to be tied to the url sending progress, which is very useful for
119        # downstream systems to be aware of what a task is doing at any time.
120        url_sender.send(self._url, json.dumps(parsed_request),
121                        status_cb=self.update_progress)
122        return self._url
123
124    def update_progress(self, progress, **kwargs):
125        # Override the parent method to also print out the status.
126        super(ActivateDriver, self).update_progress(progress, **kwargs)
127        print("%s is %0.2f%% done" % (self.name, progress * 100))
128
129
130class DeclareSuccess(task.Task):
131    def execute(self, sent_to):
132        print("Done!")
133        print("All data processed and sent to %s" % (sent_to))
134
135
136class DummyUser(object):
137    def __init__(self, user, id_):
138        self.user = user
139        self.id = id_
140
141
142# Resources (db handles and similar) of course can *not* be persisted so we
143# need to make sure that we pass this resource fetcher to the tasks' constructors
144# so that the tasks have access to any needed resources (the resources are
145# lazily loaded so that they are only created when they are used).
146resources = ResourceFetcher()
147flow = lf.Flow("initialize-me")
148
149# 1. First we extract the api request into a usable format.
150# 2. Then we go ahead and make a database entry for our request.
151flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
152
153# 3. Then we activate our payment method and finally declare success.
154sub_flow = gf.Flow("after-initialize")
155sub_flow.add(ActivateDriver(resources), DeclareSuccess())
156flow.add(sub_flow)
157
158# Initially populate the storage with the following request object,
159# prepopulating this allows the tasks that depend on the 'request' variable
160# to start processing (in this case this is the ExtractInputRequest task).
161store = {
162    'request': DummyUser(user="bob", id_="1.35"),
163}
164eng = engines.load(flow, engine='serial', store=store)
165
166# This context manager automatically adds (and automatically removes) a
167# helpful set of state transition notification printing helper utilities
168# that show you exactly what transitions the engine is going through
169# while running the various billing related tasks.
170with printing.PrintingListener(eng):
171    eng.run()
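
Two ideas in this example are reusable on their own: transient resources (database or HTTP handles) are handed to tasks through their constructors instead of through storage, and a task's update_progress() method can be passed around as an ordinary callback. A minimal sketch of the constructor-injection half (the connection class and all names below are illustrative, not part of the example source):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class FakeConnection(object):
    # Stands in for a real (non-persistable) handle such as a DB client.
    def send(self, payload):
        print("sending %s" % (payload,))


class SendPayload(task.Task):
    def __init__(self, connection):
        super(SendPayload, self).__init__()
        self._connection = connection  # transient; never persisted

    def execute(self, payload):
        # Only 'payload' travels through storage; the handle does not.
        self._connection.send(payload)


flow = lf.Flow('send-it').add(SendPayload(FakeConnection()))
engines.run(flow, store={'payload': {'user': 'bob', 'amount': 1.35}})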

Suspending a workflow & resuming

Note

Full source located at resume_from_backend

  1
  2import contextlib
  3import logging
  4import os
  5import sys
  6
  7logging.basicConfig(level=logging.ERROR)
  8
  9self_dir = os.path.abspath(os.path.dirname(__file__))
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14sys.path.insert(0, self_dir)
 15
 16from oslo_utils import uuidutils
 17
 18import taskflow.engines
 19from taskflow.patterns import linear_flow as lf
 20from taskflow.persistence import models
 21from taskflow import task
 22
 23import example_utils as eu  # noqa
 24
 25# INTRO: In this example linear_flow is used to group three tasks, one which
 26# will suspend the future work the engine may do. This suspended engine is then
 27# discarded and the workflow is reloaded from the persisted data and then the
 28# workflow is resumed from where it was suspended. This allows you to see how
 29# to start an engine, have a task stop the engine from doing future work (if
 30# a multi-threaded engine is being used, then the currently active work is not
 31# preempted) and then resume the work later.
 32#
 33# Usage:
 34#
 35#   With a filesystem directory as backend
 36#
 37#     python taskflow/examples/resume_from_backend.py
 38#
 39#   With ZooKeeper as backend
 40#
 41#     python taskflow/examples/resume_from_backend.py \
 42#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
 43
 44
 45# UTILITY FUNCTIONS #########################################
 46
 47
 48def print_task_states(flowdetail, msg):
 49    eu.print_wrapped(msg)
 50    print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
 51    # Sort by these so that our test validation doesn't get confused by the
 52    # order in which the items in the flow detail may appear.
 53    items = sorted((td.name, td.version, td.state, td.results)
 54                   for td in flowdetail)
 55    for item in items:
 56        print(" %s==%s: %s, result=%s" % item)
 57
 58
 59def find_flow_detail(backend, lb_id, fd_id):
 60    conn = backend.get_connection()
 61    lb = conn.get_logbook(lb_id)
 62    return lb.find(fd_id)
 63
 64
 65# CREATE FLOW ###############################################
 66
 67
 68class InterruptTask(task.Task):
 69    def execute(self):
 70        # DO NOT TRY THIS AT HOME
 71        engine.suspend()
 72
 73
 74class TestTask(task.Task):
 75    def execute(self):
 76        print('executing %s' % self)
 77        return 'ok'
 78
 79
 80def flow_factory():
 81    return lf.Flow('resume from backend example').add(
 82        TestTask(name='first'),
 83        InterruptTask(name='boom'),
 84        TestTask(name='second'))
 85
 86
 87# INITIALIZE PERSISTENCE ####################################
 88
 89with eu.get_backend() as backend:
 90
 91    # Create a place where the persistence information will be stored.
 92    book = models.LogBook("example")
 93    flow_detail = models.FlowDetail("resume from backend example",
 94                                    uuid=uuidutils.generate_uuid())
 95    book.add(flow_detail)
 96    with contextlib.closing(backend.get_connection()) as conn:
 97        conn.save_logbook(book)
 98
 99    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
100
101    flow = flow_factory()
102    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
103                                   book=book, backend=backend)
104
105    print_task_states(flow_detail, "At the beginning, there is no state")
106    eu.print_wrapped("Running")
107    engine.run()
108    print_task_states(flow_detail, "After running")
109
110    # RE-CREATE, RESUME, RUN ####################################
111
112    eu.print_wrapped("Resuming and running again")
113
114    # NOTE(harlowja): reload the flow detail from the backend; this will allow
115    # us to resume the flow from its suspended state, but first we need to
116    # search for the right flow detail in the correct logbook where things are
117    # stored.
118    #
119    # We could avoid re-loading the engine and just do engine.run() again, but
120    # this example shows how another process may unsuspend a given flow and
121    # start it again for situations where this is useful to do (say the process
122    # running the above flow crashes).
123    flow2 = flow_factory()
124    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
125    engine2 = taskflow.engines.load(flow2,
126                                    flow_detail=flow_detail_2,
127                                    backend=backend, book=book)
128    engine2.run()
129    print_task_states(flow_detail_2, "At the end")
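
As the comment in the listing notes, re-loading from the backend is only required when a *different* process (or a freshly constructed engine) takes over the work; the engine that was suspended can also simply be run again. A minimal sketch of that simpler in-process case (no persistence backend involved; the task names are illustrative):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Stopper(task.Task):
    def execute(self):
        # Same trick as InterruptTask above: reach out to the module-level
        # engine and ask it to stop scheduling further work.
        engine.suspend()


class Sayer(task.Task):
    def execute(self):
        print("ran %s" % self.name)


flow = lf.Flow('stop-and-go').add(Sayer('first'), Stopper('boom'),
                                  Sayer('second'))
engine = engines.load(flow)
engine.run()   # runs 'first', then 'boom' suspends the engine
engine.run()   # resumes the same engine and runs 'second'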

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1
  2import contextlib
  3import hashlib
  4import logging
  5import os
  6import random
  7import sys
  8import time
  9
 10logging.basicConfig(level=logging.ERROR)
 11
 12self_dir = os.path.abspath(os.path.dirname(__file__))
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17sys.path.insert(0, self_dir)
 18
 19import futurist
 20from oslo_utils import uuidutils
 21
 22from taskflow import engines
 23from taskflow import exceptions as exc
 24from taskflow.patterns import graph_flow as gf
 25from taskflow.patterns import linear_flow as lf
 26from taskflow.persistence import models
 27from taskflow import task
 28
 29import example_utils as eu  # noqa
 30
 31# INTRO: These examples show how a hierarchy of flows can be used to create a
 32# vm in a reliable & resumable manner using taskflow + a miniature version of
 33# what nova does while booting a vm.
 34
 35
 36@contextlib.contextmanager
 37def slow_down(how_long=0.5):
 38    try:
 39        yield how_long
 40    finally:
 41        if len(sys.argv) > 1:
 42            # Only bother to do this if user input was provided.
 43            print("** Ctrl-c me please!!! **")
 44            time.sleep(how_long)
 45
 46
 47class PrintText(task.Task):
 48    """Just inserts some text print outs in a workflow."""
 49    def __init__(self, print_what, no_slow=False):
 50        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 51        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
 52        self._text = print_what
 53        self._no_slow = no_slow
 54
 55    def execute(self):
 56        if self._no_slow:
 57            eu.print_wrapped(self._text)
 58        else:
 59            with slow_down():
 60                eu.print_wrapped(self._text)
 61
 62
 63class DefineVMSpec(task.Task):
 64    """Defines a vm specification to be."""
 65    def __init__(self, name):
 66        super(DefineVMSpec, self).__init__(provides='vm_spec', name=name)
 67
 68    def execute(self):
 69        return {
 70            'type': 'kvm',
 71            'disks': 2,
 72            'vcpu': 1,
 73            'ips': 1,
 74            'volumes': 3,
 75        }
 76
 77
 78class LocateImages(task.Task):
 79    """Locates where the vm images are."""
 80    def __init__(self, name):
 81        super(LocateImages, self).__init__(provides='image_locations',
 82                                           name=name)
 83
 84    def execute(self, vm_spec):
 85        image_locations = {}
 86        for i in range(0, vm_spec['disks']):
 87            url = "http://www.yahoo.com/images/%s" % (i)
 88            image_locations[url] = "/tmp/%s.img" % (i)
 89        return image_locations
 90
 91
 92class DownloadImages(task.Task):
 93    """Downloads all the vm images."""
 94    def __init__(self, name):
 95        super(DownloadImages, self).__init__(provides='download_paths',
 96                                             name=name)
 97
 98    def execute(self, image_locations):
 99        for src, loc in image_locations.items():
100            with slow_down(1):
101                print("Downloading from %s => %s" % (src, loc))
102        return sorted(image_locations.values())
103
104
105class CreateNetworkTpl(task.Task):
106    """Generates the network settings file to be placed in the images."""
107    SYSCONFIG_CONTENTS = """DEVICE=eth%s
108BOOTPROTO=static
109IPADDR=%s
110ONBOOT=yes"""
111
112    def __init__(self, name):
113        super(CreateNetworkTpl, self).__init__(provides='network_settings',
114                                               name=name)
115
116    def execute(self, ips):
117        settings = []
118        for i, ip in enumerate(ips):
119            settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
120        return settings
121
122
123class AllocateIP(task.Task):
124    """Allocates the ips for the given vm."""
125    def __init__(self, name):
126        super(AllocateIP, self).__init__(provides='ips', name=name)
127
128    def execute(self, vm_spec):
129        ips = []
130        for _i in range(0, vm_spec.get('ips', 0)):
131            ips.append("192.168.0.%s" % (random.randint(1, 254)))
132        return ips
133
134
135class WriteNetworkSettings(task.Task):
136    """Writes all the network settings into the downloaded images."""
137    def execute(self, download_paths, network_settings):
138        for j, path in enumerate(download_paths):
139            with slow_down(1):
140                print("Mounting %s to /tmp/%s" % (path, j))
141            for i, setting in enumerate(network_settings):
142                filename = ("/tmp/etc/sysconfig/network-scripts/"
143                            "ifcfg-eth%s" % (i))
144                with slow_down(1):
145                    print("Writing to %s" % (filename))
146                    print(setting)
147
148
149class BootVM(task.Task):
150    """Fires off the vm boot operation."""
151    def execute(self, vm_spec):
152        print("Starting vm!")
153        with slow_down(1):
154            print("Created: %s" % (vm_spec))
155
156
157class AllocateVolumes(task.Task):
158    """Allocates the volumes for the vm."""
159    def execute(self, vm_spec):
160        volumes = []
161        for i in range(0, vm_spec['volumes']):
162            with slow_down(1):
163                volumes.append("/dev/vda%s" % (i + 1))
164                print("Allocated volume %s" % volumes[-1])
165        return volumes
166
167
168class FormatVolumes(task.Task):
169    """Formats the volumes for the vm."""
170    def execute(self, volumes):
171        for v in volumes:
172            print("Formatting volume %s" % v)
173            with slow_down(1):
174                pass
175            print("Formatted volume %s" % v)
176
177
178def create_flow():
179    # Setup the set of things to do (mini-nova).
180    flow = lf.Flow("root").add(
181        PrintText("Starting vm creation.", no_slow=True),
182        lf.Flow('vm-maker').add(
183            # First create a specification for the final vm to-be.
184            DefineVMSpec("define_spec"),
185            # This does all the image stuff.
186            gf.Flow("img-maker").add(
187                LocateImages("locate_images"),
188                DownloadImages("download_images"),
189            ),
190            # This does all the network stuff.
191            gf.Flow("net-maker").add(
192                AllocateIP("get_my_ips"),
193                CreateNetworkTpl("fetch_net_settings"),
194                WriteNetworkSettings("write_net_settings"),
195            ),
196            # This does all the volume stuff.
197            gf.Flow("volume-maker").add(
198                AllocateVolumes("allocate_my_volumes", provides='volumes'),
199                FormatVolumes("volume_formatter"),
200            ),
201            # Finally boot it all.
202            BootVM("boot-it"),
203        ),
204        # Ya it worked!
205        PrintText("Finished vm create.", no_slow=True),
206        PrintText("Instance is running!", no_slow=True))
207    return flow
208
209eu.print_wrapped("Initializing")
210
211# Setup the persistence & resumption layer.
212with eu.get_backend() as backend:
213
214    # Try to find a previously passed in tracking id...
215    try:
216        book_id, flow_id = sys.argv[2].split("+", 1)
217        if not uuidutils.is_uuid_like(book_id):
218            book_id = None
219        if not uuidutils.is_uuid_like(flow_id):
220            flow_id = None
221    except (IndexError, ValueError):
222        book_id = None
223        flow_id = None
224
225    # Set up how we want our engine to run, serial, parallel...
226    try:
227        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
228    except RuntimeError:
229        # No eventlet installed, just let the default be used instead.
230        executor = None
231
232    # Create/fetch a logbook that will track the workflow's work.
233    book = None
234    flow_detail = None
235    if all([book_id, flow_id]):
236        # Try to find in a prior logbook and flow detail...
237        with contextlib.closing(backend.get_connection()) as conn:
238            try:
239                book = conn.get_logbook(book_id)
240                flow_detail = book.find(flow_id)
241            except exc.NotFound:
242                pass
243    if book is None and flow_detail is None:
244        book = models.LogBook("vm-boot")
245        with contextlib.closing(backend.get_connection()) as conn:
246            conn.save_logbook(book)
247        engine = engines.load_from_factory(create_flow,
248                                           backend=backend, book=book,
249                                           engine='parallel',
250                                           executor=executor)
251        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
252                                                   engine.storage.flow_uuid))
253        print("!! Please submit this on later runs for tracking purposes")
254    else:
255        # Attempt to load from a previously partially completed flow.
256        engine = engines.load_from_detail(flow_detail, backend=backend,
257                                          engine='parallel', executor=executor)
258
259    # Make me my vm please!
260    eu.print_wrapped('Running')
261    engine.run()
262
263# How to use.
264#
265# 1. $ python me.py "sqlite:////tmp/nova.db"
266# 2. ctrl-c before this finishes
267# 3. Find the tracking id (search for 'Your tracking id is')
268# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
269# 5. Watch it pick up where it left off.
270# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1
  2import contextlib
  3import hashlib
  4import logging
  5import os
  6import random
  7import sys
  8import time
  9
 10logging.basicConfig(level=logging.ERROR)
 11
 12self_dir = os.path.abspath(os.path.dirname(__file__))
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17sys.path.insert(0, self_dir)
 18
 19from oslo_utils import uuidutils
 20
 21from taskflow import engines
 22from taskflow.patterns import graph_flow as gf
 23from taskflow.patterns import linear_flow as lf
 24from taskflow.persistence import models
 25from taskflow import task
 26
 27import example_utils  # noqa
 28
 29# INTRO: These examples show how a hierarchy of flows can be used to create a
 30# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
 31# version of what cinder does while creating a volume (very miniature).
 32
 33
 34@contextlib.contextmanager
 35def slow_down(how_long=0.5):
 36    try:
 37        yield how_long
 38    finally:
 39        print("** Ctrl-c me please!!! **")
 40        time.sleep(how_long)
 41
 42
 43def find_flow_detail(backend, book_id, flow_id):
 44    # NOTE(harlowja): this is used to attempt to find a given logbook with
 45    # a given id and a given flow detail inside that logbook; we need this
 46    # reference so that we can resume the correct flow (as a logbook tracks
 47    # flows and a flow detail tracks an individual flow).
 48    #
 49    # Without a reference to the logbook and the flow details in that logbook
 50    # we will not know exactly what we should resume and that would mean we
 51    # can't resume what we don't know.
 52    with contextlib.closing(backend.get_connection()) as conn:
 53        lb = conn.get_logbook(book_id)
 54        return lb.find(flow_id)
 55
 56
 57class PrintText(task.Task):
 58    def __init__(self, print_what, no_slow=False):
 59        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 60        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
 61        self._text = print_what
 62        self._no_slow = no_slow
 63
 64    def execute(self):
 65        if self._no_slow:
 66            print("-" * (len(self._text)))
 67            print(self._text)
 68            print("-" * (len(self._text)))
 69        else:
 70            with slow_down():
 71                print("-" * (len(self._text)))
 72                print(self._text)
 73                print("-" * (len(self._text)))
 74
 75
 76class CreateSpecForVolumes(task.Task):
 77    def execute(self):
 78        volumes = []
 79        for i in range(0, random.randint(1, 10)):
 80            volumes.append({
 81                'type': 'disk',
 82                'location': "/dev/vda%s" % (i + 1),
 83            })
 84        return volumes
 85
 86
 87class PrepareVolumes(task.Task):
 88    def execute(self, volume_specs):
 89        for v in volume_specs:
 90            with slow_down():
 91                print("Dusting off your hard drive %s" % (v))
 92            with slow_down():
 93                print("Taking a well deserved break.")
 94            print("Your drive %s has been certified." % (v))
 95
 96
 97# Setup the set of things to do (mini-cinder).
 98flow = lf.Flow("root").add(
 99    PrintText("Starting volume create", no_slow=True),
100    gf.Flow('maker').add(
101        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
102        PrintText("I need a nap, it took me a while to build those specs."),
103        PrepareVolumes(),
104    ),
105    PrintText("Finished volume create", no_slow=True))
106
107# Setup the persistence & resumption layer.
108with example_utils.get_backend() as backend:
109    try:
110        book_id, flow_id = sys.argv[2].split("+", 1)
111    except (IndexError, ValueError):
112        book_id = None
113        flow_id = None
114
115    if not all([book_id, flow_id]):
116        # If no 'tracking id' (think a fedex or ups tracking id) is provided
117        # then we create one by creating a logbook (where flow details are
118        # stored) and creating a flow detail (where flow and task state is
119        # stored). The combination of these 2 objects' unique ids (uuids) allows
120        # the users of taskflow to reassociate the workflows that were
121        # potentially running (and which may have partially completed) back
122        # with taskflow so that those workflows can be resumed (or reverted)
123        # after a process/thread/engine has failed in some way.
124        book = models.LogBook('resume-volume-create')
125        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
126        book.add(flow_detail)
127        with contextlib.closing(backend.get_connection()) as conn:
128            conn.save_logbook(book)
129        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
130                                                   flow_detail.uuid))
131        print("!! Please submit this on later runs for tracking purposes")
132    else:
133        flow_detail = find_flow_detail(backend, book_id, flow_id)
134
135    # Load and run.
136    engine = engines.load(flow,
137                          flow_detail=flow_detail,
138                          backend=backend, engine='serial')
139    engine.run()
140
141# How to use.
142#
143# 1. $ python me.py "sqlite:////tmp/cinder.db"
144# 2. ctrl-c before this finishes
145# 3. Find the tracking id (search for 'Your tracking id is')
146# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
147# 5. Profit!
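
The tracking id printed by these examples is nothing more than the logbook uuid and the flow detail uuid joined together. The sketch below shows that bookkeeping with everything in a single process; it assumes the in-memory persistence backend can be fetched with a 'memory://' connection string (the examples above hide backend selection behind example_utils.get_backend()), and the flow contents are illustrative only:

import contextlib

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models
from taskflow import task


class Noop(task.Task):
    def execute(self):
        print("ran %s" % self.name)


# Assumption: the in-memory backend is addressable as 'memory://'.
backend = persistence_backends.fetch({'connection': 'memory://'})

book = models.LogBook('tracking-id-demo')
flow_detail = models.FlowDetail('root', uuid=uuidutils.generate_uuid())
book.add(flow_detail)
with contextlib.closing(backend.get_connection()) as conn:
    conn.upgrade()  # make sure the backend's structures exist
    conn.save_logbook(book)

print("Tracking id: %s+%s" % (book.uuid, flow_detail.uuid))

flow = lf.Flow('root').add(Noop('only-step'))
engine = engines.load(flow, flow_detail=flow_detail,
                      backend=backend, engine='serial')
engine.run()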

Running engines via iteration

Note

Full source located at run_by_iter

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8self_dir = os.path.abspath(os.path.dirname(__file__))
 9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10                                       os.pardir,
11                                       os.pardir))
12sys.path.insert(0, top_dir)
13sys.path.insert(0, self_dir)
14
15
16from taskflow import engines
17from taskflow.patterns import linear_flow as lf
18from taskflow import task
19
20
21# INTRO: This example shows how to run a set of flows at the same time, each
22# in its own engine, using a single thread of control to iterate over
23# each engine (which causes that engine to advance to its next state during
24# each iteration).
25
26
27class EchoTask(task.Task):
28    def execute(self, value):
29        print(value)
30        return chr(ord(value) + 1)
31
32
33def make_alphabet_flow(i):
34    f = lf.Flow("alphabet_%s" % (i))
35    start_value = 'A'
36    end_value = 'Z'
37    curr_value = start_value
38    while ord(curr_value) <= ord(end_value):
39        next_value = chr(ord(curr_value) + 1)
40        if curr_value != end_value:
41            f.add(EchoTask(name="echoer_%s" % curr_value,
42                           rebind={'value': curr_value},
43                           provides=next_value))
44        else:
45            f.add(EchoTask(name="echoer_%s" % curr_value,
46                           rebind={'value': curr_value}))
47        curr_value = next_value
48    return f
49
50
51# Adjust this number to change how many engines/flows run at once.
52flow_count = 1
53flows = []
54for i in range(0, flow_count):
55    f = make_alphabet_flow(i + 1)
56    flows.append(make_alphabet_flow(i + 1))
57engine_iters = []
58for f in flows:
59    e = engines.load(f)
60    e.compile()
61    e.storage.inject({'A': 'A'})
62    e.prepare()
63    engine_iters.append(e.run_iter())
64while engine_iters:
65    for it in list(engine_iters):
66        try:
67            print(next(it))
68        except StopIteration:
69            engine_iters.remove(it)
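
run_iter() is what makes the cooperative loop above possible: it turns an engine into a generator of its state transitions instead of running everything inside a single run() call. A minimal sketch that drives one engine to completion and simply counts whatever transitions it yields (the flow and task names are illustrative):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Echo(task.Task):
    def execute(self):
        print("echo from %s" % self.name)


engine = engines.load(lf.Flow('tiny').add(Echo('one'), Echo('two')))
transitions = list(engine.run_iter())
print("Engine went through %s transitions" % len(transitions))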

Controlling retries using a retry controller

Note

Full source located at retry_flow

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import retry
16from taskflow import task
17
18# INTRO: In this example we create a retry controller that receives a phone
19# directory and tries different phone numbers. The next task tries to call Jim
20# using the given number. If it is not Jim's number, the task raises an
21# exception and the retry controller takes the next number from the phone
22# directory and retries the call.
23#
24# This example shows a basic usage of retry controllers in a flow.
25# Retry controllers allow reverting and retrying a failed subflow with new
26# parameters.
27
28
29class CallJim(task.Task):
30    def execute(self, jim_number):
31        print("Calling jim %s." % jim_number)
32        if jim_number != 555:
33            raise Exception("Wrong number!")
34        else:
35            print("Hello Jim!")
36
37    def revert(self, jim_number, **kwargs):
38        print("Wrong number, apologizing.")
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('retrying-linear',
43               retry=retry.ParameterizedForEach(
44                   rebind=['phone_directory'],
45                   provides='jim_number')).add(CallJim())
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
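
ParameterizedForEach is only one of the bundled retry controllers; for the common "try the same thing a few times" case taskflow also ships retry.Times. A minimal sketch (the failing task and module-level counter are illustrative; a real task would hit a genuinely flaky resource):

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task

ATTEMPTS = []


class FlakyCall(task.Task):
    def execute(self):
        ATTEMPTS.append(1)
        if len(ATTEMPTS) < 3:
            raise IOError("transient failure, please retry")
        print("succeeded on attempt %s" % len(ATTEMPTS))

    def revert(self, **kwargs):
        print("attempt %s failed, cleaning up" % len(ATTEMPTS))


flow = lf.Flow('retrying-a-few-times',
               retry=retry.Times(attempts=3)).add(FlakyCall())
taskflow.engines.run(flow)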

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1
  2import json
  3import logging
  4import os
  5import sys
  6import tempfile
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13from taskflow import engines
 14from taskflow.engines.worker_based import worker
 15from taskflow.patterns import linear_flow as lf
 16from taskflow.tests import utils
 17from taskflow.utils import threading_utils
 18
 19import example_utils  # noqa
 20
 21# INTRO: This example walks through a miniature workflow which shows how to
 22# start up a number of workers (these workers will process task execution and
 23# reversion requests using any provided input data) and then use an engine
 24# that creates a set of *capable* tasks and flows (the engine can not create
 25# tasks that the workers are not able to run; that would end in failure) that
 26# those workers will run, and then executes that workflow seamlessly using the
 27# workers to perform the actual execution.
 28#
 29# NOTE(harlowja): this example simulates the expected larger number of workers
 30# by using a set of threads (which in this example simulate the remote workers
 31# that would typically be running on other external machines).
 32
 33# A filesystem can also be used as the queue transport (useful as simple
 34# transport type that does not involve setting up a larger mq system). If this
 35# is false then the memory transport is used instead, both work in standalone
 36# setups.
 37USE_FILESYSTEM = False
 38BASE_SHARED_CONF = {
 39    'exchange': 'taskflow',
 40}
 41
 42# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 43# recommended to run many worker threads in this example due to the types
 44# of errors mentioned in that issue.
 45MEMORY_WORKERS = 2
 46FILE_WORKERS = 1
 47WORKER_CONF = {
 48    # These are the tasks the worker can execute, they *must* be importable,
 49    # typically this list is used to restrict what workers may execute to
 50    # a smaller set of *allowed* tasks that are known to be safe (one would
 51    # not want to allow all python code to be executed).
 52    'tasks': [
 53        'taskflow.tests.utils:TaskOneArgOneReturn',
 54        'taskflow.tests.utils:TaskMultiArgOneReturn'
 55    ],
 56}
 57
 58
 59def run(engine_options):
 60    flow = lf.Flow('simple-linear').add(
 61        utils.TaskOneArgOneReturn(provides='result1'),
 62        utils.TaskMultiArgOneReturn(provides='result2')
 63    )
 64    eng = engines.load(flow,
 65                       store=dict(x=111, y=222, z=333),
 66                       engine='worker-based', **engine_options)
 67    eng.run()
 68    return eng.storage.fetch_all()
 69
 70
 71if __name__ == "__main__":
 72    logging.basicConfig(level=logging.ERROR)
 73
 74    # Setup our transport configuration and merge it into the worker and
 75    # engine configuration so that both of those use it correctly.
 76    shared_conf = dict(BASE_SHARED_CONF)
 77
 78    tmp_path = None
 79    if USE_FILESYSTEM:
 80        worker_count = FILE_WORKERS
 81        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
 82        shared_conf.update({
 83            'transport': 'filesystem',
 84            'transport_options': {
 85                'data_folder_in': tmp_path,
 86                'data_folder_out': tmp_path,
 87                'polling_interval': 0.1,
 88            },
 89        })
 90    else:
 91        worker_count = MEMORY_WORKERS
 92        shared_conf.update({
 93            'transport': 'memory',
 94            'transport_options': {
 95                'polling_interval': 0.1,
 96            },
 97        })
 98    worker_conf = dict(WORKER_CONF)
 99    worker_conf.update(shared_conf)
100    engine_options = dict(shared_conf)
101    workers = []
102    worker_topics = []
103
104    try:
105        # Create a set of workers to simulate actual remote workers.
106        print('Running %s workers.' % (worker_count))
107        for i in range(0, worker_count):
108            worker_conf['topic'] = 'worker-%s' % (i + 1)
109            worker_topics.append(worker_conf['topic'])
110            w = worker.Worker(**worker_conf)
111            runner = threading_utils.daemon_thread(w.run)
112            runner.start()
113            w.wait()
114            workers.append((runner, w.stop))
115
116        # Now use those workers to do something.
117        print('Executing some work.')
118        engine_options['topics'] = worker_topics
119        result = run(engine_options)
120        print('Execution finished.')
121        # This is done so that the test examples can work correctly
122        # even when the keys change order (which will happen in various
123        # python versions).
124        print("Result = %s" % json.dumps(result, sort_keys=True))
125    finally:
126        # And cleanup.
127        print('Stopping workers.')
128        while workers:
129            r, stopper = workers.pop()
130            stopper()
131            r.join()
132        if tmp_path:
133            example_utils.rm_path(tmp_path)

Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1
  2import logging
  3import os
  4import string
  5import sys
  6import time
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13from taskflow import engines
 14from taskflow.engines.worker_based import worker
 15from taskflow.patterns import linear_flow as lf
 16from taskflow import task
 17from taskflow.types import notifier
 18from taskflow.utils import threading_utils
 19
 20ANY = notifier.Notifier.ANY
 21
 22# INTRO: These examples show how to use a remote worker's event notification
 23# attribute to proxy back task event notifications to the controlling process.
 24#
 25# In this case a simple set of events is triggered by a worker running a
 26# task (simulated to be remote by using a kombu memory transport and threads).
 27# Those events that the 'remote worker' produces will then be proxied back to
 28# the task that the engine is running 'remotely', and then they will be emitted
 29# back to the original callbacks that exist in the originating engine
 30# process/thread. This creates a one-way *notification* channel that can
 31# transparently be used in-process or out-of-process (using remote workers and
 32# so on), which allows a task to signal to its controlling process some sort of
 33# action that has occurred that the task may need to tell others about (for
 34# example to trigger some type of response when the task reaches 50% done...).
 35
 36
 37def event_receiver(event_type, details):
 38    """This is the callback that (in this example) doesn't do much..."""
 39    print("Received event '%s'" % event_type)
 40    print("Details = %s" % details)
 41
 42
 43class EventReporter(task.Task):
 44    """This is the task that will be running 'remotely' (not really remote)."""
 45
 46    EVENTS = tuple(string.ascii_uppercase)
 47    EVENT_DELAY = 0.1
 48
 49    def execute(self):
 50        for i, e in enumerate(self.EVENTS):
 51            details = {
 52                'leftover': self.EVENTS[i:],
 53            }
 54            self.notifier.notify(e, details)
 55            time.sleep(self.EVENT_DELAY)
 56
 57
 58BASE_SHARED_CONF = {
 59    'exchange': 'taskflow',
 60    'transport': 'memory',
 61    'transport_options': {
 62        'polling_interval': 0.1,
 63    },
 64}
 65
 66# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 67# recommended to run many worker threads in this example due to the types
 68# of errors mentioned in that issue.
 69MEMORY_WORKERS = 1
 70WORKER_CONF = {
 71    'tasks': [
 72        # Used to locate which tasks we can run (we don't want to allow
 73        # arbitrary code/tasks to be run by any worker since that would
 74        # open up a variety of vulnerabilities).
 75        '%s:EventReporter' % (__name__),
 76    ],
 77}
 78
 79
 80def run(engine_options):
 81    reporter = EventReporter()
 82    reporter.notifier.register(ANY, event_receiver)
 83    flow = lf.Flow('event-reporter').add(reporter)
 84    eng = engines.load(flow, engine='worker-based', **engine_options)
 85    eng.run()
 86
 87
 88if __name__ == "__main__":
 89    logging.basicConfig(level=logging.ERROR)
 90
 91    # Setup our transport configuration and merge it into the worker and
 92    # engine configuration so that both of those objects use it correctly.
 93    worker_conf = dict(WORKER_CONF)
 94    worker_conf.update(BASE_SHARED_CONF)
 95    engine_options = dict(BASE_SHARED_CONF)
 96    workers = []
 97
 98    # These topics will be used to request worker information on; those
 99    # workers will respond with their capabilities, which the executing engine
100    # will use to match pending tasks to a capable worker. This will cause
101    # the task to be sent for execution, and the engine will wait until it
102    # is finished (a response is received); then the engine will either
103    # continue with other tasks, do some retry/failure resolution logic or
104    # stop (and potentially re-raise the remote worker's failure)...
105    worker_topics = []
106
107    try:
108        # Create a set of worker threads to simulate actual remote workers...
109        print('Running %s workers.' % (MEMORY_WORKERS))
110        for i in range(0, MEMORY_WORKERS):
111            # Give each one its own unique topic name so that they can
112            # correctly communicate with the engine (they will all share the
113            # same exchange).
114            worker_conf['topic'] = 'worker-%s' % (i + 1)
115            worker_topics.append(worker_conf['topic'])
116            w = worker.Worker(**worker_conf)
117            runner = threading_utils.daemon_thread(w.run)
118            runner.start()
119            w.wait()
120            workers.append((runner, w.stop))
121
122        # Now use those workers to do something.
123        print('Executing some work.')
124        engine_options['topics'] = worker_topics
125        result = run(engine_options)
126        print('Execution finished.')
127    finally:
128        # And cleanup.
129        print('Stopping workers.')
130        while workers:
131            r, stopper = workers.pop()
132            stopper()
133            r.join()
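
As the intro comment points out, the notification channel is transparent: the very same notifier registration works when the task runs in-process with an ordinary engine, with no workers or message transport involved. A minimal sketch of that in-process case (the event name and task are illustrative):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY


class Pinger(task.Task):
    def execute(self):
        self.notifier.notify('ping', {'from': self.name})


def on_event(event_type, details):
    print("got '%s' with %s" % (event_type, details))


pinger = Pinger()
pinger.notifier.register(ANY, on_event)
engines.run(lf.Flow('ping-flow').add(pinger))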

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1
  2import logging
  3import math
  4import os
  5import sys
  6
  7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  8                                       os.pardir,
  9                                       os.pardir))
 10sys.path.insert(0, top_dir)
 11
 12from taskflow import engines
 13from taskflow.engines.worker_based import worker
 14from taskflow.patterns import unordered_flow as uf
 15from taskflow import task
 16from taskflow.utils import threading_utils
 17
 18# INTRO: This example walks through a workflow that will in parallel compute
 19# a mandelbrot result set (using X 'remote' workers) and then combine their
 20# results together to form a final mandelbrot fractal image. It shows how
 21# taskflow can be used for a well-known embarrassingly parallel problem that has
 22# the added benefit of also being an elegant visualization.
 23#
 24# NOTE(harlowja): this example simulates the expected larger number of workers
 25# by using a set of threads (which in this example simulate the remote workers
 26# that would typically be running on other external machines).
 27#
 28# NOTE(harlowja): to have it produce an image run (after installing pillow):
 29#
 30# $ python taskflow/examples/wbe_mandelbrot.py output.png
 31
 32BASE_SHARED_CONF = {
 33    'exchange': 'taskflow',
 34}
 35WORKERS = 2
 36WORKER_CONF = {
 37    # These are the tasks the worker can execute, they *must* be importable,
 38    # typically this list is used to restrict what workers may execute to
 39    # a smaller set of *allowed* tasks that are known to be safe (one would
 40    # not want to allow all python code to be executed).
 41    'tasks': [
 42        '%s:MandelCalculator' % (__name__),
 43    ],
 44}
 45ENGINE_CONF = {
 46    'engine': 'worker-based',
 47}
 48
 49# Mandelbrot & image settings...
 50IMAGE_SIZE = (512, 512)
 51CHUNK_COUNT = 8
 52MAX_ITERATIONS = 25
 53
 54
 55class MandelCalculator(task.Task):
 56    def execute(self, image_config, mandelbrot_config, chunk):
 57        """Returns the number of iterations before the computation "escapes".
 58
 59        Given the real and imaginary parts of a complex number, determine if it
 60        is a candidate for membership in the mandelbrot set given a fixed
 61        number of iterations.
 62        """
 63
 64        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
 65        #
 66        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
 67        def mandelbrot(x, y, max_iters):
 68            c = complex(x, y)
 69            z = 0.0j
 70            for i in range(max_iters):
 71                z = z * z + c
 72                if (z.real * z.real + z.imag * z.imag) >= 4:
 73                    return i
 74            return max_iters
 75
 76        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
 77        height, width = image_config['size']
 78        pixel_size_x = (max_x - min_x) / width
 79        pixel_size_y = (max_y - min_y) / height
 80        block = []
 81        for y in range(chunk[0], chunk[1]):
 82            row = []
 83            imag = min_y + y * pixel_size_y
 84            for x in range(0, width):
 85                real = min_x + x * pixel_size_x
 86                row.append(mandelbrot(real, imag, max_iters))
 87            block.append(row)
 88        return block
 89
 90
 91def calculate(engine_conf):
 92    # Subdivide the work into X pieces, then request each worker to calculate
 93    # one of those chunks and then later we will write these chunks out to
 94    # an image bitmap file.
 95
 96    # An unordered flow is used here since the mandelbrot calculation is an
 97    # example of an embarrassingly parallel computation that we can scatter
 98    # across as many workers as possible.
 99    flow = uf.Flow("mandelbrot")
100
101    # These symbols will be automatically given to tasks as input to their
102    # execute method, in this case these are constants used in the mandelbrot
103    # calculation.
104    store = {
105        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
106        'image_config': {
107            'size': IMAGE_SIZE,
108        }
109    }
110
111    # We need the task names to be in the right order so that we can extract
112    # the final results in the right order (we don't care about the order when
113    # executing).
114    task_names = []
115
116    # Compose our workflow.
117    height, _width = IMAGE_SIZE
118    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
119    for i in range(0, CHUNK_COUNT):
120        chunk_name = 'chunk_%s' % i
121        task_name = "calculation_%s" % i
122        # Break the calculation up into chunk size pieces.
123        rows = [i * chunk_size, i * chunk_size + chunk_size]
124        flow.add(
125            MandelCalculator(task_name,
126                             # This ensures the storage symbol with name
127                             # 'chunk_name' is sent into the task's local
128                             # symbol 'chunk'. This is how we give each
129                             # calculator its own correct sequence of rows
130                             # to work on.
131                             rebind={'chunk': chunk_name}))
132        store[chunk_name] = rows
133        task_names.append(task_name)
134
135    # Now execute it.
136    eng = engines.load(flow, store=store, engine_conf=engine_conf)
137    eng.run()
138
139    # Gather all the results and order them for further processing.
140    gather = []
141    for name in task_names:
142        gather.extend(eng.storage.get(name))
143    points = []
144    for y, row in enumerate(gather):
145        for x, color in enumerate(row):
146            points.append(((x, y), color))
147    return points
148
149
150def write_image(results, output_filename=None):
151    print("Gathered %s results that represents a mandelbrot"
152          " image (using %s chunks that are computed jointly"
153          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
154    if not output_filename:
155        return
156
157    # Pillow (the PIL fork) saves us from writing our own image writer...
158    try:
159        from PIL import Image
160    except ImportError as e:
161        # To currently get this (may change in the future),
162        # $ pip install Pillow
163        raise RuntimeError("Pillow is required to write image files: %s" % e)
164
165    # Limit to 255, find the max and normalize to that...
166    color_max = 0
167    for _point, color in results:
168        color_max = max(color, color_max)
169
170    # Use gray scale since we don't really have other colors.
171    img = Image.new('L', IMAGE_SIZE, "black")
172    pixels = img.load()
173    for (x, y), color in results:
174        if color_max == 0:
175            color = 0
176        else:
177            color = int((float(color) / color_max) * 255.0)
178        pixels[x, y] = color
179    img.save(output_filename)
180
181
182def create_fractal():
183    logging.basicConfig(level=logging.ERROR)
184
185    # Setup our transport configuration and merge it into the worker and
186    # engine configuration so that both of those use it correctly.
187    shared_conf = dict(BASE_SHARED_CONF)
188    shared_conf.update({
189        'transport': 'memory',
190        'transport_options': {
191            'polling_interval': 0.1,
192        },
193    })
194
195    if len(sys.argv) >= 2:
196        output_filename = sys.argv[1]
197    else:
198        output_filename = None
199
200    worker_conf = dict(WORKER_CONF)
201    worker_conf.update(shared_conf)
202    engine_conf = dict(ENGINE_CONF)
203    engine_conf.update(shared_conf)
204    workers = []
205    worker_topics = []
206
207    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
208    try:
209        # Create a set of workers to simulate actual remote workers.
210        print('Running %s workers.' % (WORKERS))
211        for i in range(0, WORKERS):
212            worker_conf['topic'] = 'calculator_%s' % (i + 1)
213            worker_topics.append(worker_conf['topic'])
214            w = worker.Worker(**worker_conf)
215            runner = threading_utils.daemon_thread(w.run)
216            runner.start()
217            w.wait()
218            workers.append((runner, w.stop))
219
220        # Now use those workers to do something.
221        engine_conf['topics'] = worker_topics
222        results = calculate(engine_conf)
223        print('Execution finished.')
224    finally:
225        # And cleanup.
226        print('Stopping workers.')
227        while workers:
228            r, stopper = workers.pop()
229            stopper()
230            r.join()
231    print("Writing image...")
232    write_image(results, output_filename=output_filename)
233
234
235if __name__ == "__main__":
236    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1
  2import collections
  3import contextlib
  4import logging
  5import os
  6import random
  7import sys
  8import threading
  9import time
 10
 11logging.basicConfig(level=logging.ERROR)
 12
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17
 18from zake import fake_client
 19
 20from taskflow import exceptions as excp
 21from taskflow.jobs import backends
 22from taskflow.utils import threading_utils
 23
 24# In this example we show how a jobboard can be used to post work for other
 25# entities to work on. This example creates a set of jobs using one producer
 26# thread (typically this would be split across many machines) and then has
 27# other worker threads, each with its own jobboard, select work using a given
 28# filter [red/blue] and then perform that work (consuming or abandoning
 29# the job after it has been completed or failed).
 30
 31# Things to note:
 32# - No persistence layer is used (or logbook), just the job details are used
 33#   to determine if a job should be selected by a worker or not.
 34# - This example runs in a single process (this is expected to be atypical
 35#   but this example shows that it can be done if needed, for testing...)
 36# - The iterjobs(), claim(), consume()/abandon() worker workflow.
 37# - The post() producer workflow.
 38
 39SHARED_CONF = {
 40    'path': "/taskflow/jobs",
 41    'board': 'zookeeper',
 42}
 43
 44# How many workers and producers of work will be created (as threads).
 45PRODUCERS = 3
 46WORKERS = 5
 47
 48# How many units of work each producer will create.
 49PRODUCER_UNITS = 10
 50
 51# How many units of work are expected to be produced (used so workers can
 52# know when to stop running and shut down; typically this would not be a
 53# known value, but we have to limit this example's execution time to be less
 54# than infinity).
 55EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
 56
 57# Delay between producing/consuming more work.
 58WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
 59
 60# To ensure threads don't trample other threads output.
 61STDOUT_LOCK = threading.Lock()
 62
 63
 64def dispatch_work(job):
 65    # This is where the job's contained work *would* be done
 66    time.sleep(1.0)
 67
 68
 69def safe_print(name, message, prefix=""):
 70    with STDOUT_LOCK:
 71        if prefix:
 72            print("%s %s: %s" % (prefix, name, message))
 73        else:
 74            print("%s: %s" % (name, message))
 75
 76
 77def worker(ident, client, consumed):
 78    # Create a personal board (using the same client so that it works in
 79    # the same process) and start looking for jobs on the board that we want
 80    # to perform.
 81    name = "W-%s" % (ident)
 82    safe_print(name, "started")
 83    claimed_jobs = 0
 84    consumed_jobs = 0
 85    abandoned_jobs = 0
 86    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
 87        while len(consumed) != EXPECTED_UNITS:
 88            favorite_color = random.choice(['blue', 'red'])
 89            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
 90                # See if we should even bother with it...
 91                if job.details.get('color') != favorite_color:
 92                    continue
 93                safe_print(name, "'%s' [attempting claim]" % (job))
 94                try:
 95                    board.claim(job, name)
 96                    claimed_jobs += 1
 97                    safe_print(name, "'%s' [claimed]" % (job))
 98                except (excp.NotFound, excp.UnclaimableJob):
 99                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
100                else:
101                    try:
102                        dispatch_work(job)
103                        board.consume(job, name)
104                        safe_print(name, "'%s' [consumed]" % (job))
105                        consumed_jobs += 1
106                        consumed.append(job)
107                    except Exception:
108                        board.abandon(job, name)
109                        abandoned_jobs += 1
110                        safe_print(name, "'%s' [abandoned]" % (job))
111            time.sleep(WORKER_DELAY)
112    safe_print(name,
113               "finished (claimed %s jobs, consumed %s jobs,"
114               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
115                                        abandoned_jobs), prefix=">>>")
116
117
118def producer(ident, client):
119    # Create a personal board (using the same client so that it works in
120    # the same process) and start posting jobs on the board that we want
121    # some entity to perform.
122    name = "P-%s" % (ident)
123    safe_print(name, "started")
124    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
125        for i in range(0, PRODUCER_UNITS):
126            job_name = "%s-%s" % (name, i)
127            details = {
128                'color': random.choice(['red', 'blue']),
129            }
130            job = board.post(job_name, book=None, details=details)
131            safe_print(name, "'%s' [posted]" % (job))
132            time.sleep(PRODUCER_DELAY)
133    safe_print(name, "finished", prefix=">>>")
134
135
136def main():
137    # TODO(harlowja): Hack to make eventlet work right, remove when the
138    # following is fixed: https://github.com/eventlet/eventlet/issues/230
139    from taskflow.utils import eventlet_utils as _eu  # noqa
140    try:
141        import eventlet as _eventlet  # noqa
142    except ImportError:
143        pass
144
145    with contextlib.closing(fake_client.FakeClient()) as c:
146        created = []
147        for i in range(0, PRODUCERS):
148            p = threading_utils.daemon_thread(producer, i + 1, c)
149            created.append(p)
150            p.start()
151        consumed = collections.deque()
152        for i in range(0, WORKERS):
153            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
154            created.append(w)
155            w.start()
156        while created:
157            t = created.pop()
158            t.join()
159        # At the end there should be nothing left over; let's verify that.
160        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
161        board.connect()
162        with contextlib.closing(board):
163            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
164                return 1
165            return 0
166
167
168if __name__ == "__main__":
169    sys.exit(main())
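
The worker loop above is essentially the whole jobboard protocol: iterate the unclaimed jobs, claim one, do the work, then consume it (or abandon it on failure so another worker can retry). Stripped of the threading and bookkeeping, a single poster/worker interaction looks roughly like the following sketch; it reuses the same jobboard calls shown above, while the board configuration, names and job details are illustrative assumptions only.

from zake import fake_client

from taskflow import exceptions as excp
from taskflow.jobs import backends

# A condensed sketch (not part of the example above): post one job, then
# claim, "work on" and consume it using an in-memory zake client.
conf = {'board': 'zookeeper', 'path': '/taskflow/sketch'}
client = fake_client.FakeClient()
with backends.backend('sketch', conf.copy(), client=client) as board:
    board.post('demo-job', book=None, details={'color': 'blue'})
    for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
        try:
            board.claim(job, 'sketch-worker')
        except (excp.NotFound, excp.UnclaimableJob):
            continue  # Someone else claimed it (or it vanished); move on.
        try:
            pass  # ... the job's actual work would happen here ...
        except Exception:
            board.abandon(job, 'sketch-worker')  # Let another worker retry.
        else:
            board.consume(job, 'sketch-worker')  # Done; remove it for good.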

Conductor simulating a CI pipeline

Note

Full source located at tox_conductor

  1
  2import contextlib
  3import itertools
  4import logging
  5import os
  6import shutil
  7import socket
  8import sys
  9import tempfile
 10import threading
 11import time
 12
 13logging.basicConfig(level=logging.ERROR)
 14
 15top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 16                                       os.pardir,
 17                                       os.pardir))
 18sys.path.insert(0, top_dir)
 19
 20from oslo_utils import timeutils
 21from oslo_utils import uuidutils
 22from zake import fake_client
 23
 24from taskflow.conductors import backends as conductors
 25from taskflow import engines
 26from taskflow.jobs import backends as boards
 27from taskflow.patterns import linear_flow
 28from taskflow.persistence import backends as persistence
 29from taskflow.persistence import models
 30from taskflow import task
 31from taskflow.utils import threading_utils
 32
 33# INTRO: This example shows how a worker/producer can post desired work (jobs)
 34# to a jobboard and a conductor can consume that work (jobs) from that jobboard
 35# and execute those jobs in a reliable & async manner (for example, if the
 36# conductor were to crash then the job would be released back onto the jobboard
 37# and another conductor could attempt to finish it, from wherever that job last
 38# left off).
 39#
 40# In this example an in-memory jobboard (and in-memory storage) is created and
 41# used to simulate how this would be done at a larger scale (it is an
 42# example after all).
 43
 44# Restrict how long this example runs for...
 45RUN_TIME = 5
 46REVIEW_CREATION_DELAY = 0.5
 47SCAN_DELAY = 0.1
 48NAME = "%s_%s" % (socket.getfqdn(), os.getpid())
 49
 50# This won't really use zookeeper; instead it uses the zake library, which
 51# mimics an actual zookeeper cluster locally using threads and an in-memory
 52# data structure.
 53JOBBOARD_CONF = {
 54    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
 55}
 56
 57
 58class RunReview(task.Task):
 59    # A dummy task that clones the review and runs tox...
 60
 61    def _clone_review(self, review, temp_dir):
 62        print("Cloning review '%s' into %s" % (review['id'], temp_dir))
 63
 64    def _run_tox(self, temp_dir):
 65        print("Running tox in %s" % temp_dir)
 66
 67    def execute(self, review, temp_dir):
 68        self._clone_review(review, temp_dir)
 69        self._run_tox(temp_dir)
 70
 71
 72class MakeTempDir(task.Task):
 73    # A task that creates a temporary dir (and destroys it on failure/revert).
 74    #
 75    # It provides the location of the temporary dir for other tasks to use
 76    # as they see fit.
 77
 78    default_provides = 'temp_dir'
 79
 80    def execute(self):
 81        return tempfile.mkdtemp()
 82
 83    def revert(self, *args, **kwargs):
 84        temp_dir = kwargs.get(task.REVERT_RESULT)
 85        if temp_dir:
 86            shutil.rmtree(temp_dir)
 87
 88
 89class CleanResources(task.Task):
 90    # A task that cleans up any workflow resources.
 91
 92    def execute(self, temp_dir):
 93        print("Removing %s" % temp_dir)
 94        shutil.rmtree(temp_dir)
 95
 96
 97def review_iter():
 98    """Makes reviews (never-ending iterator/generator)."""
 99    review_id_gen = itertools.count(0)
100    while True:
101        review_id = next(review_id_gen)
102        review = {
103            'id': review_id,
104        }
105        yield review
106
107
108# It is important that this lives at the module namespace level, since it must
109# be importable by a conductor dispatching an engine; if it were a lambda
110# function, for example, it would not be re-importable and the conductor would
111# be unable to reference it when creating the workflow to run.
112def create_review_workflow():
113    """Factory method used to create a review workflow to run."""
114    f = linear_flow.Flow("tester")
115    f.add(
116        MakeTempDir(name="maker"),
117        RunReview(name="runner"),
118        CleanResources(name="cleaner")
119    )
120    return f
121
122
123def generate_reviewer(client, saver, name=NAME):
124    """Creates a review producer thread with the given name prefix."""
125    real_name = "%s_reviewer" % name
126    no_more = threading.Event()
127    jb = boards.fetch(real_name, JOBBOARD_CONF,
128                      client=client, persistence=saver)
129
130    def make_save_book(saver, review_id):
131        # Record what we want to happen (sometime in the future).
132        book = models.LogBook("book_%s" % review_id)
133        detail = models.FlowDetail("flow_%s" % review_id,
134                                   uuidutils.generate_uuid())
135        book.add(detail)
136        # Associate the factory method we want to be called (in the future)
137        # with the book, so that the conductor will be able to call into
138        # that factory to retrieve the workflow objects that represent the
139        # work.
140        #
141        # These args and kwargs *can* be used to pass specific parameters to
142        # the factory when it is called to create the workflow objects
143        # (typically used to tell a factory how to create a unique workflow
144        # that represents this review).
145        factory_args = ()
146        factory_kwargs = {}
147        engines.save_factory_details(detail, create_review_workflow,
148                                     factory_args, factory_kwargs)
149        with contextlib.closing(saver.get_connection()) as conn:
150            conn.save_logbook(book)
151            return book
152
153    def run():
154        """Periodically publishes 'fake' reviews to analyze."""
155        jb.connect()
156        review_generator = review_iter()
157        with contextlib.closing(jb):
158            while not no_more.is_set():
159                review = next(review_generator)
160                details = {
161                    'store': {
162                        'review': review,
163                    },
164                }
165                job_name = "%s_%s" % (real_name, review['id'])
166                print("Posting review '%s'" % review['id'])
167                jb.post(job_name,
168                        book=make_save_book(saver, review['id']),
169                        details=details)
170                time.sleep(REVIEW_CREATION_DELAY)
171
172    # Return the unstarted thread, and a callback that can be used to
173    # shut down that thread (to avoid it running forever).
174    return (threading_utils.daemon_thread(target=run), no_more.set)
175
176
177def generate_conductor(client, saver, name=NAME):
178    """Creates a conductor thread with the given name prefix."""
179    real_name = "%s_conductor" % name
180    jb = boards.fetch(name, JOBBOARD_CONF,
181                      client=client, persistence=saver)
182    conductor = conductors.fetch("blocking", real_name, jb,
183                                 engine='parallel', wait_timeout=SCAN_DELAY)
184
185    def run():
186        jb.connect()
187        with contextlib.closing(jb):
188            conductor.run()
189
190    # Return the unstarted thread, and a callback that can be used to
191    # shut down that thread (to avoid it running forever).
192    return (threading_utils.daemon_thread(target=run), conductor.stop)
193
194
195def main():
196    # Need to share the same backend, so that data can be shared...
197    persistence_conf = {
198        'connection': 'memory',
199    }
200    saver = persistence.fetch(persistence_conf)
201    with contextlib.closing(saver.get_connection()) as conn:
202        # This ensures that the needed backend setup/data directories/schema
203        # upgrades and so on... exist before they are attempted to be used...
204        conn.upgrade()
205    fc1 = fake_client.FakeClient()
206    # Done like this to share the same client storage location so the correct
207    # zookeeper features work across clients...
208    fc2 = fake_client.FakeClient(storage=fc1.storage)
209    entities = [
210        generate_reviewer(fc1, saver),
211        generate_conductor(fc2, saver),
212    ]
213    for t, stopper in entities:
214        t.start()
215    try:
216        watch = timeutils.StopWatch(duration=RUN_TIME)
217        watch.start()
218        while not watch.expired():
219            time.sleep(0.1)
220    finally:
221        for t, stopper in reversed(entities):
222            stopper()
223            t.join()
224
225
226if __name__ == '__main__':
227    main()
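
The key hand-off in this example is the logbook: the reviewer records which (importable) factory rebuilds the flow, posts a job that points at that book, and the conductor later imports that factory, reconstructs the flow and runs it. Reduced to just the posting side, the hand-off looks roughly like the sketch below; the task, flow, board and book names are illustrative assumptions, while the in-memory persistence and zake client mirror the example above.

import contextlib

from oslo_utils import uuidutils
from zake import fake_client

from taskflow import engines
from taskflow.jobs import backends as boards
from taskflow.patterns import linear_flow
from taskflow.persistence import backends as persistence
from taskflow.persistence import models
from taskflow import task


class Noop(task.Task):
    def execute(self):
        pass


def build_flow():
    # Must live at module level so a conductor can re-import it later.
    f = linear_flow.Flow('sketch')
    f.add(Noop(name='noop'))
    return f


saver = persistence.fetch({'connection': 'memory'})
with contextlib.closing(saver.get_connection()) as conn:
    conn.upgrade()

# Record the factory (and its args/kwargs) that recreates the flow...
book = models.LogBook('sketch-book')
detail = models.FlowDetail('sketch-flow', uuidutils.generate_uuid())
book.add(detail)
engines.save_factory_details(detail, build_flow, (), {})
with contextlib.closing(saver.get_connection()) as conn:
    conn.save_logbook(book)

# ...then post a job referencing that book for some conductor to pick up.
board = boards.fetch('sketch-poster',
                     {'board': 'zookeeper', 'path': '/taskflow/sketch'},
                     client=fake_client.FakeClient(), persistence=saver)
board.connect()
with contextlib.closing(board):
    board.post('sketch-job', book=book)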

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles

  1
  2import contextlib
  3import functools
  4import logging
  5import os
  6import sys
  7import time
  8import traceback
  9
 10from kazoo import client
 11
 12top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 13                                       os.pardir,
 14                                       os.pardir))
 15sys.path.insert(0, top_dir)
 16
 17from taskflow.conductors import backends as conductor_backends
 18from taskflow import engines
 19from taskflow.jobs import backends as job_backends
 20from taskflow import logging as taskflow_logging
 21from taskflow.patterns import linear_flow as lf
 22from taskflow.persistence import backends as persistence_backends
 23from taskflow.persistence import models
 24from taskflow import task
 25
 26from oslo_utils import timeutils
 27from oslo_utils import uuidutils
 28
 29# Instructions!
 30#
 31# 1. Install zookeeper (or change host listed below)
 32# 2. Download this example, place in file '99_bottles.py'
 33# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
 34# 4. Run `python 99_bottles.py c` a few times (in different shells)
 35# 5. At will, kill any of the processes created in (4) and watch the work
 36#    resume on another process (and repeat)
 37# 6. Keep enough workers alive to eventually finish the song (if desired).
 38
 39ME = os.getpid()
 40ZK_HOST = "localhost:2181"
 41JB_CONF = {
 42    'hosts': ZK_HOST,
 43    'board': 'zookeeper',
 44    'path': '/taskflow/99-bottles-demo',
 45}
 46PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
 47TAKE_DOWN_DELAY = 1.0
 48PASS_AROUND_DELAY = 3.0
 49HOW_MANY_BOTTLES = 99
 50
 51
 52class TakeABottleDown(task.Task):
 53    def execute(self, bottles_left):
 54        sys.stdout.write('Take one down, ')
 55        sys.stdout.flush()
 56        time.sleep(TAKE_DOWN_DELAY)
 57        return bottles_left - 1
 58
 59
 60class PassItAround(task.Task):
 61    def execute(self):
 62        sys.stdout.write('pass it around, ')
 63        sys.stdout.flush()
 64        time.sleep(PASS_AROUND_DELAY)
 65
 66
 67class Conclusion(task.Task):
 68    def execute(self, bottles_left):
 69        sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
 70        sys.stdout.flush()
 71
 72
 73def make_bottles(count):
 74    # This is the function that will be called to generate the workflow
 75    # and will also be called to regenerate it on resumption so that work
 76    # can continue from where it last left off...
 77
 78    s = lf.Flow("bottle-song")
 79
 80    take_bottle = TakeABottleDown("take-bottle-%s" % count,
 81                                  inject={'bottles_left': count},
 82                                  provides='bottles_left')
 83    pass_it = PassItAround("pass-%s-around" % count)
 84    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
 85    s.add(take_bottle, pass_it, next_bottles)
 86
 87    for bottle in reversed(list(range(1, count))):
 88        take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
 89                                      provides='bottles_left')
 90        pass_it = PassItAround("pass-%s-around" % bottle)
 91        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
 92        s.add(take_bottle, pass_it, next_bottles)
 93
 94    return s
 95
 96
 97def run_conductor(only_run_once=False):
 98    # This continuously runs consumers until it is stopped via ctrl-c or some
 99    # other kill signal...
100    event_watches = {}
101
102    # This will be triggered by the conductor doing various activities
103    # with engines; it is quite nice to be able to see the various timing
104    # segments (useful for debugging, watching progress, or figuring out
105    # where to optimize).
106    def on_conductor_event(cond, event, details):
107        print("Event '%s' has been received..." % event)
108        print("Details = %s" % details)
109        if event.endswith("_start"):
110            w = timeutils.StopWatch()
111            w.start()
112            base_event = event[0:-len("_start")]
113            event_watches[base_event] = w
114        if event.endswith("_end"):
115            base_event = event[0:-len("_end")]
116            try:
117                w = event_watches.pop(base_event)
118                w.stop()
119                print("It took %0.3f seconds for event '%s' to finish"
120                      % (w.elapsed(), base_event))
121            except KeyError:
122                pass
123        if event == 'running_end' and only_run_once:
124            cond.stop()
125
126    print("Starting conductor with pid: %s" % ME)
127    my_name = "conductor-%s" % ME
128    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
129    with contextlib.closing(persist_backend):
130        with contextlib.closing(persist_backend.get_connection()) as conn:
131            conn.upgrade()
132        job_backend = job_backends.fetch(my_name, JB_CONF,
133                                         persistence=persist_backend)
134        job_backend.connect()
135        with contextlib.closing(job_backend):
136            cond = conductor_backends.fetch('blocking', my_name, job_backend,
137                                            persistence=persist_backend)
138            on_conductor_event = functools.partial(on_conductor_event, cond)
139            cond.notifier.register(cond.notifier.ANY, on_conductor_event)
140            # Run forever, and kill -9 or ctrl-c me...
141            try:
142                cond.run()
143            finally:
144                cond.stop()
145                cond.wait()
146
147
148def run_poster():
149    # This just posts a single job and then ends...
150    print("Starting poster with pid: %s" % ME)
151    my_name = "poster-%s" % ME
152    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
153    with contextlib.closing(persist_backend):
154        with contextlib.closing(persist_backend.get_connection()) as conn:
155            conn.upgrade()
156        job_backend = job_backends.fetch(my_name, JB_CONF,
157                                         persistence=persist_backend)
158        job_backend.connect()
159        with contextlib.closing(job_backend):
160            # Create information in the persistence backend about the
161            # unit of work we want to complete and the factory that
162            # can be called to create the tasks needed to get that unit
163            # of work done.
164            lb = models.LogBook("post-from-%s" % my_name)
165            fd = models.FlowDetail("song-from-%s" % my_name,
166                                   uuidutils.generate_uuid())
167            lb.add(fd)
168            with contextlib.closing(persist_backend.get_connection()) as conn:
169                conn.save_logbook(lb)
170            engines.save_factory_details(fd, make_bottles,
171                                         [HOW_MANY_BOTTLES], {},
172                                         backend=persist_backend)
173            # Post, and be done with it!
174            jb = job_backend.post("song-from-%s" % my_name, book=lb)
175            print("Posted: %s" % jb)
176            print("Goodbye...")
177
178
179def main_local():
180    # Run locally; typically this is activated during unit testing, when all
181    # the examples are checked to make sure they still function correctly...
182    global TAKE_DOWN_DELAY
183    global PASS_AROUND_DELAY
184    global JB_CONF
185    # Make everything go much faster (so that this finishes quickly).
186    PASS_AROUND_DELAY = 0.01
187    TAKE_DOWN_DELAY = 0.01
188    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
189    run_poster()
190    run_conductor(only_run_once=True)
191
192
193def check_for_zookeeper(timeout=1):
194    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
195    sys.stderr.write("Please wait....\n")
196    with contextlib.closing(client.KazooClient()) as test_client:
197        try:
198            test_client.start(timeout=timeout)
199        except test_client.handler.timeout_exception:
200            sys.stderr.write("Zookeeper is needed for running this example!\n")
201            traceback.print_exc()
202            return False
203        else:
204            test_client.stop()
205            return True
206
207
208def main():
209    if not check_for_zookeeper():
210        return
211    if len(sys.argv) == 1:
212        main_local()
213    elif sys.argv[1] in ('p', 'c'):
214        if sys.argv[-1] == "v":
215            logging.basicConfig(level=taskflow_logging.TRACE)
216        else:
217            logging.basicConfig(level=logging.ERROR)
218        if sys.argv[1] == 'p':
219            run_poster()
220        else:
221            run_conductor()
222    else:
223        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
224
225
226if __name__ == '__main__':
227    main()
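
For a quick, self-contained look at what the conductor ends up executing, the same task pattern can also be run with a plain in-process engine, with no zookeeper, jobboard or persistence involved. The following is a minimal sketch reusing the inject/provides wiring shown above; the trimmed-down tasks, names and tiny bottle count are illustrative assumptions.

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class TakeABottleDown(task.Task):
    def execute(self, bottles_left):
        return bottles_left - 1


class Conclusion(task.Task):
    def execute(self, bottles_left):
        print('%s bottles of beer on the wall...' % bottles_left)


song = lf.Flow('mini-song')
song.add(TakeABottleDown('take-bottle-3', inject={'bottles_left': 3},
                         provides='bottles_left'),
         Conclusion('next-bottles-2'))
# Runs the flow in-process with the default (serial) engine.
engines.run(song)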