keystone.trust.core

Source code for keystone.trust.core

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Main entry point into the Trust service."""

from six.moves import zip

from keystone.common import dependency
from keystone.common import manager
import keystone.conf
from keystone import exception
from keystone.i18n import _
from keystone import notifications


CONF = keystone.conf.CONF


@dependency.requires('identity_api')
@dependency.provider('trust_api')
class Manager(manager.Manager):
    """Default pivot point for the Trust backend.

    See :class:`keystone.common.manager.Manager` for more details on how this
    dynamically calls the backend.

    """

    driver_namespace = 'keystone.trust'

    # Resource type reported in audit notifications for trusts.
    _TRUST = "OS-TRUST:trust"

    def __init__(self):
        """Load the configured trust driver and subscribe to user deletes."""
        super(Manager, self).__init__(CONF.trust.driver)
        notifications.register_event_callback(
            notifications.ACTIONS.deleted, 'user',
            self._on_user_delete)

    def _on_user_delete(self, service, resource_type, operation,
                        payload):
        """Delete every trust in which the deleted user takes part.

        Registered in ``__init__`` as the callback for user ``deleted``
        notifications. ``service``, ``resource_type`` and ``operation`` are
        accepted but unused.

        :param payload: notification payload; ``payload['resource_info']``
                        is used as the ID of the deleted user.
        """
        # NOTE(davechen): Only the deletion of a user maintained by keystone
        # removes the related trusts, since we don't know when an LDAP user
        # or a federated user is deleted.
        user_id = payload['resource_info']
        trusts = self.driver.list_trusts_for_trustee(user_id)
        trusts = trusts + self.driver.list_trusts_for_trustor(user_id)
        for trust in trusts:
            self.driver.delete_trust(trust['id'])

    @staticmethod
    def _validate_redelegation(redelegated_trust, trust):
        """Check that ``trust`` may be delegated from ``redelegated_trust``.

        NOTE: when ``trust`` carries no expiry, ``trust['expires_at']`` is
        set in place to the expiry of ``redelegated_trust``.

        :param redelegated_trust: the parent trust being redelegated.
        :param trust: the child trust being created or verified.
        :raises keystone.exception.Forbidden: If the parent has no
            redelegation depth left (or more than the configured maximum),
            the child sets ``remaining_uses``, the child outlives the parent,
            the child requests a role the parent lacks, or the child
            impersonates while the parent does not.
        """
        # Validate against:
        # 0 < redelegation_count <= max_redelegation_count
        max_redelegation_count = CONF.trust.max_redelegation_count
        redelegation_depth = redelegated_trust.get('redelegation_count', 0)
        if not (0 < redelegation_depth <= max_redelegation_count):
            raise exception.Forbidden(
                _('Remaining redelegation depth of %(redelegation_depth)d'
                  ' out of allowed range of [0..%(max_count)d]') %
                {'redelegation_depth': redelegation_depth,
                 'max_count': max_redelegation_count})

        # remaining_uses is None
        remaining_uses = trust.get('remaining_uses')
        if remaining_uses is not None:
            # Interpolate here, like every other message in this class,
            # rather than handing ``value`` to the exception as a keyword.
            raise exception.Forbidden(
                _('Field "remaining_uses" is set to %(value)s'
                  ' while it must not be set in order to redelegate a trust')
                % {'value': remaining_uses})

        # expiry times
        trust_expiry = trust.get('expires_at')
        redelegated_expiry = redelegated_trust['expires_at']
        if trust_expiry:
            # A parent without an expiry cannot be outlived, and ordering
            # None against a datetime would raise TypeError.
            # redelegated trust is from backend and has no tzinfo
            if (redelegated_expiry is not None and
                    redelegated_expiry < trust_expiry.replace(tzinfo=None)):
                raise exception.Forbidden(
                    _('Requested expiration time is more '
                      'than redelegated trust can provide'))
        else:
            trust['expires_at'] = redelegated_expiry

        # trust roles is a subset of roles of the redelegated trust
        parent_roles = set(role['id']
                           for role in redelegated_trust['roles'])
        if not all(role['id'] in parent_roles for role in trust['roles']):
            raise exception.Forbidden(
                _('Some of requested roles are not in redelegated trust'))

        # forbid to create a trust (with impersonation set to true) from a
        # redelegated trust (with impersonation set to false)
        if not redelegated_trust['impersonation'] and trust['impersonation']:
            raise exception.Forbidden(
                _('Impersonation is not allowed because redelegated trust '
                  'does not specify impersonation. Redelegated trust id: %s') %
                redelegated_trust['id'])

    def get_trust_pedigree(self, trust_id):
        """Return the redelegation chain that ends in ``trust_id``.

        :returns: a list whose first item is the trust ``trust_id`` and
                  whose following items are its successive parents, found by
                  following ``redelegated_trust_id``.
        """
        trust = self.driver.get_trust(trust_id)
        trust_chain = [trust]
        while trust and trust.get('redelegated_trust_id'):
            trust = self.driver.get_trust(trust['redelegated_trust_id'])
            trust_chain.append(trust)

        return trust_chain

    def get_trust(self, trust_id, deleted=False):
        """Fetch a trust, verifying its redelegation chain when it has one.

        The chain is only verified for a redelegated trust looked up with
        ``deleted=False``.

        :returns: whatever the driver returned for ``trust_id``.
        :raises keystone.exception.Forbidden: If a link of the chain fails
            ``_validate_redelegation`` or the trustee of a parent trust is
            disabled or no longer exists.
        """
        trust = self.driver.get_trust(trust_id, deleted)

        if trust and trust.get('redelegated_trust_id') and not deleted:
            trust_chain = self.get_trust_pedigree(trust_id)

            # The chain is ordered child first, so pairing it with itself
            # shifted by one yields every (parent, child) link.
            for parent, child in zip(trust_chain[1:], trust_chain):
                self._validate_redelegation(parent, child)
                try:
                    self.identity_api.assert_user_enabled(
                        parent['trustee_user_id'])
                except (AssertionError, exception.NotFound):
                    raise exception.Forbidden(
                        _('One of the trust agents is disabled or deleted'))

        return trust

    def create_trust(self, trust_id, trust, roles, redelegated_trust=None,
                     initiator=None):
        """Create a new trust.

        ``trust`` is modified in place: ``allow_redelegation`` is removed,
        ``redelegation_count`` is filled in and, when redelegating,
        ``redelegated_trust_id`` is set.

        :param trust_id: ID of the trust to create.
        :param trust: attributes of the trust to create.
        :param roles: roles handed to the driver together with the trust.
        :param redelegated_trust: parent trust when this trust is created by
                                  redelegation, otherwise ``None``.
        :param initiator: initiator recorded in the audit notification.
        :returns: a new trust
        :raises keystone.exception.ValidationError: If ``remaining_uses`` is
            not positive, or is set on a redelegatable trust.
        :raises keystone.exception.Forbidden: If the requested redelegation
            depth is not allowed or the redelegation chain is invalid.
        """
        # Default for initial trust in chain is max_redelegation_count
        max_redelegation_count = CONF.trust.max_redelegation_count
        requested_count = trust.get('redelegation_count')
        redelegatable = (trust.pop('allow_redelegation', False)
                         and requested_count != 0)
        if not redelegatable:
            trust['redelegation_count'] = requested_count = 0
            remaining_uses = trust.get('remaining_uses')
            if remaining_uses is not None and remaining_uses <= 0:
                msg = _('remaining_uses must be a positive integer or null.')
                raise exception.ValidationError(msg)
        else:
            # Validate requested redelegation depth
            if requested_count and requested_count > max_redelegation_count:
                raise exception.Forbidden(
                    _('Requested redelegation depth of %(requested_count)d '
                      'is greater than allowed %(max_count)d') %
                    {'requested_count': requested_count,
                     'max_count': max_redelegation_count})
            # Decline remaining_uses
            if trust.get('remaining_uses') is not None:
                raise exception.ValidationError(
                    _('remaining_uses must not be set if redelegation is '
                      'allowed'))

        if redelegated_trust:
            trust['redelegated_trust_id'] = redelegated_trust['id']
            remaining_count = redelegated_trust['redelegation_count'] - 1

            # Validate depth consistency
            if (redelegatable and requested_count and
                    requested_count != remaining_count):
                msg = _('Modifying "redelegation_count" upon redelegation is '
                        'forbidden. Omitting this parameter is advised.')
                raise exception.Forbidden(msg)
            trust.setdefault('redelegation_count', remaining_count)

            # Check entire trust pedigree validity
            pedigree = self.get_trust_pedigree(redelegated_trust['id'])
            for t in pedigree:
                self._validate_redelegation(t, trust)

        trust.setdefault('redelegation_count', max_redelegation_count)
        ref = self.driver.create_trust(trust_id, trust, roles)

        notifications.Audit.created(self._TRUST, trust_id, initiator=initiator)

        return ref

    def delete_trust(self, trust_id, initiator=None):
        """Remove a trust.

        Recursively remove given and redelegated trusts

        :param trust_id: ID of the trust to delete.
        :param initiator: initiator recorded in the audit notification of
                          the trust and of every trust redelegated from it.
        :raises keystone.exception.TrustNotFound: If the trust doesn't exist.
        """
        trust = self.driver.get_trust(trust_id)
        trusts = self.driver.list_trusts_for_trustor(
            trust['trustor_user_id'])

        for t in trusts:
            if t.get('redelegated_trust_id') == trust_id:
                # recursive call to make sure all notifications are sent;
                # forward the initiator so that the cascaded deletions are
                # attributed to the same actor.
                try:
                    self.delete_trust(t['id'], initiator=initiator)
                except exception.TrustNotFound:  # nosec
                    # if trust was deleted by concurrent process
                    # consistency must not suffer
                    pass

        # end recursion
        self.driver.delete_trust(trust_id)

        notifications.Audit.deleted(self._TRUST, trust_id, initiator)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.