keystone.tests.unit.test_v3

Source code for keystone.tests.unit.test_v3

# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import uuid

import oslo_context.context
from oslo_serialization import jsonutils
from six.moves import http_client
from testtools import matchers
import webtest

from keystone import auth
from keystone.common import authorization
from keystone.common import cache
from keystone.common import provider_api
from keystone.common.validation import validators
from keystone import exception
from keystone import middleware
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import rest


# Shorthand for provider_api.ProviderAPIs; the helpers below reach the
# resource, identity, role, assignment and catalog APIs through it.
PROVIDERS = provider_api.ProviderAPIs
# ID of the domain that _populate_default_domain() makes sure exists.
DEFAULT_DOMAIN_ID = 'default'

# strptime() format used by assertValidISO8601ExtendedFormatDatetime().
TIME_FORMAT = unit.TIME_FORMAT


[docs]class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, common_auth.AuthTestMixin):
def generate_token_schema(self, system_scoped=False, domain_scoped=False,
                          project_scoped=False):
    """Return a dictionary of token properties to validate against.

    The base schema covers the properties expected on every token. At most
    one scope flag is honoured; they are checked in the order
    ``system_scoped``, ``domain_scoped``, ``project_scoped``. With no flag
    set the schema describes an unscoped token.

    :param system_scoped: add ``catalog``, ``system`` and ``roles``
                          properties and require ``system`` and ``roles``.
    :param domain_scoped: add ``catalog``, ``roles`` and ``domain``
                          properties and require ``domain`` and ``roles``.
    :param project_scoped: add the project related properties and require
                           ``project``.
    :returns: a dict with ``type``, ``properties``, ``required``,
              ``optional`` and ``additionalProperties`` keys.
    """
    def strict_role_list():
        # System and domain scoped tokens use the same strict role shape.
        return {
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'id': {'type': 'string', },
                    'name': {'type': 'string', },
                },
                'required': ['id', 'name', ],
                'additionalProperties': False,
            },
            'minItems': 1,
        }

    properties = {
        'audit_ids': {
            'type': 'array',
            'items': {
                'type': 'string',
            },
            'minItems': 1,
            'maxItems': 2,
        },
        'bind': {
            'type': 'object',
            'properties': {
                'kerberos': {
                    'type': 'string',
                },
            },
            'required': ['kerberos'],
            'additionalProperties': False,
        },
        'expires_at': {
            'type': 'string',
            'pattern': unit.TIME_FORMAT_REGEX,
        },
        'issued_at': {
            'type': 'string',
            'pattern': unit.TIME_FORMAT_REGEX,
        },
        'methods': {
            'type': 'array',
            'items': {
                'type': 'string',
            },
        },
        'user': {
            'type': 'object',
            'required': ['id', 'name', 'domain', 'password_expires_at'],
            'properties': {
                'id': {'type': 'string'},
                'name': {'type': 'string'},
                'domain': {
                    'type': 'object',
                    'properties': {
                        'id': {'type': 'string'},
                        'name': {'type': 'string'}
                    },
                    'required': ['id', 'name'],
                    # Spelled correctly so that unexpected keys in the
                    # user's domain are actually rejected.
                    'additionalProperties': False,
                },
                'password_expires_at': {
                    'type': ['string', 'null'],
                    'pattern': unit.TIME_FORMAT_REGEX,
                }
            },
            'additionalProperties': False,
        }
    }

    if system_scoped:
        properties['catalog'] = {'type': 'array'}
        properties['system'] = {
            'type': 'object',
            'properties': {
                'all': {'type': 'boolean'}
            }
        }
        properties['roles'] = strict_role_list()
    elif domain_scoped:
        properties['catalog'] = {'type': 'array'}
        properties['roles'] = strict_role_list()
        properties['domain'] = {
            'type': 'object',
            'required': ['id', 'name'],
            'properties': {
                'id': {'type': 'string'},
                'name': {'type': 'string'}
            },
            'additionalProperties': False
        }
    elif project_scoped:
        properties['is_admin_project'] = {'type': 'boolean'}
        properties['catalog'] = {'type': 'array'}
        properties['roles'] = {'type': 'array'}
        properties['is_domain'] = {'type': 'boolean'}
        properties['project'] = {
            'type': ['object'],
            'required': ['id', 'name', 'domain'],
            'properties': {
                'id': {'type': 'string'},
                'name': {'type': 'string'},
                'domain': {
                    'type': ['object'],
                    'required': ['id', 'name'],
                    'properties': {
                        'id': {'type': 'string'},
                        'name': {'type': 'string'}
                    },
                    'additionalProperties': False
                }
            },
            'additionalProperties': False
        }

    schema = {
        'type': 'object',
        'properties': properties,
        'required': ['audit_ids', 'expires_at', 'issued_at', 'methods',
                     'user'],
        'optional': ['bind'],
        'additionalProperties': False
    }

    if system_scoped:
        schema['required'].extend(['system', 'roles'])
        schema['optional'].append('catalog')
    elif domain_scoped:
        schema['required'].extend(['domain', 'roles'])
        schema['optional'].append('catalog')
    elif project_scoped:
        schema['required'].append('project')
        schema['optional'].append('catalog')
        schema['optional'].append('OS-TRUST:trust')
        schema['optional'].append('is_admin_project')
        schema['optional'].append('is_domain')

    return schema
def config_files(self):
    """Return the parent's config files with ``backend_sql.conf`` added."""
    paths = super(RestfulTestCase, self).config_files()
    sql_backend_conf = unit.dirs.tests_conf('backend_sql.conf')
    paths.append(sql_backend_conf)
    return paths
def setUp(self, app_conf='keystone'):
    """Run the parent setUp, then store an empty request context.

    :param app_conf: passed through to the parent ``setUp``.
    """
    super(RestfulTestCase, self).setUp(app_conf=app_conf)
    self.empty_context = dict(environment={})
def load_backends(self):
    """Configure the cache, then let the parent load the backends."""
    # Make sure the cache region instance is set up before delegating.
    cache.configure_cache()

    parent = super(RestfulTestCase, self)
    parent.load_backends()
def load_fixtures(self, fixtures):
    """Load the v3 sample data; the ``fixtures`` argument is not used."""
    self.load_sample_data()
def _populate_default_domain(self):
    """Create the default domain when the resource API does not have it."""
    try:
        PROVIDERS.resource_api.get_domain(DEFAULT_DOMAIN_ID)
    except exception.DomainNotFound:
        default_domain = unit.new_domain_ref(
            name=u'Default',
            id=DEFAULT_DOMAIN_ID,
            description=(u'The default domain'),
        )
        PROVIDERS.resource_api.create_domain(
            DEFAULT_DOMAIN_ID, default_domain)
def load_sample_data(self, create_region_and_endpoints=True):
    """Create the domain, projects, users and admin role the tests rely on.

    The created references are stored on ``self`` (``domain``, ``project``,
    ``user``, ``default_domain_project``, ``default_domain_user``,
    ``role``, ``user_reqadmin`` and their ``*_id`` counterparts).

    :param create_region_and_endpoints: when true, also create a region, a
        service and a public endpoint and store them on ``self``.
    """
    self._populate_default_domain()

    # A separate domain holding one project and one user.
    self.domain = unit.new_domain_ref()
    self.domain_id = self.domain['id']
    PROVIDERS.resource_api.create_domain(self.domain_id, self.domain)

    self.project = unit.new_project_ref(domain_id=self.domain_id)
    self.project_id = self.project['id']
    self.project = PROVIDERS.resource_api.create_project(
        self.project_id, self.project)

    self.user = unit.create_user(
        PROVIDERS.identity_api, domain_id=self.domain_id)
    self.user_id = self.user['id']

    # A project and a user that live in the default domain.
    self.default_domain_project_id = uuid.uuid4().hex
    self.default_domain_project = unit.new_project_ref(
        domain_id=DEFAULT_DOMAIN_ID)
    self.default_domain_project['id'] = self.default_domain_project_id
    PROVIDERS.resource_api.create_project(
        self.default_domain_project_id, self.default_domain_project)

    self.default_domain_user = unit.create_user(
        PROVIDERS.identity_api, domain_id=DEFAULT_DOMAIN_ID)
    self.default_domain_user_id = self.default_domain_user['id']

    # create & grant policy.json's default role for admin_required
    self.role = unit.new_role_ref(name='admin')
    self.role_id = self.role['id']
    PROVIDERS.role_api.create_role(self.role_id, self.role)

    admin_grants = (
        (self.user_id, self.project_id),
        (self.default_domain_user_id, self.default_domain_project_id),
        (self.default_domain_user_id, self.project_id),
    )
    for grantee_id, target_project_id in admin_grants:
        PROVIDERS.assignment_api.add_role_to_user_and_project(
            grantee_id, target_project_id, self.role_id)

    # Create "req_admin" user for simulating a real user instead of the
    # admin_token_auth middleware
    self.user_reqadmin = unit.create_user(
        PROVIDERS.identity_api, DEFAULT_DOMAIN_ID)
    PROVIDERS.assignment_api.add_role_to_user_and_project(
        self.user_reqadmin['id'],
        self.default_domain_project_id,
        self.role_id)

    if not create_region_and_endpoints:
        return

    self.region = unit.new_region_ref()
    self.region_id = self.region['id']
    PROVIDERS.catalog_api.create_region(self.region)

    self.service = unit.new_service_ref()
    self.service_id = self.service['id']
    PROVIDERS.catalog_api.create_service(
        self.service_id, self.service.copy())

    self.endpoint = unit.new_endpoint_ref(
        service_id=self.service_id,
        interface='public',
        region_id=self.region_id)
    self.endpoint_id = self.endpoint['id']
    PROVIDERS.catalog_api.create_endpoint(
        self.endpoint_id, self.endpoint.copy())
    # The server adds 'enabled' and defaults to True.
    self.endpoint['enabled'] = True
def create_new_default_project_for_user(self, user_id, domain_id,
                                        enable_project=True):
    """Create a project through the API and make it the user's default.

    :param user_id: user whose ``default_project_id`` is updated.
    :param domain_id: domain the new project is created in.
    :param enable_project: value for the new project's ``enabled`` flag.
    :returns: the validated project entity from the create response.
    """
    project_ref = unit.new_project_ref(
        domain_id=domain_id, enabled=enable_project)
    create_resp = self.post('/projects', body={'project': project_ref})
    project = self.assertValidProjectResponse(create_resp, project_ref)

    # set the user's preferred project
    user_update = {'user': {'default_project_id': project['id']}}
    user_path = '/users/%(user_id)s' % {'user_id': user_id}
    update_resp = self.patch(user_path, body=user_update)
    self.assertValidUserResponse(update_resp)

    return project
def get_admin_token(self):
    """Convenience method so that we can test authenticated requests.

    Authenticates ``self.user_reqadmin`` by password, scoped to the
    default-domain project.

    :returns: the ``X-Subject-Token`` header of the response.
    """
    admin = self.user_reqadmin
    identity = {
        'methods': ['password'],
        'password': {
            'user': {
                'name': admin['name'],
                'password': admin['password'],
                'domain': {'id': admin['domain_id']},
            },
        },
    }
    scope = {'project': {'id': self.default_domain_project_id}}
    response = self.admin_request(
        method='POST',
        path='/v3/auth/tokens',
        body={'auth': {'identity': identity, 'scope': scope}})
    return response.headers.get('X-Subject-Token')
def get_unscoped_token(self):
    """Convenience method so that we can test authenticated requests.

    Authenticates ``self.user`` by password without any scope.

    :returns: the ``X-Subject-Token`` header of the response.
    """
    credentials = {
        'name': self.user['name'],
        'password': self.user['password'],
        'domain': {'id': self.user['domain_id']},
    }
    auth_body = {
        'auth': {
            'identity': {
                'methods': ['password'],
                'password': {'user': credentials},
            },
        },
    }
    response = self.admin_request(
        method='POST', path='/v3/auth/tokens', body=auth_body)
    return response.headers.get('X-Subject-Token')
def get_scoped_token(self):
    """Convenience method so that we can test authenticated requests.

    Authenticates ``self.user`` by password, scoped to ``self.project``.

    :returns: the ``X-Subject-Token`` header of the response.
    """
    credentials = {
        'name': self.user['name'],
        'password': self.user['password'],
        'domain': {'id': self.user['domain_id']},
    }
    auth_body = {
        'auth': {
            'identity': {
                'methods': ['password'],
                'password': {'user': credentials},
            },
            'scope': {
                'project': {'id': self.project['id']},
            },
        },
    }
    response = self.admin_request(
        method='POST', path='/v3/auth/tokens', body=auth_body)
    return response.headers.get('X-Subject-Token')
def get_system_scoped_token(self):
    """Convenience method for requesting system scoped tokens.

    Authenticates ``self.user`` by password with a ``system`` scope.

    :returns: the ``X-Subject-Token`` header of the response.
    """
    credentials = {
        'name': self.user['name'],
        'password': self.user['password'],
        'domain': {'id': self.user['domain_id']},
    }
    auth_body = {
        'auth': {
            'identity': {
                'methods': ['password'],
                'password': {'user': credentials},
            },
            'scope': {
                'system': {'all': True},
            },
        },
    }
    response = self.admin_request(
        method='POST', path='/v3/auth/tokens', body=auth_body)
    return response.headers.get('X-Subject-Token')
def get_domain_scoped_token(self):
    """Convenience method for requesting domain scoped token.

    Authenticates ``self.user`` by password, scoped to ``self.domain``.

    :returns: the ``X-Subject-Token`` header of the response.
    """
    credentials = {
        'name': self.user['name'],
        'password': self.user['password'],
        'domain': {'id': self.user['domain_id']},
    }
    auth_body = {
        'auth': {
            'identity': {
                'methods': ['password'],
                'password': {'user': credentials},
            },
            'scope': {
                'domain': {'id': self.domain['id']},
            },
        },
    }
    response = self.admin_request(
        method='POST', path='/v3/auth/tokens', body=auth_body)
    return response.headers.get('X-Subject-Token')
def get_requested_token(self, auth):
    """Request the specific token we want.

    :param auth: authentication request body handed to
                 ``v3_create_token``.
    :returns: the ``X-Subject-Token`` header of the response.
    """
    response_headers = self.v3_create_token(auth).headers
    return response_headers.get('X-Subject-Token')
def v3_create_token(self, auth, expected_status=http_client.CREATED):
    """POST ``auth`` to ``/v3/auth/tokens`` and return the response.

    :param auth: request body to send.
    :param expected_status: status handed to ``admin_request``.
    """
    response = self.admin_request(
        path='/v3/auth/tokens',
        method='POST',
        expected_status=expected_status,
        body=auth)
    return response
def v3_noauth_request(self, path, **kwargs):
    """Send a request under ``/v3`` without adding a token.

    :param path: path relative to ``/v3``.
    :param kwargs: passed to ``admin_request`` unchanged.
    """
    # request does not require auth token header
    return self.admin_request(path='/v3' + path, **kwargs)
def v3_request(self, path, **kwargs):
    """Send a request under ``/v3``, obtaining a token when needed.

    The following keyword arguments are consumed here; everything else is
    handed to ``admin_request``:

    :param noauth: when truthy, the request is sent through
                   ``v3_noauth_request`` with the remaining kwargs.
    :param auth: authentication request body; when given, a token is
                 created from it and takes precedence over ``token``.
    :param token: token to send when ``auth`` is not given. If neither is
                  supplied, ``get_scoped_token()`` provides one.
    :returns: whatever ``admin_request`` returns.
    """
    # check to see if caller requires token for the API call.
    if kwargs.pop('noauth', None):
        return self.v3_noauth_request(path, **kwargs)

    # Check if the caller has passed in auth details for
    # use in requesting the token
    auth_arg = kwargs.pop('auth', None)
    # Always take 'token' out of kwargs. Leaving it there when 'auth' is
    # also given would hand admin_request the 'token' keyword twice.
    token = kwargs.pop('token', None)
    if auth_arg:
        token = self.get_requested_token(auth_arg)
    elif not token:
        token = self.get_scoped_token()

    return self.admin_request(path='/v3' + path, token=token, **kwargs)
def get(self, path, expected_status=http_client.OK, **kwargs):
    """Send a GET through ``v3_request`` and return its response."""
    response = self.v3_request(
        path, expected_status=expected_status, method='GET', **kwargs)
    return response
def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
    """Send a HEAD through ``v3_request`` and check the body is empty."""
    response = self.v3_request(
        path, expected_status=expected_status, method='HEAD', **kwargs)
    self.assertEqual(b'', response.body)
    return response
def post(self, path, expected_status=http_client.CREATED, **kwargs):
    """Send a POST through ``v3_request`` and return its response."""
    response = self.v3_request(
        path, expected_status=expected_status, method='POST', **kwargs)
    return response
def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
    """Send a PUT through ``v3_request`` and return its response."""
    response = self.v3_request(
        path, expected_status=expected_status, method='PUT', **kwargs)
    return response
def patch(self, path, expected_status=http_client.OK, **kwargs):
    """Send a PATCH through ``v3_request`` and return its response."""
    response = self.v3_request(
        path, expected_status=expected_status, method='PATCH', **kwargs)
    return response
def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
    """Send a DELETE through ``v3_request`` and return its response."""
    response = self.v3_request(
        path, expected_status=expected_status, method='DELETE', **kwargs)
    return response
def assertValidErrorResponse(self, r):
    """Check that ``r`` carries a complete ``error`` body.

    The body must have ``code``, ``title`` and ``message``, and the code
    must equal the response's status code.
    """
    body = r.result
    self.assertIsNotNone(body.get('error'))
    error = body['error']
    for field in ('code', 'title', 'message'):
        self.assertIsNotNone(error.get(field))
    self.assertEqual(int(error['code']), r.status_code)
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
                            expected_length=None, keys_to_check=None,
                            resource_url=None):
    """Make assertions common to all API list responses.

    If a reference is provided, its ID will be searched for in the
    response, and the matching entity asserted to be equal to it.

    :param resp: response whose ``result`` holds the collection.
    :param key: name of the collection inside ``resp.result``.
    :param entity_validator: callable invoked as ``validator(entity)`` for
        every entity and as ``validator(entity, ref)`` for the match.
    :param ref: optional reference entity expected in the collection.
    :param expected_length: optional exact collection size.
    :param keys_to_check: passed to ``assertValidEntity``.
    :param resource_url: passed to ``assertValidListLinks``.
    :returns: the list of entities.
    """
    entities = resp.result.get(key)
    self.assertIsNotNone(entities)

    if expected_length is not None:
        self.assertEqual(expected_length, len(entities))
    elif ref is not None:
        # we're at least expecting the ref
        self.assertNotEmpty(entities)

    # collections should have relational links
    self.assertValidListLinks(resp.result.get('links'),
                              resource_url=resource_url)

    for entity in entities:
        self.assertIsNotNone(entity)
        self.assertValidEntity(entity, keys_to_check=keys_to_check)
        entity_validator(entity)

    if ref:
        matches = [x for x in entities if x['id'] == ref['id']]
        # Report a missing reference as a test failure with a readable
        # message instead of an IndexError from indexing an empty list.
        self.assertTrue(
            matches,
            'entity %s not found in the %s list' % (ref['id'], key))
        entity = matches[0]
        self.assertValidEntity(entity, ref=ref,
                               keys_to_check=keys_to_check)
        entity_validator(entity, ref)

    return entities
def assertValidResponse(self, resp, key, entity_validator, *args, **kwargs):
    """Make assertions common to all API responses.

    ``keys_to_check`` is taken out of ``kwargs`` for ``assertValidEntity``;
    the remaining arguments go to both ``assertValidEntity`` and
    ``entity_validator``.

    :returns: the entity found under ``key`` in ``resp.result``.
    """
    keys_to_check = kwargs.pop('keys_to_check', None)

    entity = resp.result.get(key)
    self.assertIsNotNone(entity)

    self.assertValidEntity(
        entity, *args, keys_to_check=keys_to_check, **kwargs)
    entity_validator(entity, *args, **kwargs)
    return entity
def assertValidEntity(self, entity, ref=None, keys_to_check=None):
    """Make assertions common to all API entities.

    If a reference is provided, the entity will also be compared against
    the reference.

    :param entity: entity dict taken from an API response.
    :param ref: optional reference dict; each checked key must be equal.
    :param keys_to_check: keys (besides ``id``) that must not be None;
        defaults to ``name``, ``description`` and ``enabled``.
    :returns: ``entity``.
    """
    if keys_to_check is not None:
        # list() lets callers pass any iterable, not only a list.
        keys = list(keys_to_check)
    else:
        keys = ['name', 'description', 'enabled']

    for k in ['id'] + keys:
        msg = '%s unexpectedly None in %s' % (k, entity)
        self.assertIsNotNone(entity.get(k), msg)

    self.assertIsNotNone(entity.get('links'))
    self.assertIsNotNone(entity['links'].get('self'))
    self.assertThat(entity['links']['self'],
                    matchers.StartsWith('http://localhost'))
    self.assertIn(entity['id'], entity['links']['self'])

    if ref:
        for k in keys:
            msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
            # Pass the message so a failure names the offending key.
            self.assertEqual(ref[k], entity[k], msg)

    return entity
# auth validation
def assertValidISO8601ExtendedFormatDatetime(self, dt):
    """Parse ``dt`` with ``TIME_FORMAT``.

    :returns: the parsed ``datetime.datetime``.
    :raises AssertionError: if parsing fails for any reason.
    """
    try:
        parsed = datetime.datetime.strptime(dt, TIME_FORMAT)
    except Exception:
        raise AssertionError(
            '%s is not a valid ISO 8601 extended format date time.' % dt)
    return parsed
def assertValidTokenResponse(self, r, user=None, forbid_token_id=False):
    """Check the parts of a token response shared by all token types.

    :param r: response with the token under ``result['token']``.
    :param user: optional user dict whose ``id``, ``name`` and
                 ``domain_id`` must match the token's user.
    :param forbid_token_id: when true, the ``X-Subject-Token`` header must
                            be absent; otherwise it must be set.
    :returns: the token dict.
    """
    if not forbid_token_id:
        self.assertTrue(r.headers.get('X-Subject-Token'))
    else:
        self.assertNotIn('X-Subject-Token', r.headers)
    token = r.result['token']

    self.assertIsNotNone(token.get('expires_at'))
    expires_at = self.assertValidISO8601ExtendedFormatDatetime(
        token['expires_at'])
    self.assertIsNotNone(token.get('issued_at'))
    issued_at = self.assertValidISO8601ExtendedFormatDatetime(
        token['issued_at'])
    self.assertLess(issued_at, expires_at)

    self.assertIn('user', token)
    token_user = token['user']
    for field in ('id', 'name', 'domain'):
        self.assertIn(field, token_user)
    self.assertIn('id', token_user['domain'])

    if user is not None:
        self.assertEqual(user['id'], token_user['id'])
        self.assertEqual(user['name'], token_user['name'])
        self.assertEqual(user['domain_id'], token_user['domain']['id'])

    return token
def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
    """Validate an unscoped token response against the base schema.

    Extra arguments are passed to ``assertValidTokenResponse``.

    :returns: the token dict.
    """
    token = self.assertValidTokenResponse(r, *args, **kwargs)
    unscoped_schema = self.generate_token_schema()
    validators.SchemaValidator(unscoped_schema).validate(token)
    return token
def assertValidScopedTokenResponse(self, r, *args, **kwargs):
    """Check the catalog, roles and admin-project flag of a scoped token.

    Keyword arguments consumed here (the rest go to
    ``assertValidTokenResponse``):

    :param require_catalog: whether ``catalog`` must be present
                            (default True) or must be absent.
    :param endpoint_filter: when true, the number of catalog endpoints
                            must equal ``ep_filter_assoc``.
    :param ep_filter_assoc: expected endpoint count (default 0).
    :param is_admin_project: expected ``is_admin_project`` value,
                             compared by identity (default None).
    :returns: the token dict.
    """
    catalog_required = kwargs.pop('require_catalog', True)
    endpoint_filter = kwargs.pop('endpoint_filter', False)
    ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0)
    is_admin_project = kwargs.pop('is_admin_project', None)
    token = self.assertValidTokenResponse(r, *args, **kwargs)

    if not catalog_required:
        self.assertNotIn('catalog', token)
    else:
        endpoint_count = 0
        self.assertIn('catalog', token)

        catalog = token['catalog']
        if isinstance(catalog, list):
            # only test JSON
            for service in catalog:
                for endpoint in service['endpoints']:
                    for hidden_key in ('enabled', 'legacy_endpoint_id',
                                       'service_id'):
                        self.assertNotIn(hidden_key, endpoint)
                    endpoint_count += 1

        # sub test for the OS-EP-FILTER extension enabled
        if endpoint_filter:
            self.assertEqual(ep_filter_assoc, endpoint_count)

    self.assertIn('roles', token)
    roles = token['roles']
    self.assertTrue(roles)
    for role in roles:
        for field in ('id', 'name'):
            self.assertIn(field, role)

    # NOTE(samueldmq): We want to explicitly test for boolean or None
    self.assertIs(is_admin_project, token.get('is_admin_project'))

    return token
def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
    """Validate a project-scoped token response against its schema.

    When the token carries ``OS-TRUST:trust``, the schema is extended with
    the trust properties before validation. The first role in the token
    must be ``self.role_id``.

    :returns: the token dict.
    """
    token = self.assertValidScopedTokenResponse(r, *args, **kwargs)

    schema = self.generate_token_schema(project_scoped=True)

    if token.get('OS-TRUST:trust'):
        def trust_party():
            # Trustor and trustee are both described by their id only.
            return {
                'type': 'object',
                'required': ['id'],
                'properties': {
                    'id': {'type': 'string'}
                },
                'additionalProperties': False
            }

        schema['properties'].update({
            'OS-TRUST:trust': {
                'type': ['object'],
                'required': ['id', 'impersonation', 'trustor_user',
                             'trustee_user'],
                'properties': {
                    'id': {'type': 'string'},
                    'impersonation': {'type': 'boolean'},
                    'trustor_user': trust_party(),
                    'trustee_user': trust_party(),
                },
                'additionalProperties': False
            }
        })

    validators.SchemaValidator(schema).validate(token)

    self.assertEqual(self.role_id, token['roles'][0]['id'])

    return token
def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
    """Validate a domain-scoped token response against its schema.

    Extra arguments are passed to ``assertValidScopedTokenResponse``.

    :returns: the token dict.
    """
    token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
    domain_schema = self.generate_token_schema(domain_scoped=True)
    validators.SchemaValidator(domain_schema).validate(token)
    return token
def assertValidSystemScopedTokenResponse(self, r, *args, **kwargs):
    """Validate a system-scoped token response against its schema.

    The extra positional and keyword arguments are accepted but not used.

    :returns: the token dict.
    """
    token = self.assertValidTokenResponse(r)
    self.assertTrue(token['system']['all'])
    system_schema = self.generate_token_schema(system_scoped=True)
    validators.SchemaValidator(system_schema).validate(token)
    return token
def assertEqualTokens(self, a, b):
    """Assert that two tokens are equal.

    ``expires_at`` and ``issued_at`` are parsed and compared with
    ``assertCloseEnoughForGovernmentWork``; everything else is compared
    with ``assertDictEqual``. Neither ``a`` nor ``b`` is modified.

    :param a: dict with the token body under the ``token`` key.
    :param b: dict with the token body under the ``token`` key.
    """
    timestamp_keys = ('expires_at', 'issued_at')

    def without_timestamps(token):
        # Build trimmed copies rather than deleting keys in place, so the
        # caller's token dicts stay intact for later assertions.
        body = dict((k, v) for k, v in token['token'].items()
                    if k not in timestamp_keys)
        trimmed = dict(token)
        trimmed['token'] = body
        return trimmed

    for key in timestamp_keys:
        a_time = self.assertValidISO8601ExtendedFormatDatetime(
            a['token'][key])
        b_time = self.assertValidISO8601ExtendedFormatDatetime(
            b['token'][key])
        self.assertCloseEnoughForGovernmentWork(a_time, b_time)

    return self.assertDictEqual(without_timestamps(a),
                                without_timestamps(b))
# catalog validation
def assertValidCatalogResponse(self, resp, *args, **kwargs):
    """Check a catalog response body.

    The body must contain exactly ``catalog`` and ``links``, the catalog
    must be valid, and ``links`` must hold only the catalog's own URL.
    The extra arguments are accepted but not used.
    """
    body = resp.json
    self.assertEqual(set(['catalog', 'links']), set(body.keys()))
    self.assertValidCatalog(body['catalog'])

    self.assertIn('links', body)
    links = body['links']
    self.assertIsInstance(links, dict)
    self.assertEqual(['self'], list(links.keys()))
    self.assertEqual('http://localhost/v3/auth/catalog', links['self'])
def assertValidCatalog(self, entity):
    """Check that ``entity`` is a non-empty list of catalog services.

    Every service needs ``id``, ``name``, ``type`` and at least one
    endpoint; every endpoint needs ``id``, ``interface`` and ``url``.
    Neither may expose ``enabled``, and endpoints may not expose
    ``legacy_endpoint_id`` or ``service_id``.
    """
    self.assertIsInstance(entity, list)
    self.assertGreater(len(entity), 0)

    for service in entity:
        for required in ('id', 'name', 'type'):
            self.assertIsNotNone(service.get(required))
        self.assertNotIn('enabled', service)

        endpoints = service['endpoints']
        self.assertGreater(len(endpoints), 0)
        for endpoint in endpoints:
            for required in ('id', 'interface', 'url'):
                self.assertIsNotNone(endpoint.get(required))
            for hidden in ('enabled', 'legacy_endpoint_id', 'service_id'):
                self.assertNotIn(hidden, endpoint)
# region validation
def assertValidRegionListResponse(self, resp, *args, **kwargs):
    """Validate a ``regions`` collection response."""
    # NOTE(jaypipes): I have to pass in a blank keys_to_check parameter
    #                 below otherwise the base assertValidEntity method
    #                 tries to find a "name" and an "enabled" key in the
    #                 returned ref dicts. The issue is, I don't understand
    #                 how the service and endpoint entity assertions below
    #                 actually work (they don't raise assertions), since
    #                 AFAICT, the service and endpoint tables don't have
    #                 a "name" column either... :(
    validator = self.assertValidRegion
    return self.assertValidListResponse(
        resp, 'regions', validator, *args, keys_to_check=[], **kwargs)
def assertValidRegionResponse(self, resp, *args, **kwargs):
    """Validate a single ``region`` response; only ``id`` is key-checked."""
    validator = self.assertValidRegion
    return self.assertValidResponse(
        resp, 'region', validator, *args, keys_to_check=[], **kwargs)
def assertValidRegion(self, entity, ref=None):
    """Check a region has a description, matching ``ref`` when given.

    :returns: ``entity``.
    """
    self.assertIsNotNone(entity.get('description'))
    if not ref:
        return entity
    self.assertEqual(ref['description'], entity['description'])
    return entity
# service validation
def assertValidServiceListResponse(self, resp, *args, **kwargs):
    """Validate a ``services`` collection response."""
    validator = self.assertValidService
    return self.assertValidListResponse(
        resp, 'services', validator, *args, **kwargs)
def assertValidServiceResponse(self, resp, *args, **kwargs):
    """Validate a single ``service`` response."""
    validator = self.assertValidService
    return self.assertValidResponse(
        resp, 'service', validator, *args, **kwargs)
def assertValidService(self, entity, ref=None):
    """Check a service has a type and a boolean ``enabled`` flag.

    When ``ref`` is given, the types must be equal.

    :returns: ``entity``.
    """
    self.assertIsNotNone(entity.get('type'))
    self.assertIsInstance(entity.get('enabled'), bool)
    if not ref:
        return entity
    self.assertEqual(ref['type'], entity['type'])
    return entity
# endpoint validation
def assertValidEndpointListResponse(self, resp, *args, **kwargs):
    """Validate an ``endpoints`` collection response."""
    validator = self.assertValidEndpoint
    return self.assertValidListResponse(
        resp, 'endpoints', validator, *args, **kwargs)
def assertValidEndpointResponse(self, resp, *args, **kwargs):
    """Validate a single ``endpoint`` response."""
    validator = self.assertValidEndpoint
    return self.assertValidResponse(
        resp, 'endpoint', validator, *args, **kwargs)
def assertValidEndpoint(self, entity, ref=None):
    """Check an endpoint entity, optionally against a reference.

    The endpoint must have ``interface``, ``service_id`` and a boolean
    ``enabled``, and must not expose ``legacy_endpoint_id``.

    :param entity: endpoint dict taken from an API response.
    :param ref: optional reference; ``interface`` and ``service_id`` must
        match, and ``region_id`` must match when the reference sets it.
    :returns: ``entity``.
    """
    self.assertIsNotNone(entity.get('interface'))
    self.assertIsNotNone(entity.get('service_id'))
    self.assertIsInstance(entity['enabled'], bool)

    # this is intended to be an unexposed implementation detail
    self.assertNotIn('legacy_endpoint_id', entity)

    if ref:
        self.assertEqual(ref['interface'], entity['interface'])
        self.assertEqual(ref['service_id'], entity['service_id'])
        # Guard on the key that is actually compared. Guarding on
        # 'region' skipped the check for refs carrying only 'region_id'
        # and raised KeyError for refs carrying only 'region'.
        if ref.get('region_id') is not None:
            self.assertEqual(ref['region_id'], entity.get('region_id'))

    return entity
# domain validation
def assertValidDomainListResponse(self, resp, *args, **kwargs):
    """Validate a ``domains`` collection response."""
    validator = self.assertValidDomain
    return self.assertValidListResponse(
        resp, 'domains', validator, *args, **kwargs)
def assertValidDomainResponse(self, resp, *args, **kwargs):
    """Validate a single ``domain`` response."""
    validator = self.assertValidDomain
    return self.assertValidResponse(
        resp, 'domain', validator, *args, **kwargs)
def assertValidDomain(self, entity, ref=None):
    """Return ``entity`` unchanged; no domain-specific checks are made.

    ``ref`` is accepted so the method can be used as an entity validator,
    but nothing is compared against it.
    """
    return entity
# project validation
def assertValidProjectListResponse(self, resp, *args, **kwargs):
    """Validate a ``projects`` collection response."""
    validator = self.assertValidProject
    return self.assertValidListResponse(
        resp, 'projects', validator, *args, **kwargs)
def assertValidProjectResponse(self, resp, *args, **kwargs):
    """Validate a single ``project`` response."""
    validator = self.assertValidProject
    return self.assertValidResponse(
        resp, 'project', validator, *args, **kwargs)
def assertValidProject(self, entity, ref=None):
    """Check the project's ``domain_id`` against ``ref`` when given.

    :returns: ``entity``.
    """
    if not ref:
        return entity
    self.assertEqual(ref['domain_id'], entity['domain_id'])
    return entity
# user validation
def assertValidUserListResponse(self, resp, *args, **kwargs):
    """Validate a ``users`` collection; checks ``name`` and ``enabled``."""
    validator = self.assertValidUser
    return self.assertValidListResponse(
        resp, 'users', validator, *args,
        keys_to_check=['name', 'enabled'], **kwargs)
def assertValidUserResponse(self, resp, *args, **kwargs):
    """Validate a single ``user``; checks ``name`` and ``enabled``."""
    validator = self.assertValidUser
    return self.assertValidResponse(
        resp, 'user', validator, *args,
        keys_to_check=['name', 'enabled'], **kwargs)
def assertValidUser(self, entity, ref=None):
    """Check a user entity, optionally against a reference.

    The user must have ``domain_id`` and ``email``, must include
    ``password_expires_at``, and must expose neither a password nor
    ``tenantId``. With ``ref``, ``domain_id`` and ``email`` must match,
    and so must ``default_project_id`` when the reference has it.

    :returns: ``entity``.
    """
    for required in ('domain_id', 'email'):
        self.assertIsNotNone(entity.get(required))
    self.assertIsNone(entity.get('password'))
    self.assertNotIn('tenantId', entity)
    self.assertIn('password_expires_at', entity)

    if not ref:
        return entity

    for field in ('domain_id', 'email'):
        self.assertEqual(ref[field], entity[field])
    if 'default_project_id' in ref:
        expected_project = ref['default_project_id']
        self.assertIsNotNone(expected_project)
        self.assertEqual(expected_project, entity['default_project_id'])
    return entity
# group validation
def assertValidGroupListResponse(self, resp, *args, **kwargs):
    """Validate a ``groups`` collection response."""
    validator = self.assertValidGroup
    return self.assertValidListResponse(
        resp, 'groups', validator, *args,
        keys_to_check=['name', 'description', 'domain_id'], **kwargs)
def assertValidGroupResponse(self, resp, *args, **kwargs):
    """Validate a single ``group`` response."""
    validator = self.assertValidGroup
    return self.assertValidResponse(
        resp, 'group', validator, *args,
        keys_to_check=['name', 'description', 'domain_id'], **kwargs)
def assertValidGroup(self, entity, ref=None):
    """Check a group has a name, matching ``ref`` when given.

    :returns: ``entity``.
    """
    self.assertIsNotNone(entity.get('name'))
    if not ref:
        return entity
    self.assertEqual(ref['name'], entity['name'])
    return entity
# credential validation
def assertValidCredentialListResponse(self, resp, *args, **kwargs):
    """Validate a ``credentials`` collection response."""
    validator = self.assertValidCredential
    return self.assertValidListResponse(
        resp, 'credentials', validator, *args,
        keys_to_check=['blob', 'user_id', 'type'], **kwargs)
def assertValidCredentialResponse(self, resp, *args, **kwargs):
    """Validate a single ``credential`` response."""
    validator = self.assertValidCredential
    return self.assertValidResponse(
        resp, 'credential', validator, *args,
        keys_to_check=['blob', 'user_id', 'type'], **kwargs)
def assertValidCredential(self, entity, ref=None):
    """Check a credential entity, optionally against a reference.

    The credential must have ``user_id``, ``blob`` and ``type`` and must
    expose neither ``key_hash`` nor ``encrypted_blob``. With ``ref``,
    those three fields and ``project_id`` must match.

    :returns: ``entity``.
    """
    required_fields = ('user_id', 'blob', 'type')
    for field in required_fields:
        self.assertIsNotNone(entity.get(field))
    for hidden in ('key_hash', 'encrypted_blob'):
        self.assertNotIn(hidden, entity)

    if ref:
        for field in required_fields:
            self.assertEqual(ref[field], entity[field])
        self.assertEqual(ref.get('project_id'), entity.get('project_id'))
    return entity
# role validation
def assertValidRoleListResponse(self, resp, *args, **kwargs):
    """Validate a ``roles`` collection response; checks ``name``."""
    validator = self.assertValidRole
    return self.assertValidListResponse(
        resp, 'roles', validator, *args, keys_to_check=['name'], **kwargs)
def assertRoleInListResponse(self, resp, ref, expected=1):
    """Assert that ``expected`` roles in the response match ``ref``.

    A role counts as a match when ``assertValidRole(role, ref=ref)`` does
    not raise.
    """
    def matches(candidate):
        try:
            self.assertValidRole(candidate, ref=ref)
        except Exception:
            # It doesn't match, so let's go onto the next one
            return False
        return True

    found_count = sum(
        1 for role in resp.result.get('roles') if matches(role))
    self.assertEqual(expected, found_count)
def assertRoleNotInListResponse(self, resp, ref):
    """Assert that no role in the response matches ``ref``."""
    self.assertRoleInListResponse(resp, expected=0, ref=ref)
def assertValidRoleResponse(self, resp, *args, **kwargs):
    """Validate a single ``role`` response; checks ``name``."""
    validator = self.assertValidRole
    return self.assertValidResponse(
        resp, 'role', validator, *args, keys_to_check=['name'], **kwargs)
def assertValidRole(self, entity, ref=None):
    """Check a role has a name, matching ``ref`` when given.

    With ``ref``, both ``name`` and ``domain_id`` must be equal.

    :returns: ``entity``.
    """
    self.assertIsNotNone(entity.get('name'))
    if not ref:
        return entity
    for field in ('name', 'domain_id'):
        self.assertEqual(ref[field], entity[field])
    return entity
# role assignment validation
def assertValidRoleAssignmentListResponse(self, resp, expected_length=None,
                                          resource_url=None):
    """Validate a ``role_assignments`` collection response.

    :param expected_length: when given (0 included), the exact number of
                            assignments expected.
    :param resource_url: passed to ``assertValidListLinks``.
    :returns: the list of role assignment entities.
    """
    assignments = resp.result.get('role_assignments')

    if expected_length == 0 or expected_length:
        self.assertEqual(expected_length, len(assignments))

    # Collections should have relational links
    list_links = resp.result.get('links')
    self.assertValidListLinks(list_links, resource_url=resource_url)

    for assignment in assignments:
        self.assertIsNotNone(assignment)
        self.assertValidRoleAssignment(assignment)
    return assignments
def assertValidRoleAssignment(self, entity, ref=None):
    """Check the structure of a role assignment entity.

    The assignment must name a role, exactly one of user or group, a
    scope (project, domain or system) and an ``assignment`` link.

    :param entity: role assignment dict taken from an API response.
    :param ref: optional reference. Its ``links['assignment']`` must be
        contained in the entity's assignment link, and its other keys
        must be a subset of ``entity``. ``ref`` is not modified.
    :raises KeyError: if ``ref`` is given without a ``links`` key.
    """
    # A role should be present
    self.assertIsNotNone(entity.get('role'))
    self.assertIsNotNone(entity['role'].get('id'))

    # Only one of user or group should be present
    if entity.get('user'):
        self.assertIsNone(entity.get('group'))
        self.assertIsNotNone(entity['user'].get('id'))
    else:
        self.assertIsNotNone(entity.get('group'))
        self.assertIsNotNone(entity['group'].get('id'))

    # A scope should be present and have only one of domain or project
    self.assertIsNotNone(entity.get('scope'))
    scope = entity['scope']

    if scope.get('project'):
        self.assertIsNone(scope.get('domain'))
        self.assertIsNotNone(scope['project'].get('id'))
    elif scope.get('domain'):
        self.assertIsNotNone(scope['domain'].get('id'))
    else:
        self.assertIsNotNone(scope.get('system'))
        self.assertTrue(scope['system']['all'])

    # An assignment link should be present
    self.assertIsNotNone(entity.get('links'))
    self.assertIsNotNone(entity['links'].get('assignment'))

    if ref:
        expected_links = ref['links']
        # Compare against a copy without 'links' rather than popping the
        # key from the caller's dict, so ref can never be left modified.
        expected = dict((k, v) for k, v in ref.items() if k != 'links')
        self.assertDictContainsSubset(expected, entity)
        self.assertIn(expected_links['assignment'],
                      entity['links']['assignment'])
def assertRoleAssignmentInListResponse(self, resp, ref, expected=1):
    """Assert that ``expected`` assignments in the response match ``ref``.

    An assignment counts as a match when
    ``assertValidRoleAssignment(assignment, ref=ref)`` does not raise.
    """
    def matches(candidate):
        try:
            self.assertValidRoleAssignment(candidate, ref=ref)
        except Exception:
            # It doesn't match, so let's go onto the next one
            return False
        return True

    found_count = sum(
        1 for assignment in resp.result.get('role_assignments')
        if matches(assignment))
    self.assertEqual(expected, found_count)
def assertRoleAssignmentNotInListResponse(self, resp, ref):
    """Assert that no assignment in the response matches ``ref``."""
    self.assertRoleAssignmentInListResponse(resp, expected=0, ref=ref)
# policy validation
def assertValidPolicyListResponse(self, resp, *args, **kwargs):
    """Validate a ``policies`` collection response."""
    validator = self.assertValidPolicy
    return self.assertValidListResponse(
        resp, 'policies', validator, *args, **kwargs)
def assertValidPolicyResponse(self, resp, *args, **kwargs):
    """Validate a single ``policy`` response."""
    validator = self.assertValidPolicy
    return self.assertValidResponse(
        resp, 'policy', validator, *args, **kwargs)
def assertValidPolicy(self, entity, ref=None):
    """Check a policy has ``blob`` and ``type``, matching ``ref`` if given.

    :returns: ``entity``.
    """
    policy_fields = ('blob', 'type')
    for field in policy_fields:
        self.assertIsNotNone(entity.get(field))
    if ref:
        for field in policy_fields:
            self.assertEqual(ref[field], entity[field])
    return entity
# trust validation
def assertValidTrustListResponse(self, resp, *args, **kwargs):
    """Validate a ``trusts`` collection using the summary validator."""
    validator = self.assertValidTrustSummary
    return self.assertValidListResponse(
        resp, 'trusts', validator, *args,
        keys_to_check=['trustor_user_id', 'trustee_user_id',
                       'impersonation'],
        **kwargs)
def assertValidTrustResponse(self, resp, *args, **kwargs):
    """Validate a single ``trust`` response using the full validator."""
    validator = self.assertValidTrust
    return self.assertValidResponse(
        resp, 'trust', validator, *args,
        keys_to_check=['trustor_user_id', 'trustee_user_id',
                       'impersonation'],
        **kwargs)
def assertValidTrustSummary(self, entity, ref=None):
    """Validate a trust in its list (summary) form."""
    return self.assertValidTrust(entity, ref=ref, summary=True)
def assertValidTrust(self, entity, ref=None, summary=False):
    """Check a trust entity, optionally against a reference.

    :param entity: trust dict taken from an API response.
    :param ref: optional reference; the user ids and ``project_id`` must
        match, and ``expires_at`` must be close enough when either side
        sets it.
    :param summary: true for the list form, which has no ``roles``; the
        detailed form has its roles and ``roles_links`` validated.
    :returns: ``entity``.
    """
    for required in ('trustor_user_id', 'trustee_user_id',
                     'impersonation'):
        self.assertIsNotNone(entity.get(required))

    self.assertIn('expires_at', entity)
    if entity['expires_at'] is not None:
        self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])

    if not summary:
        for role in entity['roles']:
            self.assertIsNotNone(role)
            self.assertValidEntity(role, keys_to_check=['name'])
            self.assertValidRole(role)

        self.assertValidListLinks(entity.get('roles_links'))

        # always disallow role xor project_id (neither or both is allowed)
        has_roles = bool(entity.get('roles'))
        has_project = bool(entity.get('project_id'))
        self.assertFalse(has_roles != has_project)
    else:
        # Trust list contains no roles, but getting a specific
        # trust by ID provides the detailed response containing roles
        self.assertNotIn('roles', entity)
        self.assertIn('project_id', entity)

    if not ref:
        return entity

    for field in ('trustor_user_id', 'trustee_user_id', 'project_id'):
        self.assertEqual(ref[field], entity[field])

    if entity.get('expires_at') or ref.get('expires_at'):
        entity_exp = self.assertValidISO8601ExtendedFormatDatetime(
            entity['expires_at'])
        ref_exp = self.assertValidISO8601ExtendedFormatDatetime(
            ref['expires_at'])
        self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp)
    else:
        self.assertEqual(ref.get('expires_at'), entity.get('expires_at'))

    return entity
# Service providers (federation)
def assertValidServiceProvider(self, entity, ref=None, *args, **kwargs):
    """Check that a service provider has every expected attribute set.

    Each of the expected attribute names is looked up with ``entity.get``
    and must not be ``None``.  ``ref`` and any extra arguments are accepted
    but not used.
    """
    expected = frozenset(['auth_url', 'id', 'enabled', 'description',
                          'links', 'relay_state_prefix', 'sp_url'])
    for name in expected:
        value = entity.get(name)
        self.assertIsNotNone(value)
def assertValidServiceProviderListResponse(self, resp, *args, **kwargs):
    """Validate a response that carries a list of service providers.

    If the caller did not supply ``keys_to_check`` (or supplied ``None``),
    a default list of service provider attribute names is used.  The call
    is then delegated to ``assertValidListResponse`` with the
    ``'service_providers'`` key and ``assertValidServiceProvider`` as the
    per-entity validator.

    :returns: whatever ``assertValidListResponse`` returns.
    """
    caller_keys = kwargs.get('keys_to_check')
    if caller_keys is None:
        caller_keys = ['auth_url', 'id', 'enabled', 'description',
                       'relay_state_prefix', 'sp_url']
    kwargs['keys_to_check'] = caller_keys
    return self.assertValidListResponse(
        resp,
        'service_providers',
        self.assertValidServiceProvider,
        *args,
        **kwargs)
def build_external_auth_request(self, remote_user,
                                remote_domain=None, auth_data=None,
                                kerberos=False):
    """Build the pieces needed to exercise external authentication.

    :param remote_user: value placed in the ``REMOTE_USER`` environ key.
    :param remote_domain: if truthy, placed in ``REMOTE_DOMAIN``.
    :param auth_data: auth payload; when falsy, the ``'auth'`` part of
        ``self.build_authentication_request(kerberos=kerberos)`` is used.
    :param kerberos: forwarded to ``build_authentication_request`` when
        ``auth_data`` has to be generated.
    :returns: a ``(request, auth_info, auth_context)`` tuple, where the
        request comes from ``self.make_request`` and the other two from
        ``auth.core``.
    """
    environ = dict(REMOTE_USER=remote_user, AUTH_TYPE='Negotiate')
    if remote_domain:
        environ['REMOTE_DOMAIN'] = remote_domain

    if not auth_data:
        generated = self.build_authentication_request(kerberos=kerberos)
        auth_data = generated['auth']

    auth_info = auth.core.AuthInfo.create(auth_data)
    auth_context = auth.core.AuthContext(extras={}, method_names=[])
    request = self.make_request(environ=environ)
    return request, auth_info, auth_context
class VersionTestCase(RestfulTestCase):
    """Test case whose only test, ``test_get_version``, is a no-op."""

    def test_get_version(self):
        # Intentionally empty: the body does nothing, so this test always
        # passes.
        pass
# NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py
# because we need the token
class AuthContextMiddlewareTestCase(RestfulTestCase):
    """Exercise ``middleware.AuthContextMiddleware`` with issued tokens."""

    def _middleware_request(self, token, extra_environ=None):
        """Send one GET through the middleware and return the request.

        A trivial WSGI application that always answers ``200 OK`` with the
        body ``b'body'`` is wrapped in ``AuthContextMiddleware`` and driven
        through ``webtest.TestApp``.

        :param token: value sent in the ``authorization.AUTH_TOKEN_HEADER``
            header.
        :param extra_environ: optional environ entries given to the
            ``TestApp``.
        :returns: ``resp.request`` of the response.
        """
        payload = b'body'

        def application(environ, start_response):
            response_headers = [
                ('Content-Type', 'text/html; charset=utf8'),
                ('Content-Length', str(len(payload))),
            ]
            start_response('200 OK', response_headers)
            return [payload]

        wrapped = middleware.AuthContextMiddleware(application)
        client = webtest.TestApp(wrapped, extra_environ=extra_environ)
        token_header = {authorization.AUTH_TOKEN_HEADER: token}
        resp = client.get('/', headers=token_header)
        # Getting the stub's payload back shows the request went through.
        self.assertEqual(payload, resp.body)
        return resp.request

    def test_auth_context_build_by_middleware(self):
        # AuthContextMiddleware should successfully build the auth context
        # from the incoming auth token.
        req = self._middleware_request(self.get_scoped_token())
        context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        self.assertEqual(self.user['id'], context['user_id'])

    def test_auth_context_override(self):
        preset_context = 'OVERRIDDEN_CONTEXT'
        # A random token: it should never be consulted.
        unused_token = uuid.uuid4().hex
        req = self._middleware_request(
            unused_token,
            extra_environ={authorization.AUTH_CONTEXT_ENV: preset_context})
        # A context already present in the environ takes precedence.
        self.assertEqual(
            preset_context,
            req.environ.get(authorization.AUTH_CONTEXT_ENV))

    def test_unscoped_token_auth_context(self):
        req = self._middleware_request(self.get_unscoped_token())
        context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        # This check originally looked that the value was unset, but that
        # was an artifact of the custom context keystone used to create.
        # Oslo-context always provides the same set of keys; for an
        # unscoped token their values are None.
        for scope_key in ('project_id', 'domain_id', 'domain_name'):
            self.assertIsNone(context[scope_key])

    def test_project_scoped_token_auth_context(self):
        req = self._middleware_request(self.get_scoped_token())
        context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        self.assertEqual(self.project['id'], context['project_id'])

    def test_domain_scoped_token_auth_context(self):
        # Grant the domain role to the user first.
        grant_path = '/domains/%s/users/%s/roles/%s' % (
            self.domain['id'], self.user['id'], self.role['id'])
        self.put(path=grant_path)

        req = self._middleware_request(self.get_domain_scoped_token())
        context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
        self.assertEqual(self.domain['id'], context['domain_id'])
        self.assertEqual(self.domain['name'], context['domain_name'])

    def test_oslo_context(self):
        # After AuthContextMiddleware runs, an
        # oslo_context.context.RequestContext exists so that its fields can
        # be logged.  Check that it was created and populated as expected.

        # Use a scoped token so more fields can be set.
        scoped_token = self.get_scoped_token()

        # oslo_middleware RequestId middleware sets openstack.request_id.
        fake_request_id = uuid.uuid4().hex
        self._middleware_request(
            scoped_token,
            extra_environ={'openstack.request_id': fake_request_id})

        ctx = oslo_context.context.get_current()
        self.assertEqual(fake_request_id, ctx.request_id)
        self.assertEqual(scoped_token, ctx.auth_token)
        self.assertEqual(self.user['id'], ctx.user_id)
        self.assertEqual(self.project['id'], ctx.project_id)
        self.assertIsNone(ctx.domain_id)
        self.assertEqual(self.user['domain_id'], ctx.user_domain_id)
        self.assertEqual(self.project['domain_id'], ctx.project_domain_id)
        self.assertFalse(ctx.is_admin)
class JsonHomeTestMixin(object):
    """Reusable JSON-Home check for extension test cases.

    A test case mixing this in must define ``JSON_HOME_DATA``: a dict that
    maps relationship URLs (rels) to the JSON-Home data expected for each
    rel.  Every rel listed there, with equal data, must be present in the
    JSON-Home response.
    """

    def test_get_json_home(self):
        json_home_type = 'application/json-home'
        resp = self.get('/', convert=False,
                        headers={'Accept': json_home_type})
        self.assertThat(resp.headers['Content-Type'],
                        matchers.Equals(json_home_type))
        document = jsonutils.loads(resp.body)

        # Every example relationship must be present with matching data.
        for rel, expected in self.JSON_HOME_DATA.items():
            self.assertThat(document['resources'][rel],
                            matchers.Equals(expected))
class AssignmentTestMixin(object):
    """To hold assignment helper functions."""

    def build_role_assignment_query_url(self, effective=False, **filters):
        """Build and return a role assignment query url with provided params.

        Available filters are: domain_id, project_id, user_id, group_id,
        role_id and inherited_to_projects.

        :param effective: when true, a bare ``effective`` flag is emitted as
            the first query parameter.
        :param filters: filter name/value pairs; they are emitted in the
            order given.  ``domain_id`` and ``project_id`` are prefixed with
            ``scope.`` and underscores in names become dots.
        :returns: ``/role_assignments`` followed by the query string, if
            any parameter was produced.
        :raises ValueError: if a filter name is not one of those listed
            above.
        """
        scope_keys = ('domain_id', 'project_id')
        actor_keys = ('user_id', 'group_id', 'role_id')

        params = ['effective'] if effective else []
        for key, value in filters.items():
            if key == 'inherited_to_projects':
                # Only the presence of this filter matters; its value is
                # not rendered into the query string.
                params.append('scope.OS-INHERIT:inherited_to=projects')
                continue

            if key in scope_keys:
                prefix = 'scope.'
            elif key in actor_keys:
                prefix = ''
            else:
                raise ValueError(
                    'Invalid key \'%s\' in provided filters.' % key)

            params.append(
                '%s%s=%s' % (prefix, key.replace('_', '.'), value))

        if not params:
            return '/role_assignments'
        return '/role_assignments?%s' % '&'.join(params)

    def build_role_assignment_entity(
            self, link=None, prior_role_link=None, **attribs):
        """Build and return a role assignment entity with provided attributes.

        Provided attributes are expected to contain: domain_id or project_id,
        user_id or group_id, role_id and, optionally, inherited_to_projects.

        :param link: assignment link to use; when falsy it is obtained from
            ``self.build_role_assignment_link(**attribs)``, which is not
            defined in this part of the file.
        :param prior_role_link: if truthy, stored under
            ``links['prior_role']``.
        :returns: the assembled entity dict.
        :raises KeyError: if a needed attribute (for example ``role_id``)
            is missing from ``attribs``.
        """
        assignment_link = link or self.build_role_assignment_link(**attribs)
        entity = {'links': {'assignment': assignment_link}}

        if attribs.get('domain_id'):
            entity['scope'] = {'domain': {'id': attribs['domain_id']}}
        else:
            entity['scope'] = {'project': {'id': attribs['project_id']}}

        if attribs.get('user_id'):
            entity['user'] = {'id': attribs['user_id']}

            # A user combined with a group additionally gets a link to the
            # group membership.
            if attribs.get('group_id'):
                entity['links']['membership'] = (
                    '/groups/%s/users/%s' % (attribs['group_id'],
                                             attribs['user_id']))
        else:
            entity['group'] = {'id': attribs['group_id']}

        entity['role'] = {'id': attribs['role_id']}

        if attribs.get('inherited_to_projects'):
            entity['scope']['OS-INHERIT:inherited_to'] = 'projects'

        if prior_role_link:
            entity['links']['prior_role'] = prior_role_link

        return entity

    def build_role_assignment_entity_include_names(self,
                                                   domain_ref=None,
                                                   role_ref=None,
                                                   group_ref=None,
                                                   user_ref=None,
                                                   project_ref=None,
                                                   inherited_assignment=None):
        """Build and return a role assignment entity with provided attributes.

        The expected attributes are: domain_ref or project_ref,
        user_ref or group_ref, role_ref and, optionally,
        inherited_assignment.

        Domain names are looked up through
        ``PROVIDERS.resource_api.get_domain``.  The assignment link is
        obtained from ``self.build_role_assignment_link``, which is not
        defined in this part of the file.

        :param domain_ref: domain used as scope when ``project_ref`` is
            falsy.
        :param role_ref: role of the assignment; its domain is included
            when ``role_ref['domain_id']`` is truthy.
        :param group_ref: group used as actor when ``user_ref`` is falsy.
        :param user_ref: user used as actor.
        :param project_ref: project used as scope; takes precedence over
            ``domain_ref``.
        :param inherited_assignment: if truthy, the scope is marked as
            inherited to projects.
        :returns: the assembled entity dict, including names.
        """
        entity = {'links': {}}
        attributes_for_links = {}

        if project_ref:
            dmn_name = PROVIDERS.resource_api.get_domain(
                project_ref['domain_id'])['name']
            entity['scope'] = {'project': {
                'id': project_ref['id'],
                'name': project_ref['name'],
                'domain': {
                    'id': project_ref['domain_id'],
                    'name': dmn_name}}}
            attributes_for_links['project_id'] = project_ref['id']
        else:
            entity['scope'] = {'domain': {'id': domain_ref['id'],
                                          'name': domain_ref['name']}}
            attributes_for_links['domain_id'] = domain_ref['id']

        if user_ref:
            dmn_name = PROVIDERS.resource_api.get_domain(
                user_ref['domain_id'])['name']
            entity['user'] = {'id': user_ref['id'],
                              'name': user_ref['name'],
                              'domain': {'id': user_ref['domain_id'],
                                         'name': dmn_name}}
            attributes_for_links['user_id'] = user_ref['id']
        else:
            dmn_name = PROVIDERS.resource_api.get_domain(
                group_ref['domain_id'])['name']
            entity['group'] = {'id': group_ref['id'],
                               'name': group_ref['name'],
                               'domain': {
                                   'id': group_ref['domain_id'],
                                   'name': dmn_name}}
            attributes_for_links['group_id'] = group_ref['id']

        if role_ref:
            entity['role'] = {'id': role_ref['id'],
                              'name': role_ref['name']}
            # Only roles that carry a domain get the domain sub-dict.
            if role_ref['domain_id']:
                dmn_name = PROVIDERS.resource_api.get_domain(
                    role_ref['domain_id'])['name']
                entity['role']['domain'] = {'id': role_ref['domain_id'],
                                            'name': dmn_name}
            attributes_for_links['role_id'] = role_ref['id']

        if inherited_assignment:
            entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
            attributes_for_links['inherited_to_projects'] = True

        entity['links']['assignment'] = self.build_role_assignment_link(
            **attributes_for_links)

        return entity
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.