import re

from keystoneauth1 import exceptions as ks_exceptions

from heat.common import exception
from heat.engine.clients import client_plugin
from heat.engine.clients.os.keystone import heat_keystoneclient as hkc

class KeystoneClientPlugin(client_plugin.ClientPlugin):
    """Client plugin that resolves Keystone entity names or IDs to IDs.

    Each ``get_*_id`` helper first treats its argument as an ID and asks
    Keystone for that object directly; when Keystone answers ``NotFound``
    the helper falls back to a lookup by name.
    """

    exceptions_module = [ks_exceptions, exception]

    # Chained assignment: binds ``service_types`` to the list and unpacks
    # its single element into the ``IDENTITY`` class attribute.
    service_types = [IDENTITY] = ['identity']

    def _create(self):
        """Build the Heat Keystone client for this context's region."""
        region_name = self._get_region_name()
        return hkc.KeystoneClient(self.context, region_name)

    def is_not_found(self, ex):
        """Return True if ``ex`` is a Keystone or Heat not-found error."""
        return isinstance(ex, (ks_exceptions.NotFound,
                               exception.EntityNotFound))

    def is_over_limit(self, ex):
        """Return True if ``ex`` is ``RequestEntityTooLarge``."""
        return isinstance(ex, ks_exceptions.RequestEntityTooLarge)

    def is_conflict(self, ex):
        """Return True if ``ex`` is a Keystone ``Conflict`` error."""
        return isinstance(ex, ks_exceptions.Conflict)

    def parse_entity_with_domain(self, entity_with_domain, entity_type):
        """Parse keystone entity user/role/project with domain.

        entity_with_domain should be in entity{domain} format.

        :param entity_with_domain: entity reference, optionally suffixed
            with ``{domain}``.
        :param entity_type: entity label used in the raised error.
        :returns: a tuple of (entity, domain). ``domain`` is the resolved
            domain ID, or None when no ``{domain}`` suffix is present.
        :raises exception.EntityNotFound: if anything fails while parsing
            or while resolving the domain.
        """
        try:
            # Only a trailing "{...}" group is treated as the domain.
            match = re.search(r"\{(.*?)\}$", entity_with_domain)
            if match:
                entity = entity_with_domain[:match.start()]
                domain = match.group(1)
                domain = self.get_domain_id(domain)
                return (entity, domain)
            else:
                return (entity_with_domain, None)
        except Exception:
            # Deliberately broad: every failure here, including an
            # unknown domain, is reported against the original reference.
            raise exception.EntityNotFound(entity=entity_type,
                                           name=entity_with_domain)

    def get_role_id(self, role, domain=None):
        """Return the ID of a role given its ID or name.

        :param role: role ID or name, optionally in ``name{domain}`` form
            when ``domain`` is not given.
        :param domain: domain used to narrow the lookup by name.
        :returns: the role ID, or None if ``role`` is None.
        :raises exception.EntityNotFound: if no role matches.
        """
        if role is None:
            return None

        if not domain:
            role, domain = self.parse_entity_with_domain(role,
                                                         'KeystoneRole')

        try:
            role_obj = self.client().client.roles.get(role)
            return role_obj.id
        except ks_exceptions.NotFound:
            role_list = self.client().client.roles.list(name=role,
                                                        domain=domain)
            # Re-check the name so only an exact match is accepted.
            for role_obj in role_list:
                if role_obj.name == role:
                    return role_obj.id

        raise exception.EntityNotFound(entity='KeystoneRole', name=role)

    def get_project_id(self, project, domain=None):
        """Return the ID of a project given its ID or name.

        :param project: project ID or name, optionally in
            ``name{domain}`` form when ``domain`` is not given.
        :param domain: domain used to narrow the lookup by name.
        :returns: the project ID, or None if ``project`` is None.
        :raises exception.EntityNotFound: if no project matches.
        """
        if project is None:
            return None

        if not domain:
            project, domain = self.parse_entity_with_domain(
                project, 'KeystoneProject')

        try:
            project_obj = self.client().client.projects.get(project)
            return project_obj.id
        except ks_exceptions.NotFound:
            project_list = self.client().client.projects.list(
                name=project, domain=domain)
            # Re-check the name so only an exact match is accepted.
            for project_obj in project_list:
                if project_obj.name == project:
                    return project_obj.id

        raise exception.EntityNotFound(entity='KeystoneProject',
                                       name=project)

    def get_domain_id(self, domain):
        """Return the ID of a domain given its ID or name.

        :param domain: domain ID or name.
        :returns: the domain ID, or None if ``domain`` is None.
        :raises exception.EntityNotFound: if no domain matches.
        """
        if domain is None:
            return None

        try:
            domain_obj = self.client().client.domains.get(domain)
            return domain_obj.id
        except ks_exceptions.NotFound:
            domain_list = self.client().client.domains.list(name=domain)
            # Re-check the name so only an exact match is accepted.
            for domain_obj in domain_list:
                if domain_obj.name == domain:
                    return domain_obj.id

        raise exception.EntityNotFound(entity='KeystoneDomain', name=domain)

    def get_group_id(self, group, domain=None):
        """Return the ID of a group given its ID or name.

        :param group: group ID or name, optionally in ``name{domain}``
            form when ``domain`` is not given.
        :param domain: domain used to narrow the lookup by name.
        :returns: the group ID, or None if ``group`` is None.
        :raises exception.EntityNotFound: if no group matches.
        """
        if group is None:
            return None

        if not domain:
            group, domain = self.parse_entity_with_domain(group,
                                                          'KeystoneGroup')

        try:
            group_obj = self.client().client.groups.get(group)
            return group_obj.id
        except ks_exceptions.NotFound:
            group_list = self.client().client.groups.list(name=group,
                                                          domain=domain)
            # Re-check the name so only an exact match is accepted.
            for group_obj in group_list:
                if group_obj.name == group:
                    return group_obj.id

        raise exception.EntityNotFound(entity='KeystoneGroup', name=group)

    def get_service_id(self, service):
        """Return the ID of a service given its ID or name.

        :param service: service ID or name.
        :returns: the service ID, or None if ``service`` is None.
        :raises exception.KeystoneServiceNameConflict: if the lookup by
            name returns more than one service.
        :raises exception.EntityNotFound: if no service matches.
        """
        if service is None:
            return None

        try:
            service_obj = self.client().client.services.get(service)
            return service_obj.id
        except ks_exceptions.NotFound:
            service_list = self.client().client.services.list(name=service)

            if len(service_list) == 1:
                return service_list[0].id
            elif len(service_list) > 1:
                # Refuse to pick one arbitrarily when the name is shared.
                raise exception.KeystoneServiceNameConflict(service=service)
            else:
                raise exception.EntityNotFound(entity='KeystoneService',
                                               name=service)

    def get_user_id(self, user, domain=None):
        """Return the ID of a user given its ID or name.

        :param user: user ID or name, optionally in ``name{domain}`` form
            when ``domain`` is not given.
        :param domain: domain ID used for the lookup by name.
        :returns: the user ID, or None if ``user`` is None.
        :raises exception.EntityNotFound: if no user matches.
        """
        if user is None:
            return None

        if not domain:
            user, domain = self.parse_entity_with_domain(user,
                                                         'KeystoneUser')

        try:
            user_obj = self.client().client.users.get(user)
            return user_obj.id
        except ks_exceptions.NotFound:
            try:
                user_obj = self.client().client.users.find(name=user,
                                                           domain_id=domain)
                return user_obj.id
            except ks_exceptions.NotFound:
                # Fall through to the EntityNotFound raised below.
                pass

        raise exception.EntityNotFound(entity='KeystoneUser', name=user)

    def get_region_id(self, region):
        """Return the ID of a region given its ID.

        Unlike the other helpers, there is no fallback lookup by name.

        :param region: region ID.
        :returns: the region ID.
        :raises exception.EntityNotFound: if the region does not exist.
        """
        try:
            region_obj = self.client().client.regions.get(region)
            return region_obj.id
        except ks_exceptions.NotFound:
            raise exception.EntityNotFound(entity='KeystoneRegion',
                                           name=region)