Source code for heat.db.api

#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Implementation of SQLAlchemy backend."""

import datetime
import functools
import itertools
import random
from urllib.parse import urlparse

from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exception
from oslo_db import options
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import utils
from oslo_log import log as logging
from oslo_utils import timeutils
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy import orm

from heat.common import crypt
from heat.common import exception
from heat.common.i18n import _
from heat.db import filters as db_filters
from heat.db import models
from heat.db import utils as db_utils
from heat.engine import environment as heat_environment
from heat.rpc import api as rpc_api

CONF = cfg.CONF
CONF.import_opt('hidden_stack_tags', 'heat.common.config')
CONF.import_opt('max_events_per_stack', 'heat.common.config')
CONF.import_group('profiler', 'heat.common.config')
CONF.import_opt('db_max_retries', 'oslo_db.options', group='database')
CONF.import_opt('db_retry_interval', 'oslo_db.options', group='database')
CONF.import_opt(
    'db_inc_retry_interval', 'oslo_db.options', group='database')
CONF.import_opt(
    'db_max_retry_interval', 'oslo_db.options', group='database')

options.set_defaults(CONF)

_facade = None

LOG = logging.getLogger(__name__)

# Maximum byte length for status_reason field in MySQL TEXT columns
# MySQL TEXT columns have a 65,535 byte limit
MYSQL_TEXT_BYTE_LIMIT = 65535


_is_mysql_cache = None


def _is_mysql():
    """Check if the database backend is MySQL.

    Uses lazy evaluation to detect the backend on first call, then caches
    the result for subsequent calls. This ensures config is loaded before
    we check the database connection string.

    Returns True if the database backend is MySQL, False otherwise.
    Defaults to True for safety if detection fails.
    """
    global _is_mysql_cache
    if _is_mysql_cache is None:
        try:
            engine_name = urlparse(cfg.CONF.database.connection).scheme
            _is_mysql_cache = engine_name.startswith('mysql')
        except Exception:
            # Default to True (apply truncation) for safety if we can't
            # determine the backend type
            _is_mysql_cache = True
    return _is_mysql_cache


# TODO(sbaker): fix tests so that sqlite_fk=True can be passed to configure
context_manager = enginefacade.transaction_context()


# utility methods


def _truncate_status_reason(values):
    """Truncate status_reason in values dict if needed for MySQL.

    MySQL TEXT columns have a 65,535 byte limit. This function truncates
    the status_reason field based on its actual UTF-8 byte length if MySQL
    is the backend, to prevent database errors.

    Args:
        values: Dictionary that may contain a 'status_reason' key

    Returns:
        The values dict, possibly with status_reason truncated. If truncation
        occurs, a copy of the dict is returned to avoid mutating the caller's
        dict. Otherwise, the original dict is returned.
    """
    if not _is_mysql():
        return values

    if 'status_reason' not in values or not values['status_reason']:
        return values

    # Common case, UTF-8 chars are max 4 bytes
    if len(values['status_reason']) < (MYSQL_TEXT_BYTE_LIMIT / 4):
        return values

    # Check UTF-8 byte length (status_reason is always a string at this point)
    encoded = values['status_reason'].encode('utf-8')
    if len(encoded) <= MYSQL_TEXT_BYTE_LIMIT:
        return values

    # Truncate to fit within byte limit
    values = dict(values)  # Copy to avoid mutating caller's dict
    truncated_bytes = encoded[:MYSQL_TEXT_BYTE_LIMIT]
    # Use 'ignore' to handle any partial multi-byte character at the end
    values['status_reason'] = truncated_bytes.decode('utf-8', errors='ignore')

    return values


[docs] def get_engine(): return context_manager.writer.get_engine()
[docs] def retry_on_db_error(func): @functools.wraps(func) def try_func(context, *args, **kwargs): wrapped = oslo_db_api.wrap_db_retry( max_retries=CONF.database.db_max_retries, retry_on_deadlock=True, retry_on_disconnect=True, retry_interval=CONF.database.db_retry_interval, inc_retry_interval=CONF.database.db_inc_retry_interval, max_retry_interval=CONF.database.db_max_retry_interval, )(func) return wrapped(context, *args, **kwargs) return try_func
def _soft_delete(context, obj): """Mark this object as deleted.""" setattr(obj, 'deleted_at', timeutils.utcnow()) def _soft_delete_aware_query(context, *args, **kwargs): """Stack query helper that accounts for context's `show_deleted` field. :param show_deleted: if True, overrides context's show_deleted field. """ query = context.session.query(*args) show_deleted = kwargs.get('show_deleted') or context.show_deleted if not show_deleted: query = query.filter_by(deleted_at=None) return query # raw template
[docs] @context_manager.reader def raw_template_get(context, template_id): return _raw_template_get(context, template_id)
def _raw_template_get(context, template_id): result = context.session.get(models.RawTemplate, template_id) if not result: raise exception.NotFound(_('raw template with id %s not found') % template_id) return result
[docs] @context_manager.writer def raw_template_create(context, values): raw_template_ref = models.RawTemplate() raw_template_ref.update(values) raw_template_ref.save(context.session) return raw_template_ref
[docs] @context_manager.writer def raw_template_update(context, template_id, values): raw_template_ref = _raw_template_get(context, template_id) # get only the changed values values = dict((k, v) for k, v in values.items() if getattr(raw_template_ref, k) != v) if values: for k, v in values.items(): setattr(raw_template_ref, k, v) return raw_template_ref
[docs] @context_manager.writer def raw_template_delete(context, template_id): try: raw_template = _raw_template_get(context, template_id) except exception.NotFound: # Ignore not found return raw_tmpl_files_id = raw_template.files_id context.session.delete(raw_template) if raw_tmpl_files_id is None: return # If no other raw_template is referencing the same raw_template_files, # delete that too if context.session.query(models.RawTemplate).filter_by( files_id=raw_tmpl_files_id).first() is None: try: raw_tmpl_files = _raw_template_files_get( context, raw_tmpl_files_id) except exception.NotFound: # Ignore not found return context.session.delete(raw_tmpl_files)
# raw template files
[docs] @context_manager.writer def raw_template_files_create(context, values): raw_templ_files_ref = models.RawTemplateFiles() raw_templ_files_ref.update(values) raw_templ_files_ref.save(context.session) return raw_templ_files_ref
[docs] @context_manager.reader def raw_template_files_get(context, files_id): return _raw_template_files_get(context, files_id)
def _raw_template_files_get(context, files_id): result = context.session.get(models.RawTemplateFiles, files_id) if not result: raise exception.NotFound( _("raw_template_files with files_id %d not found") % files_id) return result # resource
[docs] @context_manager.writer def resource_create(context, values): return _resource_create(context, values)
def _resource_create(context, values): resource_ref = models.Resource() resource_ref.data = [] resource_ref.attr_data = None resource_ref.rsrc_attr_data = None # Keep truncation as the last transformation before DB write values = _truncate_status_reason(values) resource_ref.update(values) resource_ref.save(context.session) return resource_ref
[docs] @retry_on_db_error @context_manager.writer def resource_create_replacement(context, existing_res_id, new_res_values, atomic_key, expected_engine_id=None): try: with context_manager.writer.independent.using(context): new_res = _resource_create(context, new_res_values) update_data = {'replaced_by': new_res.id} rows_updated = _resource_update( context, existing_res_id, update_data, atomic_key, expected_engine_id=expected_engine_id, ) if not bool(rows_updated): data = {} if 'name' in new_res_values: data['resource_name'] = new_res_values['name'] raise exception.UpdateInProgress(**data) except db_exception.DBReferenceError as exc: # New template_id no longer exists LOG.debug('Not creating replacement resource: %s', exc) return None else: return new_res
[docs] @context_manager.reader def resource_get_all_by_stack(context, stack_id, filters=None): query = context.session.query( models.Resource ).filter_by( stack_id=stack_id ).options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ) query = db_filters.exact_filter(query, models.Resource, filters) results = query.all() return dict((res.name, res) for res in results)
[docs] @context_manager.reader def resource_get_all_active_by_stack(context, stack_id): filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} subquery = context.session.query(models.Resource.id).filter_by(**filters) results = context.session.query(models.Resource).filter_by( stack_id=stack_id).filter( models.Resource.id.notin_(subquery.scalar_subquery()) ).options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ).all() return dict((res.id, res) for res in results)
[docs] @context_manager.reader def resource_get_all_by_root_stack(context, stack_id, filters=None, stack_id_only=False): query = context.session.query( models.Resource ).filter_by( root_stack_id=stack_id ) if stack_id_only: query = query.options( orm.load_only(models.Resource.id, models.Resource.stack_id) ) else: query = query.options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ) query = db_filters.exact_filter(query, models.Resource, filters) results = query.all() return dict((res.id, res) for res in results)
[docs] @context_manager.reader def engine_get_all_locked_by_stack(context, stack_id): query = context.session.query( func.distinct(models.Resource.engine_id) ).filter( models.Resource.stack_id == stack_id, models.Resource.engine_id.isnot(None)) return set(i[0] for i in query.all())
[docs] @context_manager.reader def resource_get(context, resource_id, refresh=False, refresh_data=False): return _resource_get(context, resource_id, refresh=refresh, refresh_data=refresh_data)
def _resource_get(context, resource_id, refresh=False, refresh_data=False): result = context.session.query( models.Resource ).filter_by( id=resource_id ).options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ).first() if not result: raise exception.NotFound(_("resource with id %s not found") % resource_id) if refresh: context.session.refresh(result) if refresh_data: # ensure data is loaded (lazy or otherwise) result.data return result
[docs] @context_manager.reader def resource_get_by_name_and_stack(context, resource_name, stack_id): result = context.session.query( models.Resource ).filter_by( name=resource_name ).filter_by( stack_id=stack_id ).options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ).first() return result
[docs] @context_manager.reader def resource_get_all_by_physical_resource_id(context, physical_resource_id): return list( _resource_get_all_by_physical_resource_id( context, physical_resource_id, ) )
def _resource_get_all_by_physical_resource_id(context, physical_resource_id): results = context.session.query( models.Resource, ).filter_by( physical_resource_id=physical_resource_id, ).options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ).all() for result in results: if context is None or context.is_admin or context.project_id in ( result.stack.tenant, result.stack.stack_user_project_id, ): yield result
[docs] @context_manager.reader def resource_get_by_physical_resource_id(context, physical_resource_id): results = _resource_get_all_by_physical_resource_id( context, physical_resource_id, ) try: return next(results) except StopIteration: return None
[docs] @context_manager.reader def resource_get_all(context): results = context.session.query( models.Resource, ).options( orm.joinedload(models.Resource.attr_data), orm.joinedload(models.Resource.data), orm.joinedload(models.Resource.rsrc_prop_data), ).all() if not results: raise exception.NotFound(_('no resources were found')) return results
[docs] @retry_on_db_error @context_manager.writer def resource_purge_deleted(context, stack_id): filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} query = context.session.query(models.Resource) result = query.filter_by(**filters) attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None] result.delete() if attr_ids: context.session.query(models.ResourcePropertiesData).filter( models.ResourcePropertiesData.id.in_(attr_ids)).delete( synchronize_session=False)
def _add_atomic_key_to_values(values, atomic_key): if atomic_key is None: values['atomic_key'] = 1 else: values['atomic_key'] = atomic_key + 1
[docs] @retry_on_db_error @context_manager.writer def resource_update(context, resource_id, values, atomic_key, expected_engine_id=None): return _resource_update( context, resource_id, values, atomic_key, expected_engine_id=expected_engine_id, )
def _resource_update( context, resource_id, values, atomic_key, expected_engine_id=None, ): _add_atomic_key_to_values(values, atomic_key) # Keep truncation as the last transformation before DB write values = _truncate_status_reason(values) rows_updated = context.session.query(models.Resource).filter_by( id=resource_id, engine_id=expected_engine_id, atomic_key=atomic_key).update(values) return bool(rows_updated)
[docs] @context_manager.writer def resource_update_and_save(context, resource_id, values): resource = context.session.get(models.Resource, resource_id) # Keep truncation as the last transformation before DB write values = _truncate_status_reason(values) resource.update(values) resource.save(context.session) return _resource_get(context, resource.id)
[docs] @context_manager.writer def resource_delete(context, resource_id): resource = context.session.get(models.Resource, resource_id) if resource: context.session.delete(resource) if resource.attr_data_id is not None: attr_prop_data = context.session.get( models.ResourcePropertiesData, resource.attr_data_id) context.session.delete(attr_prop_data)
[docs] @context_manager.writer def resource_exchange_stacks(context, resource_id1, resource_id2): res1 = context.session.get(models.Resource, resource_id1) res2 = context.session.get(models.Resource, resource_id2) res1.stack, res2.stack = res2.stack, res1.stack
[docs] @context_manager.writer def resource_attr_id_set(context, resource_id, atomic_key, attr_id): values = {'attr_data_id': attr_id} _add_atomic_key_to_values(values, atomic_key) rows_updated = context.session.query(models.Resource).filter(and_( models.Resource.id == resource_id, models.Resource.atomic_key == atomic_key, models.Resource.engine_id.is_(None), or_(models.Resource.attr_data_id == attr_id, models.Resource.attr_data_id.is_(None)))).update( values) if rows_updated > 0: return True else: # Someone else set the attr_id first and/or we have a stale # view of the resource based on atomic_key, so delete the # resource_properties_data (attr) DB row. LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s', {'rid': resource_id, 'aid': attr_id}) context.session.query( models.ResourcePropertiesData).filter( models.ResourcePropertiesData.id == attr_id).delete() return False
[docs] @context_manager.writer def resource_attr_data_delete(context, resource_id, attr_id): resource = context.session.get(models.Resource, resource_id) attr_prop_data = context.session.get( models.ResourcePropertiesData, attr_id) if resource: resource.update({'attr_data_id': None}) if attr_prop_data: context.session.delete(attr_prop_data)
# resource data
[docs] @context_manager.reader def resource_data_get_all(context, resource_id, data=None): """Looks up resource_data by resource.id. If data is encrypted, this method will decrypt the results. """ if data is None: data = (context.session.query(models.ResourceData) .filter_by(resource_id=resource_id)).all() if not data: raise exception.NotFound(_('no resource data found')) ret = {} for res in data: if res.redact: try: ret[res.key] = crypt.decrypt(res.decrypt_method, res.value) continue except exception.InvalidEncryptionKey: LOG.exception('Failed to decrypt resource data %(rkey)s ' 'for %(rid)s, ignoring.', {'rkey': res.key, 'rid': resource_id}) ret[res.key] = res.value return ret
[docs] @context_manager.reader def resource_data_get(context, resource_id, key): """Lookup value of resource's data by key. Decrypts resource data if necessary. """ result = _resource_data_get_by_key(context, resource_id, key) if result.redact: return crypt.decrypt(result.decrypt_method, result.value) return result.value
[docs] @context_manager.reader def resource_data_get_by_key(context, resource_id, key): return _resource_data_get_by_key(context, resource_id, key)
def _resource_data_get_by_key(context, resource_id, key): """Looks up resource_data by resource_id and key. Does not decrypt resource_data. """ result = (context.session.query(models.ResourceData) .filter_by(resource_id=resource_id) .filter_by(key=key).first()) if not result: raise exception.NotFound(_('No resource data found')) return result
[docs] @context_manager.writer def resource_data_set(context, resource_id, key, value, redact=False): """Save resource's key/value pair to database.""" if redact: method, value = crypt.encrypt(value) else: method = '' try: current = _resource_data_get_by_key(context, resource_id, key) except exception.NotFound: current = models.ResourceData() current.key = key current.resource_id = resource_id current.redact = redact current.value = value current.decrypt_method = method current.save(session=context.session) return current
[docs] @context_manager.writer def resource_data_delete(context, resource_id, key): result = _resource_data_get_by_key(context, resource_id, key) context.session.delete(result)
# resource properties data
[docs] @context_manager.writer def resource_prop_data_create_or_update(context, values, rpd_id=None): return _resource_prop_data_create_or_update(context, values, rpd_id=rpd_id)
def _resource_prop_data_create_or_update(context, values, rpd_id=None): obj_ref = None if rpd_id is not None: obj_ref = context.session.query( models.ResourcePropertiesData).filter_by(id=rpd_id).first() if obj_ref is None: obj_ref = models.ResourcePropertiesData() obj_ref.update(values) obj_ref.save(context.session) return obj_ref
[docs] @context_manager.writer def resource_prop_data_create(context, values): return _resource_prop_data_create_or_update(context, values)
[docs] @context_manager.reader def resource_prop_data_get(context, resource_prop_data_id): result = context.session.get( models.ResourcePropertiesData, resource_prop_data_id) if result is None: raise exception.NotFound( _('ResourcePropertiesData with id %s not found') % resource_prop_data_id) return result
# stack
[docs] @context_manager.reader def stack_get_by_name_and_owner_id(context, stack_name, owner_id): query = _soft_delete_aware_query( context, models.Stack ).options( orm.joinedload(models.Stack.raw_template), ).filter( sqlalchemy.or_( models.Stack.tenant == context.project_id, models.Stack.stack_user_project_id == context.project_id, ) ).filter_by(name=stack_name).filter_by(owner_id=owner_id) return query.first()
[docs] @context_manager.reader def stack_get_by_name(context, stack_name): return _stack_get_by_name(context, stack_name)
def _stack_get_by_name(context, stack_name): query = _soft_delete_aware_query( context, models.Stack ).options( orm.joinedload(models.Stack.raw_template), ).filter( sqlalchemy.or_( models.Stack.tenant == context.project_id, models.Stack.stack_user_project_id == context.project_id), ).filter_by(name=stack_name) return query.order_by(models.Stack.created_at).first()
[docs] @context_manager.reader def stack_get(context, stack_id, show_deleted=False, eager_load=True): return _stack_get( context, stack_id, show_deleted=show_deleted, eager_load=eager_load )
def _stack_get(context, stack_id, show_deleted=False, eager_load=True): options = [] if eager_load: options.append(orm.joinedload(models.Stack.raw_template)) result = context.session.get(models.Stack, stack_id, options=options) deleted_ok = show_deleted or context.show_deleted if result is None or result.deleted_at is not None and not deleted_ok: return None # One exception to normal project scoping is users created by the # stacks in the stack_user_project_id (in the heat stack user domain) if (result is not None and context is not None and not context.is_admin and context.project_id not in (result.tenant, result.stack_user_project_id)): return None return result
[docs] @context_manager.reader def stack_get_status(context, stack_id): query = context.session.query(models.Stack) query = query.options( orm.load_only( models.Stack.action, models.Stack.status, models.Stack.status_reason, models.Stack.updated_at, ) ) result = query.filter_by(id=stack_id).first() if result is None: raise exception.NotFound(_('Stack with id %s not found') % stack_id) return (result.action, result.status, result.status_reason, result.updated_at)
[docs] @context_manager.reader def stack_get_all_by_owner_id(context, owner_id): return _stack_get_all_by_owner_id(context, owner_id)
def _stack_get_all_by_owner_id(context, owner_id): results = _soft_delete_aware_query( context, models.Stack, ).filter_by( owner_id=owner_id, backup=False, ).all() return results
[docs] @context_manager.reader def stack_get_all_by_root_owner_id(context, owner_id): return list(_stack_get_all_by_root_owner_id(context, owner_id))
def _stack_get_all_by_root_owner_id(context, owner_id): for stack in _stack_get_all_by_owner_id(context, owner_id): yield stack for ch_st in _stack_get_all_by_root_owner_id(context, stack.id): yield ch_st def _get_sort_keys(sort_keys, mapping): """Returns an array containing only allowed keys :param sort_keys: an array of strings :param mapping: a mapping from keys to DB column names :returns: filtered list of sort keys """ if isinstance(sort_keys, str): sort_keys = [sort_keys] return [mapping[key] for key in sort_keys or [] if key in mapping] def _paginate_query(context, query, model, limit=None, sort_keys=None, marker=None, sort_dir=None): default_sort_keys = ['created_at'] if not sort_keys: sort_keys = default_sort_keys if not sort_dir: sort_dir = 'desc' # This assures the order of the stacks will always be the same # even for sort_key values that are not unique in the database sort_keys = sort_keys + ['id'] model_marker = None if marker: model_marker = context.session.get(model, marker) try: query = utils.paginate_query(query, model, limit, sort_keys, model_marker, sort_dir) except utils.InvalidSortKey as exc: raise exception.Invalid(reason=str(exc)) return query def _query_stack_get_all(context, show_deleted=False, show_nested=False, show_hidden=False, tags=None, tags_any=None, not_tags=None, not_tags_any=None): if show_nested: query = _soft_delete_aware_query( context, models.Stack, show_deleted=show_deleted ).filter_by(backup=False) else: query = _soft_delete_aware_query( context, models.Stack, show_deleted=show_deleted ).filter_by(owner_id=None) if not context.is_admin: query = query.filter_by(tenant=context.project_id) query = query.options(orm.subqueryload(models.Stack.tags)) if tags: for tag in tags: tag_alias = orm.aliased(models.StackTag) query = query.join(tag_alias, models.Stack.tags) query = query.filter(tag_alias.tag == tag) if tags_any: query = query.filter( models.Stack.tags.any( models.StackTag.tag.in_(tags_any))) if not_tags: subquery = _soft_delete_aware_query( context, models.Stack, show_deleted=show_deleted ) for tag in not_tags: tag_alias = orm.aliased(models.StackTag) subquery = subquery.join(tag_alias, models.Stack.tags) subquery = subquery.filter(tag_alias.tag == tag) not_stack_ids = [s.id for s in subquery.all()] query = query.filter(models.Stack.id.notin_(not_stack_ids)) if not_tags_any: query = query.filter( ~models.Stack.tags.any( models.StackTag.tag.in_(not_tags_any))) if not show_hidden and cfg.CONF.hidden_stack_tags: query = query.filter( ~models.Stack.tags.any( models.StackTag.tag.in_(cfg.CONF.hidden_stack_tags))) return query
[docs] @context_manager.reader def stack_get_all(context, limit=None, sort_keys=None, marker=None, sort_dir=None, filters=None, show_deleted=False, show_nested=False, show_hidden=False, tags=None, tags_any=None, not_tags=None, not_tags_any=None, eager_load=False): query = _query_stack_get_all(context, show_deleted=show_deleted, show_nested=show_nested, show_hidden=show_hidden, tags=tags, tags_any=tags_any, not_tags=not_tags, not_tags_any=not_tags_any) if eager_load: query = query.options(orm.joinedload(models.Stack.raw_template)) return _filter_and_page_query(context, query, limit, sort_keys, marker, sort_dir, filters).all()
def _filter_and_page_query(context, query, limit=None, sort_keys=None, marker=None, sort_dir=None, filters=None): if filters is None: filters = {} sort_key_map = {rpc_api.STACK_NAME: models.Stack.name.key, rpc_api.STACK_STATUS: models.Stack.status.key, rpc_api.STACK_CREATION_TIME: models.Stack.created_at.key, rpc_api.STACK_UPDATED_TIME: models.Stack.updated_at.key} valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map) query = db_filters.exact_filter(query, models.Stack, filters) return _paginate_query(context, query, models.Stack, limit, valid_sort_keys, marker, sort_dir)
[docs] @context_manager.reader def stack_count_all(context, filters=None, show_deleted=False, show_nested=False, show_hidden=False, tags=None, tags_any=None, not_tags=None, not_tags_any=None): query = _query_stack_get_all(context, show_deleted=show_deleted, show_nested=show_nested, show_hidden=show_hidden, tags=tags, tags_any=tags_any, not_tags=not_tags, not_tags_any=not_tags_any) query = db_filters.exact_filter(query, models.Stack, filters) return query.count()
[docs] @context_manager.writer def stack_create(context, values): stack_ref = models.Stack() stack_ref.update(values) stack_name = stack_ref.name stack_ref.save(context.session) # Even though we just created a stack with this name, we may not find # it again because some unit tests create stacks with deleted_at set. Also # some backup stacks may not be found, for reasons that are unclear. earliest = _stack_get_by_name(context, stack_name) if earliest is not None and earliest.id != stack_ref.id: context.session.query(models.Stack).filter_by( id=stack_ref.id, ).delete() raise exception.StackExists(stack_name=stack_name) return stack_ref
[docs] @retry_on_db_error @context_manager.writer def stack_update(context, stack_id, values, exp_trvsl=None): # Keep truncation as the last transformation before DB write values = _truncate_status_reason(values) query = (context.session.query(models.Stack) .filter(and_(models.Stack.id == stack_id), (models.Stack.deleted_at.is_(None)))) if not context.is_admin: query = query.filter(sqlalchemy.or_( models.Stack.tenant == context.project_id, models.Stack.stack_user_project_id == context.project_id)) if exp_trvsl is not None: query = query.filter(models.Stack.current_traversal == exp_trvsl) rows_updated = query.update(values, synchronize_session=False) if not rows_updated: LOG.debug('Did not set stack state with values ' '%(vals)s, stack id: %(id)s with ' 'expected traversal: %(trav)s', {'id': stack_id, 'vals': str(values), 'trav': str(exp_trvsl)}) if not _stack_get(context, stack_id, eager_load=False): raise exception.NotFound( _('Attempt to update a stack with id: ' '%(id)s %(msg)s') % { 'id': stack_id, 'msg': 'that does not exist'}) return (rows_updated is not None and rows_updated > 0)
[docs] @context_manager.writer def stack_delete(context, stack_id): s = _stack_get(context, stack_id, eager_load=False) if not s: raise exception.NotFound(_('Attempt to delete a stack with id: ' '%(id)s %(msg)s') % { 'id': stack_id, 'msg': 'that does not exist'}) attr_ids = [] # normally the resources are deleted already by this point for r in s.resources: if r.attr_data_id is not None: attr_ids.append(r.attr_data_id) context.session.delete(r) if attr_ids: context.session.query( models.ResourcePropertiesData.id).filter( models.ResourcePropertiesData.id.in_(attr_ids)).delete( synchronize_session=False) _soft_delete(context, s)
[docs] @context_manager.writer def reset_stack_status(context, stack_id): return _reset_stack_status(context, stack_id)
# NOTE(stephenfin): This method uses separate transactions to delete nested # stacks, thus it's the only private method that is allowed to open a # transaction (via 'context.session.begin') def _reset_stack_status(context, stack_id, stack=None): if stack is None: stack = context.session.get(models.Stack, stack_id) if stack is None: raise exception.NotFound(_('Stack with id %s not found') % stack_id) query = context.session.query(models.Resource).filter_by( status='IN_PROGRESS', stack_id=stack_id) query.update({'status': 'FAILED', 'status_reason': 'Stack status manually reset', 'engine_id': None}) query = context.session.query(models.ResourceData) query = query.join(models.Resource) query = query.filter_by(stack_id=stack_id) query = query.filter( models.ResourceData.key.in_(heat_environment.HOOK_TYPES)) data_ids = [data.id for data in query] if data_ids: query = context.session.query(models.ResourceData) query = query.filter(models.ResourceData.id.in_(data_ids)) query.delete(synchronize_session='fetch') # commit what we've done already context.session.commit() query = context.session.query(models.Stack).filter_by(owner_id=stack_id) for child in query: _reset_stack_status(context, child.id, child) if stack.status == 'IN_PROGRESS': stack.status = 'FAILED' stack.status_reason = 'Stack status manually reset' context.session.query( models.StackLock ).filter_by(stack_id=stack_id).delete()
[docs] @context_manager.writer def stack_tags_set(context, stack_id, tags): _stack_tags_delete(context, stack_id) result = [] for tag in tags: stack_tag = models.StackTag() stack_tag.tag = tag stack_tag.stack_id = stack_id stack_tag.save(session=context.session) result.append(stack_tag) return result or None
[docs] @context_manager.writer def stack_tags_delete(context, stack_id): return _stack_tags_delete(context, stack_id)
def _stack_tags_delete(context, stack_id): result = _stack_tags_get(context, stack_id) if result: for tag in result: context.session.delete(tag)
[docs] @context_manager.reader def stack_tags_get(context, stack_id): return _stack_tags_get(context, stack_id)
def _stack_tags_get(context, stack_id): result = (context.session.query(models.StackTag) .filter_by(stack_id=stack_id) .all()) return result or None # stack lock def _is_duplicate_error(exc): return isinstance(exc, db_exception.DBDuplicateEntry)
[docs] @oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, retry_on_disconnect=True, retry_interval=0.5, inc_retry_interval=True, exception_checker=_is_duplicate_error) def stack_lock_create(context, stack_id, engine_id): with context_manager.writer.independent.using(context) as session: lock = session.get(models.StackLock, stack_id) if lock is not None: return lock.engine_id session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id))
[docs] def stack_lock_get_engine_id(context, stack_id): with context_manager.reader.independent.using(context) as session: lock = session.get(models.StackLock, stack_id) if lock is not None: return lock.engine_id
[docs] @context_manager.writer def persist_state_and_release_lock(context, stack_id, engine_id, values): rows_updated = (context.session.query(models.Stack) .filter(models.Stack.id == stack_id) .update(values, synchronize_session=False)) rows_affected = None if rows_updated is not None and rows_updated > 0: rows_affected = context.session.query( models.StackLock ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() if not rows_affected: return True
[docs] def stack_lock_steal(context, stack_id, old_engine_id, new_engine_id): with context_manager.writer.independent.using(context) as session: lock = session.get(models.StackLock, stack_id) rows_affected = session.query( models.StackLock ).filter_by(stack_id=stack_id, engine_id=old_engine_id ).update({"engine_id": new_engine_id}) if not rows_affected: return lock.engine_id if lock is not None else True
[docs] def stack_lock_release(context, stack_id, engine_id): with context_manager.writer.independent.using(context) as session: rows_affected = session.query( models.StackLock ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() if not rows_affected: return True
[docs] @context_manager.reader def stack_get_root_id(context, stack_id): s = _stack_get(context, stack_id, eager_load=False) if not s: return None while s.owner_id: s = _stack_get(context, s.owner_id, eager_load=False) return s.id
[docs] @context_manager.reader def stack_count_total_resources(context, stack_id): # count all resources which belong to the root stack return context.session.query( func.count(models.Resource.id) ).filter_by(root_stack_id=stack_id).scalar()
# user credentials
[docs] @context_manager.writer def user_creds_create(context): values = context.to_dict() user_creds_ref = models.UserCreds() if values.get('trust_id'): method, trust_id = crypt.encrypt(values.get('trust_id')) user_creds_ref.trust_id = trust_id user_creds_ref.decrypt_method = method user_creds_ref.trustor_user_id = values.get('trustor_user_id') user_creds_ref.username = None user_creds_ref.password = None user_creds_ref.tenant = values.get('project_name') user_creds_ref.tenant_id = values.get('project_id') user_creds_ref.auth_url = values.get('auth_url') user_creds_ref.region_name = values.get('region_name') else: user_creds_ref.update(values) user_creds_ref.tenant = values.get('project_name') user_creds_ref.tenant_id = values.get('project_id') method, password = crypt.encrypt(values['password']) if len(str(password)) > 255: raise exception.Error(_("Length of OS_PASSWORD after encryption" " exceeds Heat limit (255 chars)")) user_creds_ref.password = password user_creds_ref.decrypt_method = method user_creds_ref.save(context.session) result = dict(user_creds_ref) if values.get('trust_id'): result['trust_id'] = values.get('trust_id') else: result['password'] = values.get('password') return result
[docs] @context_manager.reader def user_creds_get(context, user_creds_id): db_result = context.session.get(models.UserCreds, user_creds_id) if db_result is None: return None # Return a dict copy of DB results, do not decrypt details into db_result # or it can be committed back to the DB in decrypted form result = dict(db_result) del result['decrypt_method'] result['password'] = crypt.decrypt( db_result.decrypt_method, result['password']) result['trust_id'] = crypt.decrypt( db_result.decrypt_method, result['trust_id']) return result
[docs] @db_utils.retry_on_stale_data_error @context_manager.writer def user_creds_delete(context, user_creds_id): creds = context.session.get(models.UserCreds, user_creds_id) if not creds: raise exception.NotFound( _('Attempt to delete user creds with id ' '%(id)s that does not exist') % {'id': user_creds_id}) context.session.delete(creds)
# event
[docs] @context_manager.reader def event_get_all_by_tenant(context, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None): query = context.session.query(models.Event) query = db_filters.exact_filter(query, models.Event, filters) query = query.join( models.Event.stack ).filter_by(tenant=context.project_id).filter_by(deleted_at=None) filters = None return _events_filter_and_page_query(context, query, limit, marker, sort_keys, sort_dir, filters).all()
[docs] @context_manager.reader def event_get_all_by_stack(context, stack_id, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None): query = context.session.query(models.Event).filter_by(stack_id=stack_id) if filters and 'uuid' in filters: # retrieving a single event, so eager load its rsrc_prop_data detail query = query.options(orm.joinedload(models.Event.rsrc_prop_data)) return _events_filter_and_page_query(context, query, limit, marker, sort_keys, sort_dir, filters).all()
def _events_paginate_query(context, query, model, limit=None, sort_keys=None, marker=None, sort_dir=None): default_sort_keys = ['created_at'] if not sort_keys: sort_keys = default_sort_keys if not sort_dir: sort_dir = 'desc' # This assures the order of the stacks will always be the same # even for sort_key values that are not unique in the database sort_keys = sort_keys + ['id'] model_marker = None if marker: # not to use context.session.get(model, marker), because # user can only see the ID(column 'uuid') and the ID as the marker model_marker = context.session.query( model).filter_by(uuid=marker).first() try: query = utils.paginate_query(query, model, limit, sort_keys, model_marker, sort_dir) except utils.InvalidSortKey as exc: raise exception.Invalid(reason=str(exc)) return query def _events_filter_and_page_query(context, query, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None): if filters is None: filters = {} sort_key_map = {rpc_api.EVENT_TIMESTAMP: models.Event.created_at.key, rpc_api.EVENT_RES_TYPE: models.Event.resource_type.key} valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map) query = db_filters.exact_filter(query, models.Event, filters) return _events_paginate_query(context, query, models.Event, limit, valid_sort_keys, marker, sort_dir)
[docs] @context_manager.reader def event_count_all_by_stack(context, stack_id): return _event_count_all_by_stack(context, stack_id)
def _event_count_all_by_stack(context, stack_id): query = context.session.query(func.count(models.Event.id)) return query.filter_by(stack_id=stack_id).scalar() def _find_rpd_references(context, stack_id): ev_ref_ids = set( e.rsrc_prop_data_id for e in context.session.query(models.Event).filter_by( stack_id=stack_id, ).all() ) rsrc_ref_ids = set( r.rsrc_prop_data_id for r in context.session.query(models.Resource).filter_by( stack_id=stack_id, ).all() ) return ev_ref_ids | rsrc_ref_ids def _all_backup_stack_ids(context, stack_id): """Iterate over all the IDs of all stacks related as stack/backup pairs. All backup stacks of a main stack, past and present (i.e. including those that are soft deleted), are included. The main stack itself is also included if the initial ID passed in is for a backup stack. The initial ID passed in is never included in the output. """ stack = context.session.get(models.Stack, stack_id) if stack is None: LOG.error('Stack %s not found', stack_id) return is_backup = stack.name.endswith('*') if is_backup: main = context.session.get(models.Stack, stack.owner_id) if main is None: LOG.error('Main stack for backup "%s" %s not found', stack.name, stack_id) return yield main.id for backup_id in _all_backup_stack_ids(context, main.id): if backup_id != stack_id: yield backup_id else: q_backup = context.session.query(models.Stack).filter(sqlalchemy.or_( models.Stack.tenant == context.project_id, models.Stack.stack_user_project_id == context.project_id)) q_backup = q_backup.filter_by(name=stack.name + '*') q_backup = q_backup.filter_by(owner_id=stack_id) for backup in q_backup.all(): yield backup.id def _delete_event_rows(context, stack_id, limit): # MySQL does not support LIMIT in subqueries, # sqlite does not support JOIN in DELETE. # So we must manually supply the IN() values. # pgsql SHOULD work with the pure DELETE/JOIN below but that must be # confirmed via integration tests. query = context.session.query(models.Event).filter_by( stack_id=stack_id, ) query = query.order_by(models.Event.id).limit(limit) id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()] if not id_pairs: return 0 (ids, rsrc_prop_ids) = zip(*id_pairs) max_id = ids[-1] # delete the events retval = context.session.query(models.Event).filter( models.Event.id <= max_id).filter( models.Event.stack_id == stack_id).delete() # delete unreferenced resource_properties_data def del_rpd(rpd_ids): if not rpd_ids: return q_rpd = context.session.query(models.ResourcePropertiesData) q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids)) q_rpd.delete(synchronize_session=False) if rsrc_prop_ids: clr_prop_ids = set(rsrc_prop_ids) - _find_rpd_references(context, stack_id) clr_prop_ids.discard(None) try: del_rpd(clr_prop_ids) except db_exception.DBReferenceError: LOG.debug('Checking backup/stack pairs for RPD references') found = False for partner_stack_id in _all_backup_stack_ids(context, stack_id): found = True clr_prop_ids -= _find_rpd_references(context, partner_stack_id) if not found: LOG.debug('No backup/stack pairs found for %s', stack_id) raise del_rpd(clr_prop_ids) return retval
[docs] @retry_on_db_error @context_manager.writer def event_create(context, values): if 'stack_id' in values and cfg.CONF.max_events_per_stack: # only count events and purge on average # 200.0/cfg.CONF.event_purge_batch_size percent of the time. check = (2.0 / cfg.CONF.event_purge_batch_size) > random.uniform(0, 1) if ( check and _event_count_all_by_stack( context, values['stack_id'] ) >= cfg.CONF.max_events_per_stack ): # prune try: _delete_event_rows( context, values['stack_id'], cfg.CONF.event_purge_batch_size, ) except db_exception.DBError as exc: LOG.error('Failed to purge events: %s', str(exc)) event_ref = models.Event() event_ref.update(values) event_ref.save(context.session) result = context.session.query(models.Event).filter_by( id=event_ref.id, ).options( orm.joinedload(models.Event.rsrc_prop_data) ).first() return result
# software config
[docs] @context_manager.writer def software_config_create(context, values): obj_ref = models.SoftwareConfig() obj_ref.update(values) obj_ref.save(context.session) return obj_ref
[docs] @context_manager.reader def software_config_get(context, config_id): return _software_config_get(context, config_id)
def _software_config_get(context, config_id): result = context.session.get(models.SoftwareConfig, config_id) if (result is not None and context is not None and not context.is_admin and result.tenant != context.project_id): result = None if not result: raise exception.NotFound(_('Software config with id %s not found') % config_id) return result
[docs] @context_manager.reader def software_config_get_all(context, limit=None, marker=None): query = context.session.query(models.SoftwareConfig) if not context.is_admin: query = query.filter_by(tenant=context.project_id) return _paginate_query(context, query, models.SoftwareConfig, limit=limit, marker=marker).all()
[docs] @context_manager.reader def software_config_count_all(context): query = context.session.query(models.SoftwareConfig) if not context.is_admin: query = query.filter_by(tenant=context.project_id) return query.count()
[docs] @context_manager.writer def software_config_delete(context, config_id): config = _software_config_get(context, config_id) # Query if the software config has been referenced by deployment. result = context.session.query(models.SoftwareDeployment).filter_by( config_id=config_id).first() if result: msg = (_("Software config with id %s can not be deleted as " "it is referenced.") % config_id) raise exception.InvalidRestrictedAction(message=msg) context.session.delete(config)
# software deployment
[docs] @context_manager.writer def software_deployment_create(context, values): obj_ref = models.SoftwareDeployment() obj_ref.update(values) try: obj_ref.save(context.session) except db_exception.DBReferenceError: # NOTE(tkajinam): config_id is the only FK in SoftwareDeployment err_msg = _('Config with id %s not found') % values['config_id'] raise exception.Invalid(reason=err_msg) return _software_deployment_get(context, obj_ref.id)
[docs] @context_manager.reader def software_deployment_get(context, deployment_id): return _software_deployment_get(context, deployment_id)
def _software_deployment_get(context, deployment_id): # TODO(stephenfin): Why doesn't options work with session.get? result = context.session.query( models.SoftwareDeployment, ).filter_by( id=deployment_id, ).options( orm.joinedload(models.SoftwareDeployment.config), ).first() if (result is not None and context is not None and not context.is_admin and context.project_id not in (result.tenant, result.stack_user_project_id)): result = None if not result: raise exception.NotFound(_('Deployment with id %s not found') % deployment_id) return result
[docs] @context_manager.reader def software_deployment_get_all(context, server_id=None): sd = models.SoftwareDeployment query = context.session.query(sd).order_by(sd.created_at) if not context.is_admin: query = query.filter( sqlalchemy.or_( sd.tenant == context.project_id, sd.stack_user_project_id == context.project_id, ) ) if server_id: query = query.filter_by(server_id=server_id) query = query.join( models.SoftwareDeployment.config, ).options( orm.contains_eager(models.SoftwareDeployment.config) ) return query.all()
[docs] @context_manager.reader def software_deployment_count_all(context): sd = models.SoftwareDeployment query = context.session.query(sd) if not context.is_admin: query = query.filter( sqlalchemy.or_( sd.tenant == context.project_id, sd.stack_user_project_id == context.project_id, ) ) return query.count()
[docs] @context_manager.writer def software_deployment_update(context, deployment_id, values): deployment = _software_deployment_get(context, deployment_id) try: for k, v in values.items(): setattr(deployment, k, v) except db_exception.DBReferenceError: # NOTE(tkajinam): config_id is the only FK in SoftwareDeployment err_msg = _('Config with id %s not found') % values['config_id'] raise exception.Invalid(reason=err_msg) return deployment
[docs] @context_manager.writer def software_deployment_delete(context, deployment_id): deployment = _software_deployment_get(context, deployment_id) context.session.delete(deployment)
# snapshot
[docs] @context_manager.writer def snapshot_create(context, values): obj_ref = models.Snapshot() obj_ref.update(values) obj_ref.save(context.session) return obj_ref
[docs] @context_manager.reader def snapshot_get(context, snapshot_id): return _snapshot_get(context, snapshot_id)
def _snapshot_get(context, snapshot_id): result = context.session.get(models.Snapshot, snapshot_id) if (result is not None and context is not None and context.project_id != result.tenant): result = None if not result: raise exception.NotFound(_('Snapshot with id %s not found') % snapshot_id) return result
[docs] @context_manager.reader def snapshot_get_by_stack(context, snapshot_id, stack): snapshot = _snapshot_get(context, snapshot_id) if snapshot.stack_id != stack.id: raise exception.SnapshotNotFound(snapshot=snapshot_id, stack=stack.name) return snapshot
[docs] @context_manager.writer def snapshot_update(context, snapshot_id, values): snapshot = _snapshot_get(context, snapshot_id) snapshot.update(values) snapshot.save(context.session) return snapshot
[docs] @context_manager.writer def snapshot_delete(context, snapshot_id): snapshot = _snapshot_get(context, snapshot_id) context.session.delete(snapshot)
[docs] @context_manager.reader def snapshot_get_all_by_stack(context, stack_id): return context.session.query(models.Snapshot).filter_by( stack_id=stack_id, tenant=context.project_id)
[docs] @context_manager.reader def snapshot_count_all_by_stack(context, stack_id): return context.session.query(models.Snapshot).filter_by( stack_id=stack_id, tenant=context.project_id).count()
# service
[docs] @context_manager.writer def service_create(context, values): service = models.Service() service.update(values) service.save(context.session) return service
[docs] @context_manager.writer def service_update(context, service_id, values): service = _service_get(context, service_id) values.update({'updated_at': timeutils.utcnow()}) service.update(values) service.save(context.session) return service
[docs] @context_manager.writer def service_delete(context, service_id, soft_delete=True): service = _service_get(context, service_id) if soft_delete: _soft_delete(context, service) else: context.session.delete(service)
[docs] @context_manager.reader def service_get(context, service_id): return _service_get(context, service_id)
def _service_get(context, service_id): result = context.session.get(models.Service, service_id) if result is None: raise exception.EntityNotFound(entity='Service', name=service_id) return result
[docs] @context_manager.reader def service_get_all(context): return context.session.query(models.Service).filter_by( deleted_at=None, ).all()
[docs] @context_manager.reader def service_get_all_by_args(context, host, binary, hostname): return (context.session.query(models.Service). filter_by(host=host). filter_by(binary=binary). filter_by(hostname=hostname).all())
# purge
[docs] def purge_deleted(age, granularity='days', project_id=None, batch_size=20): def _validate_positive_integer(val, argname): try: val = int(val) except ValueError: raise exception.Error(_("%s should be an integer") % argname) if val < 0: raise exception.Error(_("%s should be a positive integer") % argname) return val age = _validate_positive_integer(age, 'age') batch_size = _validate_positive_integer(batch_size, 'batch_size') if granularity not in ('days', 'hours', 'minutes', 'seconds'): raise exception.Error( _("granularity should be days, hours, minutes, or seconds")) if granularity == 'days': age = age * 86400 elif granularity == 'hours': age = age * 3600 elif granularity == 'minutes': age = age * 60 time_line = timeutils.utcnow() - datetime.timedelta(seconds=age) engine = get_engine() meta = sqlalchemy.MetaData() with engine.connect() as conn, conn.begin(): stack = sqlalchemy.Table('stack', meta, autoload_with=conn) service = sqlalchemy.Table('service', meta, autoload_with=conn) # Purge deleted services srvc_del = service.delete().where(service.c.deleted_at < time_line) with engine.connect() as conn, conn.begin(): conn.execute(srvc_del) # find the soft-deleted stacks that are past their expiry sel = sqlalchemy.select( stack.c.id, stack.c.raw_template_id, stack.c.prev_raw_template_id, stack.c.user_creds_id, stack.c.action, stack.c.status, stack.c.name) if project_id: stack_where = sel.where(and_( stack.c.tenant == project_id, stack.c.deleted_at < time_line)) else: stack_where = sel.where( stack.c.deleted_at < time_line) with engine.connect() as conn, conn.begin(): stacks = conn.execute(stack_where) while True: next_stacks_to_purge = list(itertools.islice(stacks, batch_size)) if len(next_stacks_to_purge): _purge_stacks(next_stacks_to_purge, engine, meta) else: break
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, retry_interval=0.5, inc_retry_interval=True) def _purge_stacks(stack_infos, engine, meta): """Purge some stacks and their releated events, raw_templates, etc. stack_infos is a list of lists of selected stack columns: [[id, raw_template_id, prev_raw_template_id, user_creds_id, action, status, name], ...] """ with engine.connect() as conn, conn.begin(): stack = sqlalchemy.Table('stack', meta, autoload_with=conn) stack_lock = sqlalchemy.Table('stack_lock', meta, autoload_with=conn) stack_tag = sqlalchemy.Table('stack_tag', meta, autoload_with=conn) resource = sqlalchemy.Table('resource', meta, autoload_with=conn) resource_data = sqlalchemy.Table( 'resource_data', meta, autoload_with=conn) resource_properties_data = sqlalchemy.Table( 'resource_properties_data', meta, autoload_with=conn) event = sqlalchemy.Table('event', meta, autoload_with=conn) raw_template = sqlalchemy.Table( 'raw_template', meta, autoload_with=conn) raw_template_files = sqlalchemy.Table( 'raw_template_files', meta, autoload_with=conn) user_creds = sqlalchemy.Table('user_creds', meta, autoload_with=conn) syncpoint = sqlalchemy.Table('sync_point', meta, autoload_with=conn) stack_info_str = ','.join([str(i) for i in stack_infos]) LOG.info("Purging stacks %s", stack_info_str) # TODO(cwolfe): find a way to make this re-entrant with # reasonably sized transactions (good luck), or add # a cleanup for orphaned rows. stack_ids = [stack_info[0] for stack_info in stack_infos] # delete stack locks (just in case some got stuck) stack_lock_del = stack_lock.delete().where( stack_lock.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): conn.execute(stack_lock_del) # delete stack tags stack_tag_del = stack_tag.delete().where( stack_tag.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): conn.execute(stack_tag_del) # delete resource_data res_where = sqlalchemy.select(resource.c.id).where( resource.c.stack_id.in_(stack_ids)) res_data_del = resource_data.delete().where( resource_data.c.resource_id.in_(res_where)) with engine.connect() as conn, conn.begin(): conn.execute(res_data_del) # clean up any sync_points that may have lingered sync_del = syncpoint.delete().where( syncpoint.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): conn.execute(sync_del) # get rsrc_prop_data_ids to delete rsrc_prop_data_where = sqlalchemy.select( resource.c.rsrc_prop_data_id, ).where( resource.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): rsrc_prop_data_ids = set( [i[0] for i in list(conn.execute(rsrc_prop_data_where))] ) rsrc_prop_data_where = sqlalchemy.select( resource.c.attr_data_id, ).where( resource.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): rsrc_prop_data_ids.update( [i[0] for i in list(conn.execute(rsrc_prop_data_where))] ) rsrc_prop_data_where = sqlalchemy.select( event.c.rsrc_prop_data_id, ).where( event.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): rsrc_prop_data_ids.update( [i[0] for i in list(conn.execute(rsrc_prop_data_where))] ) # delete events event_del = event.delete().where(event.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): conn.execute(event_del) # delete resources (normally there shouldn't be any) res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): conn.execute(res_del) # delete resource_properties_data if rsrc_prop_data_ids: # keep rpd's in events rsrc_prop_data_where = sqlalchemy.select( event.c.rsrc_prop_data_id, ).where( event.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids)) with engine.connect() as conn, conn.begin(): ids = list(conn.execute(rsrc_prop_data_where)) rsrc_prop_data_ids.difference_update([i[0] for i in ids]) if rsrc_prop_data_ids: # keep rpd's in resources rsrc_prop_data_where = sqlalchemy.select( resource.c.rsrc_prop_data_id, ).where( resource.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids)) with engine.connect() as conn, conn.begin(): ids = list(conn.execute(rsrc_prop_data_where)) rsrc_prop_data_ids.difference_update([i[0] for i in ids]) if rsrc_prop_data_ids: # delete if we have any rsrc_prop_data_del = resource_properties_data.delete().where( resource_properties_data.c.id.in_(rsrc_prop_data_ids)) with engine.connect() as conn, conn.begin(): conn.execute(rsrc_prop_data_del) # delete the stacks stack_del = stack.delete().where(stack.c.id.in_(stack_ids)) with engine.connect() as conn, conn.begin(): conn.execute(stack_del) # delete orphaned raw templates raw_template_ids = [i[1] for i in stack_infos if i[1] is not None] raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None) if raw_template_ids: # keep those still referenced raw_tmpl_sel = sqlalchemy.select(stack.c.raw_template_id).where( stack.c.raw_template_id.in_(raw_template_ids)) with engine.connect() as conn, conn.begin(): raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)] raw_template_ids = set(raw_template_ids) - set(raw_tmpl) if raw_template_ids: # keep those still referenced (previous tmpl) raw_tmpl_sel = sqlalchemy.select( stack.c.prev_raw_template_id, ).where( stack.c.prev_raw_template_id.in_(raw_template_ids)) with engine.connect() as conn, conn.begin(): raw_tmpl = [i[0] for i in conn.execute(raw_tmpl_sel)] raw_template_ids = raw_template_ids - set(raw_tmpl) if raw_template_ids: # delete raw_templates if we have any raw_tmpl_file_sel = sqlalchemy.select( raw_template.c.files_id, ).where( raw_template.c.id.in_(raw_template_ids)) with engine.connect() as conn, conn.begin(): raw_tmpl_file_ids = [i[0] for i in conn.execute( raw_tmpl_file_sel)] raw_templ_del = raw_template.delete().where( raw_template.c.id.in_(raw_template_ids)) with engine.connect() as conn, conn.begin(): conn.execute(raw_templ_del) if raw_tmpl_file_ids: # keep _files still referenced raw_tmpl_file_sel = sqlalchemy.select( raw_template.c.files_id, ).where( raw_template.c.files_id.in_(raw_tmpl_file_ids)) with engine.connect() as conn, conn.begin(): raw_tmpl_files = [i[0] for i in conn.execute( raw_tmpl_file_sel)] raw_tmpl_file_ids = set(raw_tmpl_file_ids) \ - set(raw_tmpl_files) if raw_tmpl_file_ids: # delete _files if we have any raw_tmpl_file_del = raw_template_files.delete().where( raw_template_files.c.id.in_(raw_tmpl_file_ids)) with engine.connect() as conn, conn.begin(): conn.execute(raw_tmpl_file_del) # purge any user creds that are no longer referenced user_creds_ids = [i[3] for i in stack_infos if i[3] is not None] if user_creds_ids: # keep those still referenced user_sel = sqlalchemy.select(stack.c.user_creds_id).where( stack.c.user_creds_id.in_(user_creds_ids)) with engine.connect() as conn, conn.begin(): users = [i[0] for i in conn.execute(user_sel)] user_creds_ids = set(user_creds_ids) - set(users) if user_creds_ids: # delete if we have any usr_creds_del = user_creds.delete().where( user_creds.c.id.in_(user_creds_ids)) with engine.connect() as conn, conn.begin(): conn.execute(usr_creds_del) # sync point
[docs] @context_manager.writer def sync_point_delete_all_by_stack_and_traversal(context, stack_id, traversal_id): rows_deleted = context.session.query(models.SyncPoint).filter_by( stack_id=stack_id, traversal_id=traversal_id).delete() return rows_deleted
[docs] @retry_on_db_error @context_manager.writer def sync_point_create(context, values): values['entity_id'] = str(values['entity_id']) sync_point_ref = models.SyncPoint() sync_point_ref.update(values) sync_point_ref.save(context.session) return sync_point_ref
[docs] @context_manager.reader def sync_point_get(context, entity_id, traversal_id, is_update): entity_id = str(entity_id) return context.session.get( models.SyncPoint, (entity_id, traversal_id, is_update), )
[docs] @retry_on_db_error @context_manager.writer def sync_point_update_input_data(context, entity_id, traversal_id, is_update, atomic_key, input_data, extra_data=None): entity_id = str(entity_id) update_values = {"atomic_key": atomic_key + 1} if input_data is not None: update_values["input_data"] = input_data if extra_data is not None: update_values["extra_data"] = extra_data rows_updated = context.session.query(models.SyncPoint).filter_by( entity_id=entity_id, traversal_id=traversal_id, is_update=is_update, atomic_key=atomic_key ).update(update_values) return rows_updated