# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log
from taskflow import engines
from taskflow import exceptions as tf_exception
from taskflow.patterns import graph_flow as gf
from taskflow import task as flow_task
from watcher.applier.workflow_engine import base
from watcher.common import exception
from watcher import objects
LOG = log.getLogger(__name__)
[docs]class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
"""Taskflow as a workflow engine for Watcher
Full documentation on taskflow at
https://docs.openstack.org/taskflow/latest
"""
[docs] def decider(self, history):
# FIXME(jed) not possible with the current Watcher Planner
#
# decider – A callback function that will be expected to
# decide at runtime whether v should be allowed to execute
# (or whether the execution of v should be ignored,
# and therefore not executed). It is expected to take as single
# keyword argument history which will be the execution results of
# all u decidable links that have v as a target. It is expected
# to return a single boolean
# (True to allow v execution or False to not).
return True
[docs] @classmethod
def get_config_opts(cls):
return [
cfg.IntOpt(
'max_workers',
default=processutils.get_worker_count(),
min=1,
required=True,
help='Number of workers for taskflow engine '
'to execute actions.')
]
[docs] def execute(self, actions):
try:
# NOTE(jed) We want to have a strong separation of concern
# between the Watcher planner and the Watcher Applier in order
# to us the possibility to support several workflow engine.
# We want to provide the 'taskflow' engine by
# default although we still want to leave the possibility for
# the users to change it.
# The current implementation uses graph with linked actions.
# todo(jed) add olso conf for retry and name
flow = gf.Flow("watcher_flow")
actions_uuid = {}
for a in actions:
task = TaskFlowActionContainer(a, self)
flow.add(task)
actions_uuid[a.uuid] = task
for a in actions:
for parent_id in a.parents:
flow.link(actions_uuid[parent_id], actions_uuid[a.uuid],
decider=self.decider)
e = engines.load(
flow, engine='parallel',
max_workers=self.config.max_workers)
e.run()
return flow
except exception.ActionPlanCancelled as e:
raise
except tf_exception.WrappedFailure as e:
if e.check("watcher.common.exception.ActionPlanCancelled"):
raise exception.ActionPlanCancelled
else:
raise exception.WorkflowExecutionException(error=e)
except Exception as e:
raise exception.WorkflowExecutionException(error=e)
[docs]class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
def __init__(self, db_action, engine):
name = "action_type:{0} uuid:{1}".format(db_action.action_type,
db_action.uuid)
super(TaskFlowActionContainer, self).__init__(name, db_action, engine)
[docs] def do_pre_execute(self):
db_action = self.engine.notify(self._db_action,
objects.action.State.ONGOING)
LOG.debug("Pre-condition action: %s", self.name)
self.action.pre_condition()
return db_action
[docs] def do_execute(self, *args, **kwargs):
LOG.debug("Running action: %s", self.name)
# NOTE:Some actions(such as migrate) will return None when exception
# Only when True is returned, the action state is set to SUCCEEDED
result = self.action.execute()
if result is True:
return self.engine.notify(self._db_action,
objects.action.State.SUCCEEDED)
else:
self.engine.notify(self._db_action,
objects.action.State.FAILED)
raise exception.ActionExecutionFailure(
action_id=self._db_action.uuid)
[docs] def do_post_execute(self):
LOG.debug("Post-condition action: %s", self.name)
self.action.post_condition()
[docs] def do_revert(self, *args, **kwargs):
LOG.warning("Revert action: %s", self.name)
try:
# TODO(jed): do we need to update the states in case of failure?
self.action.revert()
except Exception as e:
LOG.exception(e)
LOG.critical("Oops! We need a disaster recover plan.")
[docs] def do_abort(self, *args, **kwargs):
LOG.warning("Aborting action: %s", self.name)
try:
result = self.action.abort()
if result:
# Aborted the action.
return self.engine.notify(self._db_action,
objects.action.State.CANCELLED)
else:
return self.engine.notify(self._db_action,
objects.action.State.SUCCEEDED)
except Exception as e:
LOG.exception(e)
return self.engine.notify(self._db_action,
objects.action.State.FAILED)
Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.