Source code for ironic_inspector.db.api

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""DB models API for inspection data and shared database code."""


import threading
import time

from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db import options as db_opts
from oslo_db.sqlalchemy import enginefacade
from oslo_utils import timeutils
from oslo_utils import uuidutils
from sqlalchemy import delete
from sqlalchemy import insert
from sqlalchemy import or_, and_
from sqlalchemy import orm
from sqlalchemy.orm import exc as orm_errors
from sqlalchemy import update

from ironic_inspector.common.i18n import _
from ironic_inspector.db import model
from ironic_inspector import utils


LOG = utils.getProcessingLogger(__name__)

_DEFAULT_SQL_CONNECTION = 'sqlite:///ironic_inspector.sqlite'

_CONTEXT = threading.local()

db_opts.set_defaults(cfg.CONF, connection=_DEFAULT_SQL_CONNECTION)

CONF = cfg.CONF


[docs] def init(): """Initialize the database. Method called on service start up, initialize transaction context manager and try to create db session. """ get_writer_session()
[docs] def model_query(model, *args, **kwargs): """Query helper for simpler session usage. :param session: if present, the session to use """ with session_for_read() as session: query = session.query(model, *args) return query
[docs] def get_writer_session(): """Help method to get writer session. :returns: The writer session. """ return enginefacade.writer.using(_CONTEXT)
[docs] def session_for_read(): """Create read session within context manager""" return enginefacade.reader.using(_CONTEXT)
[docs] def session_for_write(): """Create write session within context manager""" return enginefacade.writer.using(_CONTEXT)
[docs] def get_nodes(): """Get list of cached nodes :returns: list of nodes, could be empty """ with session_for_read() as session: res = session.query( model.Node ).order_by( model.Node.started_at.desc() ) return [model.Node(uuid=entry.uuid, version_id=entry.version_id, state=entry.state, started_at=entry.started_at, finished_at=entry.finished_at, error=entry.error, manage_boot=entry.manage_boot) for entry in res.all()]
[docs] def get_node(uuid, **fields): """Get all cached nodes :param uuid: node uuid :param fields: fields are used as filtering criterion :returns: get node object :raises: NodeNotFoundInDBError in case node not found or node version differ from passed in fields. """ try: with session_for_read() as session: res = session.query(model.Node).filter_by( uuid=uuid, **fields).one() return model.Node(uuid=res.uuid, version_id=res.version_id, state=res.state, started_at=res.started_at, finished_at=res.finished_at, error=res.error, manage_boot=res.manage_boot) except (orm_errors.NoResultFound, orm_errors.StaleDataError): raise utils.NodeNotFoundInDBError()
[docs] def get_active_nodes(started_before=None): """Get list of nodes on introspection :param started_before: datetime object, returns nodes, started before provided time :returns: list of nodes, could be empty """ with session_for_read() as session: query = session.query(model.Node).filter_by( finished_at=None).order_by(model.Node.started_at.desc()) if started_before: query = query.filter(model.Node.started_at < started_before) return [model.Node(uuid=entry.uuid, version_id=entry.version_id, state=entry.state, started_at=entry.started_at, finished_at=entry.finished_at, error=entry.error, manage_boot=entry.manage_boot) for entry in query.all()]
[docs] def list_nodes_by_attributes(attributes): """Get list of nodes with certain attributes :param attributes: list of attributes as (name, value) pair :returns: list of nodes, could be empty """ attr_filters = [] for name, value in attributes: attr_filters.append(and_(model.Attribute.name == name, model.Attribute.value == value)) with session_for_read() as session: query = session.query( model.Attribute ).filter(or_(*attr_filters)).all() result = [model.Attribute(uuid=attr.uuid, node_uuid=attr.node_uuid, name=attr.name, value=attr.value) for attr in query] return result
[docs] @oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True) def update_node(uuid, **values): """Update node by uuid Updates node fields with provided values, also bump node version. :param uuid: node uuid :param values: node fields with values to be updated :raises: NodeNotFoundInDBError in case node not found or node version differ from passed in values. """ fields_ver = values.copy() with session_for_write() as session: stmt = update( model.Node ).where( model.Node.uuid == uuid ).values( fields_ver ).execution_options( synchronize_session=False ) res = session.execute(stmt) if res.rowcount == 0: raise utils.NodeNotFoundInDBError()
[docs] def create_node(uuid, state, started_at=None, finished_at=None, error=None, manage_boot=None): """Create new node :param uuid: node uuid :param state: initial node state :param started_at: node caching datetime :param finished_at: introspection finished datetime :param error: introspection error :returns: created node object """ node = model.Node(uuid=uuid, state=state, started_at=started_at, finished_at=finished_at, error=error, manage_boot=manage_boot) with session_for_write() as session: session.add(node) return node
[docs] def add_node(uuid, state, started_at=None, finished_at=None, error=None, manage_boot=None): """Add new node Before creating new node with certain uuid clean ups all existing node info. :param uuid: node uuid :param state: initial node state :param started_at: node caching datetime :param finished_at: introspection finished datetime :param error: introspection error :param manage_boot: whether to manage boot for this node :returns: created node object """ with session_for_write() as session: # Delete attribute data session.execute( delete(model.Attribute).where( model.Attribute.node_uuid == uuid)) # Delete introspection data session.execute( delete(model.Option).where( model.Option.uuid == uuid)) session.execute( delete(model.IntrospectionData).where( model.IntrospectionData.uuid == uuid)) # Delete the actual node session.execute( delete(model.Node).where( model.Node.uuid == uuid ).execution_options(synchronize_session=False) ) node = model.Node(uuid=uuid, state=state, started_at=started_at, finished_at=finished_at, error=error, manage_boot=manage_boot) session.add(node) return node
[docs] def list_nodes_options_by_uuid(uuid): """Get list of node options :param uuid: node uuid :returns: list of node options, could be empty """ with session_for_read() as session: query = session.query(model.Option).filter(model.Option.uuid == uuid) return [model.Option(uuid=opt.uuid, name=opt.name, value=opt.value) for opt in query.all()]
[docs] def delete_node(uuid): """Delete node and its attributes :param uuid: node uuid :returns: None """ with session_for_write() as session: # Delete attribute data session.execute( delete(model.Attribute).where( model.Attribute.node_uuid == uuid)) # Delete introspection data session.execute( delete(model.Option).where( model.Option.uuid == uuid)) session.execute( delete(model.IntrospectionData).where( model.IntrospectionData.uuid == uuid)) # Delete the actual node session.execute( delete(model.Node).where( model.Node.uuid == uuid ).execution_options(synchronize_session=False) )
[docs] def delete_nodes(finished_until=None): """Delete all nodes :param finished_until: datetime object, delete nodes are introspected before finished_until time :returns: None """ with session_for_read() as session: query = session.query(model.Node.uuid) if finished_until: query = query.filter( model.Node.finished_at.isnot(None), model.Node.finished_at < finished_until) uuid_list = [] for node in query.all(): # This breaks the requests up and allows proper value # deletion since there are structural dependencies on # for nodes in other tables. Performance wise this takes # a little slower overall, but doesn't cause the tables to # be locked, and handles the other tables without building # DB triggers. uuid_list.append(node[0]) for uuid in uuid_list: delete_node(uuid) # Allow the Python GIL to let something else run, and # give the DB a chance to breath. time.sleep(0)
[docs] def set_option(node_uuid, name, value): """Set option for node :param node_uuid: node uuid :param name: option name :param value: option value :returns: None """ with session_for_write() as session: opt = model.Option(uuid=node_uuid, name=name, value=value) session.add(opt)
[docs] def delete_options(**filters): """Delete all options :param filters: deletion filter criteria :returns: None """ with session_for_write() as session: session.query(model.Option).filter_by(**filters).delete()
[docs] def set_attribute(node_uuid, name, values): """Set lookup attributes for node :param node_uuid: node uuid :param name: option name :param values: list of attribute values :returns: None """ if not isinstance(values, list): values = [values] with session_for_write() as session: for value in values: attr = model.Attribute(node_uuid=node_uuid, uuid=uuidutils.generate_uuid(), name=name, value=value) session.add(attr)
[docs] def delete_attributes(uuid): """Delete all attributes :param uuid: the UUID of the node whose attributes you wish tod elete :returns: None """ # FIXME(TheJulia): This is going to be difficult to match # in later versions of sqlalchemy since query needs to move # to use the object model instead of free form attribute name. with session_for_write() as session: session.execute( delete(model.Attribute).where( model.Attribute.node_uuid == uuid))
[docs] def get_attributes(order_by=None, **fields): """Get all attributes :param order_by: ordering criterion :param fields: filter criteria fields :returns: list of attributes """ # FIXME(TheJulia) This needs to be rewritten with session_for_read() as session: query = session.query(model.Attribute).filter_by(**fields) if order_by: orders = [getattr(model.Attribute, key) for key in order_by] query = query.order_by(*orders) res = query.all() result = [model.Attribute(uuid=attr.uuid, node_uuid=attr.node_uuid, name=attr.name, value=attr.value) for attr in res] return result
[docs] def get_options(**fields): """Get all options :param fields: filter criteria fields :returns: list of options """ return model_query(model.Option).filter_by(**fields).all()
[docs] def create_rule(uuid, conditions, actions, description=None, scope=None): """Create new rule :param uuid: rule uuid :param conditions: list of (field, op, multiple, invert, params) tuple, which represents condition object :param actions: list of (action, params) pair, which represents action object :param description: rule description :param scope: rule scope :returns: created rule """ try: with session_for_write() as session: rule = model.Rule( uuid=uuid, description=description, disabled=False, created_at=timeutils.utcnow(), scope=scope) rule.conditions = rule.action = [] for field, op, multiple, invert, params in conditions: rule.conditions.append(model.RuleCondition(op=op, field=field, multiple=multiple, invert=invert, params=params)) for action, params in actions: rule.actions.append(model.RuleAction(action=action, params=params)) session.add(rule) except db_exc.DBDuplicateEntry as exc: LOG.error('Database integrity error %s when creating a rule', exc) raise utils.RuleUUIDExistError(uuid) return rule
[docs] def get_rule(uuid): """Get rule by uuid :param uuid: rule uuid :returns: rule object """ try: with session_for_read() as session: query = session.query(model.Rule).where( model.Rule.uuid == uuid) rule = query.one() return model.Rule(uuid=rule.uuid, created_at=rule.created_at, description=rule.description, disabled=rule.disabled, scope=rule.scope, conditions=rule.conditions, actions=rule.actions) except orm.exc.NoResultFound: raise utils.RuleNotFoundError(uuid)
[docs] def get_rules(**fields): """List all rules.""" with session_for_read() as session: query = session.query( model.Rule ).filter_by( **fields ).order_by( model.Rule.created_at ) return [model.Rule( uuid=rule.uuid, actions=rule.actions, conditions=rule.conditions, description=rule.description, scope=rule.scope) for rule in query]
[docs] def get_rules_conditions(**fields): """Get all rule conditions :param fields: field filter criteria :returns: list of conditions """ # NOTE(TheJulia): This appears to exist largely to help unit # testing of rules functionality. with session_for_read() as session: query = session.query( model.RuleCondition ).filter_by(**fields) return [model.RuleCondition( id=condition.id, rule=condition.rule, op=condition.op, multiple=condition.multiple, invert=condition.invert, field=condition.field, params=condition.params) for condition in query.all()]
[docs] def get_rules_actions(**fields): """Get all rule actions :param fields: field filter criteria :returns: list of actions """ # NOTE(TheJulia): This appears to exist largely to help unit # testing of rules functionality. with session_for_read() as session: query = session.query( model.RuleAction ).filter_by(**fields) return [model.RuleAction( id=action.id, rule=action.rule, action=action.action, params=action.params) for action in query.all()]
[docs] def delete_rule(uuid): """Delete the rule by uuid :param uuid: rule uuid :raises: RuleNotFoundError in case rule not found :returns: None """ with session_for_write() as session: stmt = ( delete( model.RuleAction ).where( model.RuleAction.rule == uuid ).execution_options(synchronize_session=False) ) session.execute(stmt) stmt = ( delete( model.RuleCondition ).where( model.RuleCondition.rule == uuid ).execution_options(synchronize_session=False) ) session.execute(stmt) stmt = ( delete( model.Rule ).where( model.Rule.uuid == uuid ).execution_options(synchronize_session=False) ) res = session.execute(stmt) if res.rowcount == 0: raise utils.RuleNotFoundError(uuid)
[docs] def delete_all_rules(): """Delete all rules :returns: None """ with session_for_write() as session: session.execute( delete(model.RuleAction).execution_options( synchronize_session=False ) ) session.execute( delete(model.RuleCondition).execution_options( synchronize_session=False ) ) session.execute( delete(model.Rule).execution_options( synchronize_session=False ) ) session.commit()
[docs] def store_introspection_data(node_id, introspection_data, processed=True): """Store introspection data for this node. :param node_id: node UUID. :param introspection_data: A dictionary of introspection data :param processed: Specify the type of introspected data, set to False indicates the data is unprocessed. """ updated = False with session_for_write() as session: record = session.query(model.IntrospectionData).filter_by( uuid=node_id, processed=processed).first() if record: record.update({'data': introspection_data}) updated = True else: # by default, all write sessions are committed. In this # case, we can safely rollback. Once we rollback, we # launch a new session. session.rollback() if not updated: with session_for_write() as session: stmt = insert(model.IntrospectionData).values( {'uuid': node_id, 'processed': processed, 'data': introspection_data} ) session.execute(stmt)
[docs] def get_introspection_data(node_id, processed=True): """Get introspection data for this node. :param node_id: node UUID. :param processed: Specify the type of introspected data, set to False indicates retrieving the unprocessed data. :return: A dictionary representation of intropsected data """ try: with session_for_read() as session: ref = session.query(model.IntrospectionData).filter_by( uuid=node_id, processed=processed).one() res = ref['data'] return res except orm_errors.NoResultFound: msg = _('Introspection data not found for node %(node)s, ' 'processed=%(processed)s') % {'node': node_id, 'processed': processed} raise utils.IntrospectionDataNotFound(msg)