Source code for ironic.common.inspection_rules.engine

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from oslo_log import log
import yaml

from ironic.common import exception
from ironic.common.i18n import _
from ironic.common.inspection_rules import actions
from ironic.common.inspection_rules import operators
from ironic.common.inspection_rules import utils
from ironic.common.inspection_rules import validation
from ironic.conf import CONF
from ironic import objects


LOG = log.getLogger(__name__)
SENSITIVE_FIELDS = ['password', 'auth_token', 'bmc_password']


[docs] def get_built_in_rules(): """Load built-in inspection rules.""" built_in_rules = [] built_in_rules_dir = CONF.inspection_rules.built_in_rules if not built_in_rules_dir: return built_in_rules try: with open(built_in_rules_dir, 'r') as f: rules_data = yaml.safe_load(f) for rule_data in rules_data: try: rule = { 'uuid': rule_data.get('uuid'), 'priority': rule_data.get('priority', 0), 'description': rule_data.get('description'), 'scope': rule_data.get('scope'), 'sensitive': rule_data.get('sensitive', False), 'phase': rule_data.get('phase', 'main'), 'actions': rule_data.get('actions', []), 'conditions': rule_data.get('conditions', []), 'built_in': True } validation.validate_rule(rule) built_in_rules.append(rule) except Exception as e: LOG.error(_("Error parsing built-in rule: %s"), e) raise except FileNotFoundError: LOG.error(_("Built-in rules file not found: %s"), built_in_rules_dir) raise except yaml.YAMLError as e: LOG.error(_("Error parsing YAML in built-in rules file %s: %s"), built_in_rules_dir, e) raise except Exception as e: LOG.error(_("Error loading built-in rules from %s: %s"), built_in_rules_dir, e) raise return built_in_rules
[docs] def check_conditions(task, rule, inventory, plugin_data): try: if not rule.get('conditions', None): return True for condition in rule['conditions']: op, invtd = utils.parse_inverted_operator( condition['op']) if op not in operators.OPERATORS: supported_ops = ', '.join(operators.OPERATORS.keys()) msg = (_("Unsupported operator: '%(op)s'. Supported " "operators are: %(supported_ops)s.") % { 'op': op, 'supported_ops': supported_ops}) raise ValueError(msg) plugin = operators.get_operator(op) if condition.get('loop', []): result = plugin().check_with_loop(task, condition, inventory, plugin_data) else: result = plugin().check_condition(task, condition, inventory, plugin_data) if not result: LOG.debug("Skipping rule %(rule)s on node %(node)s: " "condition check '%(op)s': '%(args)s' failed ", {'rule': rule['uuid'], 'node': task.node.uuid, 'op': condition['op'], 'args': condition['args']}) return False return True except Exception as err: LOG.error("Error checking condition on node %(node)s: %(err)s.", {'node': task.node.uuid, 'err': err}) raise
[docs] def apply_actions(task, rule, inventory, plugin_data): for action in rule['actions']: try: op = action['op'] if op not in actions.ACTIONS: supported_ops = ', '.join(actions.ACTIONS.keys()) msg = (_("Unsupported action: '%(op)s'. Supported actions " "are: %(supported_ops)s.") % { 'op': op, 'supported_ops': supported_ops}) raise ValueError(msg) plugin = actions.get_action(op) if action.get('loop', []): plugin().execute_with_loop(task, action, inventory, plugin_data) else: plugin().execute_action(task, action, inventory, plugin_data) except exception.IronicException as err: LOG.error("Error applying action on node %(node)s: %(err)s.", {'node': task.node.uuid, 'err': err}) raise except Exception as err: LOG.exception("Unexpected error applying action on node " "%(node)s: %(err)s.", {'node': task.node.uuid, 'err': err}) raise
[docs] def apply_rules(task, inventory, plugin_data, inspection_phase): """Apply inspection rules to a node.""" node = task.node all_rules = objects.InspectionRule.list( context=task.context, filters={'phase': inspection_phase}) built_in_rules = get_built_in_rules() rules = all_rules + built_in_rules if not rules: LOG.debug("No inspection rules to apply for phase " "'%(phase)s on node: %(node)s'", { 'phase': inspection_phase, 'node': node.uuid}) return rules.sort(key=lambda rule: rule.get('priority', 0), reverse=True) LOG.debug("Applying %(count)d inspection rules to node %(node)s", {'count': len(rules), 'node': node.uuid}) mask_secrets = CONF.inspection_rules.mask_secrets for rule in rules: try: should_mask = False is_sensitive_rule = rule.get('sensitive', False) if (mask_secrets == 'always' or mask_secrets == 'sensitive' and not is_sensitive_rule): should_mask = True masked_inventory = utils.ShallowMaskDict( inventory, sensitive_fields=SENSITIVE_FIELDS, mask_enabled=should_mask) masked_plugin_data = utils.ShallowMaskDict( plugin_data, sensitive_fields=SENSITIVE_FIELDS, mask_enabled=should_mask) if not check_conditions(task, rule, masked_inventory, masked_plugin_data): continue LOG.info("Applying actions for rule %(rule)s to node %(node)s", {'rule': rule['uuid'], 'node': node.uuid}) apply_actions(task, rule, masked_inventory, masked_plugin_data) except exception.HardwareInspectionFailure: raise except exception.IronicException as e: LOG.error(_("Error applying rule %(rule)s to node " "%(node)s: %(error)s"), {'rule': rule['uuid'], 'node': node.uuid, 'error': e}) raise except Exception as e: msg = ("Failed to apply rule %(rule)s to node %(node)s: " "%(error)s" % {'rule': rule['uuid'], 'node': node.uuid, 'error': e}) LOG.exception(msg) raise exception.IronicException(msg) LOG.info("Finished applying inspection rules to node %s", node.uuid)