keystone.tests.unit.test_cli

Source code for keystone.tests.unit.test_cli

# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import os
import uuid

import argparse
import fixtures
import mock
import oslo_config.fixture
from oslo_db.sqlalchemy import migration
from oslo_log import log
from six.moves import configparser
from six.moves import range
from testtools import matchers

from keystone.auth import controllers
from keystone.cmd import cli
from keystone.cmd.doctor import caching
from keystone.cmd.doctor import credential
from keystone.cmd.doctor import database as doc_database
from keystone.cmd.doctor import debug
from keystone.cmd.doctor import federation
from keystone.cmd.doctor import ldap
from keystone.cmd.doctor import security_compliance
from keystone.cmd.doctor import tokens
from keystone.cmd.doctor import tokens_fernet
from keystone.common import provider_api
from keystone.common.sql import upgrades
import keystone.conf
from keystone import exception
from keystone.i18n import _
from keystone.identity.mapping_backends import mapping as identity_mapping
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.ksfixtures import ldapdb


CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs


[docs]class CliTestCase(unit.SQLDriverOverrides, unit.TestCase):
[docs] def config_files(self): config_files = super(CliTestCase, self).config_files() config_files.append(unit.dirs.tests_conf('backend_sql.conf')) return config_files
[docs] def test_token_flush(self): self.useFixture(database.Database()) self.load_backends() # NOTE(morgan): we are testing a direct instantiation of the # persistence manager for flushing. We should clear this out so we # don't error. CLI should never impact a running service # and should never actually lock the registry for dependencies. provider_api.ProviderAPIs._clear_registry_instances() cli.TokenFlush.main()
[docs]class CliNoConfigTestCase(unit.BaseTestCase):
[docs] def setUp(self): self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF)) self.config_fixture.register_cli_opt(cli.command_opt) self.useFixture(fixtures.MockPatch( 'oslo_config.cfg.find_config_files', return_value=[])) super(CliNoConfigTestCase, self).setUp() # NOTE(crinkle): the command call doesn't have to actually work, # that's what the other unit tests are for. So just mock it out. class FakeConfCommand(object): def __init__(self): self.cmd_class = mock.Mock() self.useFixture(fixtures.MockPatchObject( CONF, 'command', FakeConfCommand())) self.logging = self.useFixture(fixtures.FakeLogger(level=log.WARN))
[docs] def test_cli(self): expected_msg = 'Config file not found, using default configs.' cli.main(argv=['keystone-manage', 'db_sync']) self.assertThat(self.logging.output, matchers.Contains(expected_msg))
[docs]class CliBootStrapTestCase(unit.SQLDriverOverrides, unit.TestCase):
[docs] def setUp(self): self.useFixture(database.Database()) super(CliBootStrapTestCase, self).setUp()
[docs] def config_files(self): self.config_fixture.register_cli_opt(cli.command_opt) config_files = super(CliBootStrapTestCase, self).config_files() config_files.append(unit.dirs.tests_conf('backend_sql.conf')) return config_files
[docs] def config(self, config_files): CONF(args=['bootstrap', '--bootstrap-password', uuid.uuid4().hex], project='keystone', default_config_files=config_files)
[docs] def test_bootstrap(self): bootstrap = cli.BootStrap() self._do_test_bootstrap(bootstrap)
def _do_test_bootstrap(self, bootstrap): bootstrap.do_bootstrap() project = bootstrap.resource_manager.get_project_by_name( bootstrap.project_name, 'default') user = bootstrap.identity_manager.get_user_by_name( bootstrap.username, 'default') role = bootstrap.role_manager.get_role(bootstrap.role_id) role_list = ( bootstrap.assignment_manager.get_roles_for_user_and_project( user['id'], project['id'])) self.assertIs(1, len(role_list)) self.assertEqual(role_list[0], role['id']) system_roles = ( bootstrap.assignment_manager.list_system_grants_for_user( user['id'] ) ) self.assertIs(1, len(system_roles)) self.assertEqual(system_roles[0]['id'], role['id']) # NOTE(morganfainberg): Pass an empty context, it isn't used by # `authenticate` method. bootstrap.identity_manager.authenticate( self.make_request(), user['id'], bootstrap.password) if bootstrap.region_id: region = bootstrap.catalog_manager.get_region(bootstrap.region_id) self.assertEqual(self.region_id, region['id']) if bootstrap.service_id: svc = bootstrap.catalog_manager.get_service(bootstrap.service_id) self.assertEqual(self.service_name, svc['name']) self.assertEqual(set(['admin', 'public', 'internal']), set(bootstrap.endpoints)) urls = {'public': self.public_url, 'internal': self.internal_url, 'admin': self.admin_url} for interface, url in urls.items(): endpoint_id = bootstrap.endpoints[interface] endpoint = bootstrap.catalog_manager.get_endpoint(endpoint_id) self.assertEqual(self.region_id, endpoint['region_id']) self.assertEqual(url, endpoint['url']) self.assertEqual(svc['id'], endpoint['service_id']) self.assertEqual(interface, endpoint['interface'])
[docs] def test_bootstrap_is_idempotent_when_password_does_not_change(self): # NOTE(morganfainberg): Ensure we can run bootstrap with the same # configuration multiple times without erroring. bootstrap = cli.BootStrap() self._do_test_bootstrap(bootstrap) v3_token_controller = controllers.Auth() v3_password_data = { 'identity': { "methods": ["password"], "password": { "user": { "name": bootstrap.username, "password": bootstrap.password, "domain": { "id": CONF.identity.default_domain_id } } } } } auth_response = v3_token_controller.authenticate_for_token( self.make_request(), v3_password_data) token = auth_response.headers['X-Subject-Token'] self._do_test_bootstrap(bootstrap) # build validation request request = self.make_request(is_admin=True) request.context_dict['subject_token_id'] = token # Make sure the token we authenticate for is still valid. v3_token_controller.validate_token(request)
[docs] def test_bootstrap_is_not_idempotent_when_password_does_change(self): # NOTE(lbragstad): Ensure bootstrap isn't idempotent when run with # different arguments or configuration values. bootstrap = cli.BootStrap() self._do_test_bootstrap(bootstrap) v3_token_controller = controllers.Auth() v3_password_data = { 'identity': { "methods": ["password"], "password": { "user": { "name": bootstrap.username, "password": bootstrap.password, "domain": { "id": CONF.identity.default_domain_id } } } } } auth_response = v3_token_controller.authenticate_for_token( self.make_request(), v3_password_data) token = auth_response.headers['X-Subject-Token'] os.environ['OS_BOOTSTRAP_PASSWORD'] = uuid.uuid4().hex self._do_test_bootstrap(bootstrap) # build validation request request = self.make_request(is_admin=True) request.context_dict['subject_token_id'] = token # Since the user account was recovered with a different password, we # shouldn't be able to validate this token. Bootstrap should have # persisted a revocation event because the user's password was updated. # Since this token was obtained using the original password, it should # now be invalid. self.assertRaises( exception.TokenNotFound, v3_token_controller.validate_token, request )
[docs] def test_bootstrap_recovers_user(self): bootstrap = cli.BootStrap() self._do_test_bootstrap(bootstrap) # Completely lock the user out. user_id = bootstrap.identity_manager.get_user_by_name( bootstrap.username, 'default')['id'] bootstrap.identity_manager.update_user( user_id, {'enabled': False, 'password': uuid.uuid4().hex}) # The second bootstrap run will recover the account. self._do_test_bootstrap(bootstrap) # Sanity check that the original password works again. bootstrap.identity_manager.authenticate( self.make_request(), user_id, bootstrap.password)
[docs] def test_bootstrap_with_ambiguous_role_names(self): bootstrap = cli.BootStrap() # bootstrap system to create the default admin role self._do_test_bootstrap(bootstrap) # create a domain-specific roles that share the same names as the # default roles created by keystone-manage bootstrap domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} domain = PROVIDERS.resource_api.create_domain(domain['id'], domain) domain_roles = {} for name in ['admin', 'member', 'reader']: domain_role = { 'domain_id': domain['id'], 'id': uuid.uuid4().hex, 'name': name } domain_roles[name] = PROVIDERS.role_api.create_role( domain_role['id'], domain_role ) # ensure subsequent bootstrap attempts don't fail because of # ambiguity self._do_test_bootstrap(bootstrap)
[docs]class CliBootStrapTestCaseWithEnvironment(CliBootStrapTestCase):
[docs] def config(self, config_files): CONF(args=['bootstrap'], project='keystone', default_config_files=config_files)
[docs] def setUp(self): super(CliBootStrapTestCaseWithEnvironment, self).setUp() self.password = uuid.uuid4().hex self.username = uuid.uuid4().hex self.project_name = uuid.uuid4().hex self.role_name = uuid.uuid4().hex self.service_name = uuid.uuid4().hex self.public_url = uuid.uuid4().hex self.internal_url = uuid.uuid4().hex self.admin_url = uuid.uuid4().hex self.region_id = uuid.uuid4().hex self.default_domain = { 'id': CONF.identity.default_domain_id, 'name': 'Default', } self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_PASSWORD', newvalue=self.password)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_USERNAME', newvalue=self.username)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_PROJECT_NAME', newvalue=self.project_name)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_ROLE_NAME', newvalue=self.role_name)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_SERVICE_NAME', newvalue=self.service_name)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_PUBLIC_URL', newvalue=self.public_url)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_INTERNAL_URL', newvalue=self.internal_url)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_ADMIN_URL', newvalue=self.admin_url)) self.useFixture( fixtures.EnvironmentVariable('OS_BOOTSTRAP_REGION_ID', newvalue=self.region_id))
[docs] def test_assignment_created_with_user_exists(self): # test assignment can be created if user already exists. bootstrap = cli.BootStrap() bootstrap.resource_manager.create_domain(self.default_domain['id'], self.default_domain) user_ref = unit.new_user_ref(self.default_domain['id'], name=self.username, password=self.password) bootstrap.identity_manager.create_user(user_ref) self._do_test_bootstrap(bootstrap)
[docs] def test_assignment_created_with_project_exists(self): # test assignment can be created if project already exists. bootstrap = cli.BootStrap() bootstrap.resource_manager.create_domain(self.default_domain['id'], self.default_domain) project_ref = unit.new_project_ref(self.default_domain['id'], name=self.project_name) bootstrap.resource_manager.create_project(project_ref['id'], project_ref) self._do_test_bootstrap(bootstrap)
[docs] def test_assignment_created_with_role_exists(self): # test assignment can be created if role already exists. bootstrap = cli.BootStrap() bootstrap.resource_manager.create_domain(self.default_domain['id'], self.default_domain) role = unit.new_role_ref(name=self.role_name) bootstrap.role_manager.create_role(role['id'], role) self._do_test_bootstrap(bootstrap)
[docs] def test_assignment_created_with_region_exists(self): # test assignment can be created if region already exists. bootstrap = cli.BootStrap() bootstrap.resource_manager.create_domain(self.default_domain['id'], self.default_domain) region = unit.new_region_ref(id=self.region_id) bootstrap.catalog_manager.create_region(region) self._do_test_bootstrap(bootstrap)
[docs] def test_endpoints_created_with_service_exists(self): # test assignment can be created if service already exists. bootstrap = cli.BootStrap() bootstrap.resource_manager.create_domain(self.default_domain['id'], self.default_domain) service = unit.new_service_ref(name=self.service_name) bootstrap.catalog_manager.create_service(service['id'], service) self._do_test_bootstrap(bootstrap)
[docs] def test_endpoints_created_with_endpoint_exists(self): # test assignment can be created if endpoint already exists. bootstrap = cli.BootStrap() bootstrap.resource_manager.create_domain(self.default_domain['id'], self.default_domain) service = unit.new_service_ref(name=self.service_name) bootstrap.catalog_manager.create_service(service['id'], service) region = unit.new_region_ref(id=self.region_id) bootstrap.catalog_manager.create_region(region) endpoint = unit.new_endpoint_ref(interface='public', service_id=service['id'], url=self.public_url, region_id=self.region_id) bootstrap.catalog_manager.create_endpoint(endpoint['id'], endpoint) self._do_test_bootstrap(bootstrap)
[docs]class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
[docs] def setUp(self): self.useFixture(database.Database()) super(CliDomainConfigAllTestCase, self).setUp() self.load_backends() self.config_fixture.config( group='identity', domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap') self.domain_count = 3 self.setup_initial_domains() self.logging = self.useFixture( fixtures.FakeLogger(level=logging.INFO))
[docs] def config_files(self): self.config_fixture.register_cli_opt(cli.command_opt) config_files = super(CliDomainConfigAllTestCase, self).config_files() config_files.append(unit.dirs.tests_conf('backend_sql.conf')) return config_files
[docs] def cleanup_domains(self): for domain in self.domains: if domain == 'domain_default': # Not allowed to delete the default domain, but should at least # delete any domain-specific config for it. PROVIDERS.domain_config_api.delete_config( CONF.identity.default_domain_id) continue this_domain = self.domains[domain] this_domain['enabled'] = False PROVIDERS.resource_api.update_domain( this_domain['id'], this_domain ) PROVIDERS.resource_api.delete_domain(this_domain['id']) self.domains = {}
[docs] def config(self, config_files): CONF(args=['domain_config_upload', '--all'], project='keystone', default_config_files=config_files)
[docs] def setup_initial_domains(self): def create_domain(domain): return PROVIDERS.resource_api.create_domain(domain['id'], domain) self.domains = {} self.addCleanup(self.cleanup_domains) for x in range(1, self.domain_count): domain = 'domain%s' % x self.domains[domain] = create_domain( {'id': uuid.uuid4().hex, 'name': domain}) self.default_domain = unit.new_domain_ref( description=u'The default domain', id=CONF.identity.default_domain_id, name=u'Default') self.domains['domain_default'] = create_domain(self.default_domain)
[docs] def test_config_upload(self): # The values below are the same as in the domain_configs_multi_ldap # directory of test config_files. default_config = { 'ldap': {'url': 'fake://memory', 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=example,cn=com'}, 'identity': {'driver': 'ldap'} } domain1_config = { 'ldap': {'url': 'fake://memory1', 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=example,cn=com'}, 'identity': {'driver': 'ldap', 'list_limit': '101'} } domain2_config = { 'ldap': {'url': 'fake://memory', 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=myroot,cn=com', 'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org', 'user_tree_dn': 'ou=Users,dc=myroot,dc=org'}, 'identity': {'driver': 'ldap'} } # Clear backend dependencies, since cli loads these manually provider_api.ProviderAPIs._clear_registry_instances() cli.DomainConfigUpload.main() res = PROVIDERS.domain_config_api.get_config_with_sensitive_info( CONF.identity.default_domain_id) self.assertEqual(default_config, res) res = PROVIDERS.domain_config_api.get_config_with_sensitive_info( self.domains['domain1']['id']) self.assertEqual(domain1_config, res) res = PROVIDERS.domain_config_api.get_config_with_sensitive_info( self.domains['domain2']['id']) self.assertEqual(domain2_config, res)
[docs]class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
[docs] def config(self, config_files): CONF(args=['domain_config_upload', '--domain-name', 'Default'], project='keystone', default_config_files=config_files)
[docs] def test_config_upload(self): # The values below are the same as in the domain_configs_multi_ldap # directory of test config_files. default_config = { 'ldap': {'url': 'fake://memory', 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=example,cn=com'}, 'identity': {'driver': 'ldap'} } # Clear backend dependencies, since cli loads these manually provider_api.ProviderAPIs._clear_registry_instances() cli.DomainConfigUpload.main() res = PROVIDERS.domain_config_api.get_config_with_sensitive_info( CONF.identity.default_domain_id) self.assertEqual(default_config, res) res = PROVIDERS.domain_config_api.get_config_with_sensitive_info( self.domains['domain1']['id']) self.assertEqual({}, res) res = PROVIDERS.domain_config_api.get_config_with_sensitive_info( self.domains['domain2']['id']) self.assertEqual({}, res)
[docs] def test_no_overwrite_config(self): # Create a config for the default domain default_config = { 'ldap': {'url': uuid.uuid4().hex}, 'identity': {'driver': 'ldap'} } PROVIDERS.domain_config_api.create_config( CONF.identity.default_domain_id, default_config) # Now try and upload the settings in the configuration file for the # default domain provider_api.ProviderAPIs._clear_registry_instances() with mock.patch('six.moves.builtins.print') as mock_print: self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main) file_name = ('keystone.%s.conf' % self.default_domain['name']) error_msg = _( 'Domain: %(domain)s already has a configuration defined - ' 'ignoring file: %(file)s.') % { 'domain': self.default_domain['name'], 'file': os.path.join(CONF.identity.domain_config_dir, file_name)} mock_print.assert_has_calls([mock.call(error_msg)]) res = PROVIDERS.domain_config_api.get_config( CONF.identity.default_domain_id) # The initial config should not have been overwritten self.assertEqual(default_config, res)
[docs]class CliDomainConfigNoOptionsTestCase(CliDomainConfigAllTestCase):
[docs] def config(self, config_files): CONF(args=['domain_config_upload'], project='keystone', default_config_files=config_files)
[docs] def test_config_upload(self): provider_api.ProviderAPIs._clear_registry_instances() with mock.patch('six.moves.builtins.print') as mock_print: self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main) mock_print.assert_has_calls( [mock.call( _('At least one option must be provided, use either ' '--all or --domain-name'))])
[docs]class CliDomainConfigTooManyOptionsTestCase(CliDomainConfigAllTestCase):
[docs] def config(self, config_files): CONF(args=['domain_config_upload', '--all', '--domain-name', 'Default'], project='keystone', default_config_files=config_files)
[docs] def test_config_upload(self): provider_api.ProviderAPIs._clear_registry_instances() with mock.patch('six.moves.builtins.print') as mock_print: self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main) mock_print.assert_has_calls( [mock.call(_('The --all option cannot be used with ' 'the --domain-name option'))])
[docs]class CliDomainConfigInvalidDomainTestCase(CliDomainConfigAllTestCase):
[docs] def config(self, config_files): self.invalid_domain_name = uuid.uuid4().hex CONF(args=['domain_config_upload', '--domain-name', self.invalid_domain_name], project='keystone', default_config_files=config_files)
[docs] def test_config_upload(self): provider_api.ProviderAPIs._clear_registry_instances() with mock.patch('six.moves.builtins.print') as mock_print: self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main) file_name = 'keystone.%s.conf' % self.invalid_domain_name error_msg = (_( 'Invalid domain name: %(domain)s found in config file name: ' '%(file)s - ignoring this file.') % { 'domain': self.invalid_domain_name, 'file': os.path.join(CONF.identity.domain_config_dir, file_name)}) mock_print.assert_has_calls([mock.call(error_msg)])
[docs]class TestDomainConfigFinder(unit.BaseTestCase):
[docs] def setUp(self): super(TestDomainConfigFinder, self).setUp() self.logging = self.useFixture(fixtures.LoggerFixture())
[docs] @mock.patch('os.walk') def test_finder_ignores_files(self, mock_walk): mock_walk.return_value = [ ['.', [], ['file.txt', 'keystone.conf', 'keystone.domain0.conf']], ] domain_configs = list(cli._domain_config_finder('.')) expected_domain_configs = [('./keystone.domain0.conf', 'domain0')] self.assertThat(domain_configs, matchers.Equals(expected_domain_configs)) expected_msg_template = ('Ignoring file (%s) while scanning ' 'domain config directory') self.assertThat( self.logging.output, matchers.Contains(expected_msg_template % 'file.txt')) self.assertThat( self.logging.output, matchers.Contains(expected_msg_template % 'keystone.conf'))
[docs]class CliDBSyncTestCase(unit.BaseTestCase):
[docs] class FakeConfCommand(object): def __init__(self, parent): self.extension = False self.check = parent.command_check self.expand = parent.command_expand self.migrate = parent.command_migrate self.contract = parent.command_contract self.version = None
[docs] def setUp(self): super(CliDBSyncTestCase, self).setUp() self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF)) self.config_fixture.register_cli_opt(cli.command_opt) upgrades.offline_sync_database_to_version = mock.Mock() upgrades.expand_schema = mock.Mock() upgrades.migrate_data = mock.Mock() upgrades.contract_schema = mock.Mock() self.command_check = False self.command_expand = False self.command_migrate = False self.command_contract = False
def _assert_correct_call(self, mocked_function): for func in [upgrades.offline_sync_database_to_version, upgrades.expand_schema, upgrades.migrate_data, upgrades.contract_schema]: if func == mocked_function: self.assertTrue(func.called) else: self.assertFalse(func.called)
[docs] def test_db_sync(self): self.useFixture(fixtures.MockPatchObject( CONF, 'command', self.FakeConfCommand(self))) cli.DbSync.main() self._assert_correct_call( upgrades.offline_sync_database_to_version)
[docs] def test_db_sync_expand(self): self.command_expand = True self.useFixture(fixtures.MockPatchObject( CONF, 'command', self.FakeConfCommand(self))) cli.DbSync.main() self._assert_correct_call(upgrades.expand_schema)
[docs] def test_db_sync_migrate(self): self.command_migrate = True self.useFixture(fixtures.MockPatchObject( CONF, 'command', self.FakeConfCommand(self))) cli.DbSync.main() self._assert_correct_call(upgrades.migrate_data)
[docs] def test_db_sync_contract(self): self.command_contract = True self.useFixture(fixtures.MockPatchObject( CONF, 'command', self.FakeConfCommand(self))) cli.DbSync.main() self._assert_correct_call(upgrades.contract_schema)
[docs] @mock.patch('keystone.cmd.cli.upgrades.get_db_version') def test_db_sync_check_when_database_is_empty(self, mocked_get_db_version): e = migration.exception.DBMigrationError("Invalid version") mocked_get_db_version.side_effect = e checker = cli.DbSync() log_info = self.useFixture(fixtures.FakeLogger(level=log.INFO)) status = checker.check_db_sync_status() self.assertIn("not currently under version control", log_info.output) self.assertEqual(status, 2)
[docs]class TestMappingPopulate(unit.SQLDriverOverrides, unit.TestCase):
[docs] def setUp(self): sqldb = self.useFixture(database.Database()) super(TestMappingPopulate, self).setUp() self.ldapdb = self.useFixture(ldapdb.LDAPDatabase()) self.ldapdb.clear() self.load_backends() sqldb.recreate() self.load_fixtures(default_fixtures)
[docs] def config_files(self): self.config_fixture.register_cli_opt(cli.command_opt) config_files = super(TestMappingPopulate, self).config_files() config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf')) return config_files
[docs] def config_overrides(self): super(TestMappingPopulate, self).config_overrides() self.config_fixture.config(group='identity', driver='ldap') self.config_fixture.config(group='identity_mapping', backward_compatible_ids=False)
[docs] def config(self, config_files): CONF(args=['mapping_populate', '--domain-name', 'Default'], project='keystone', default_config_files=config_files)
[docs] def test_mapping_populate(self): # mapping_populate should create id mappings. Test plan: # 0. Purge mappings # 1. Fetch user list directly via backend. It will not create any # mappings because it bypasses identity manager # 2. Verify that users have no public_id yet # 3. Execute mapping_populate. It should create id mappings # 4. For the same users verify that they have public_id now purge_filter = {} PROVIDERS.id_mapping_api.purge_mappings(purge_filter) hints = None users = PROVIDERS.identity_api.driver.list_users(hints) for user in users: local_entity = { 'domain_id': CONF.identity.default_domain_id, 'local_id': user['id'], 'entity_type': identity_mapping.EntityType.USER} self.assertIsNone( PROVIDERS.id_mapping_api.get_public_id(local_entity) ) # backends are loaded again in the command handler provider_api.ProviderAPIs._clear_registry_instances() cli.MappingPopulate.main() for user in users: local_entity = { 'domain_id': CONF.identity.default_domain_id, 'local_id': user['id'], 'entity_type': identity_mapping.EntityType.USER} self.assertIsNotNone( PROVIDERS.id_mapping_api.get_public_id(local_entity))
[docs] def test_bad_domain_name(self): CONF(args=['mapping_populate', '--domain-name', uuid.uuid4().hex], project='keystone') # backends are loaded again in the command handler provider_api.ProviderAPIs._clear_registry_instances() # NOTE: assertEqual is used on purpose. assertFalse passes with None. self.assertEqual(False, cli.MappingPopulate.main())
[docs]class CliDomainConfigUploadNothing(unit.BaseTestCase):
[docs] def setUp(self): super(CliDomainConfigUploadNothing, self).setUp() config_fixture = self.useFixture(oslo_config.fixture.Config(CONF)) config_fixture.register_cli_opt(cli.command_opt) # NOTE(dstanek): since this is not testing any database # functionality there is no need to go through the motions and # setup a test database. def fake_load_backends(self): self.resource_manager = mock.Mock() self.useFixture(fixtures.MockPatchObject( cli.DomainConfigUploadFiles, 'load_backends', fake_load_backends)) tempdir = self.useFixture(fixtures.TempDir()) config_fixture.config(group='identity', domain_config_dir=tempdir.path) self.logging = self.useFixture( fixtures.FakeLogger(level=logging.DEBUG))
[docs] def test_uploading_all_from_an_empty_directory(self): CONF(args=['domain_config_upload', '--all'], project='keystone', default_config_files=[]) cli.DomainConfigUpload.main() expected_msg = ('No domain configs uploaded from %r' % CONF.identity.domain_config_dir) self.assertThat(self.logging.output, matchers.Contains(expected_msg))
[docs]class CachingDoctorTests(unit.TestCase):
[docs] def test_symptom_caching_disabled(self): # Symptom Detected: Caching disabled self.config_fixture.config(group='cache', enabled=False) self.assertTrue(caching.symptom_caching_disabled()) # No Symptom Detected: Caching is enabled self.config_fixture.config(group='cache', enabled=True) self.assertFalse(caching.symptom_caching_disabled())
[docs] def test_caching_symptom_caching_enabled_without_a_backend(self): # Success Case: Caching enabled and backend configured self.config_fixture.config(group='cache', enabled=True) self.config_fixture.config(group='cache', backend='dogpile.cache.null') self.assertTrue(caching.symptom_caching_enabled_without_a_backend()) # Failure Case 1: Caching disabled and backend not configured self.config_fixture.config(group='cache', enabled=False) self.config_fixture.config(group='cache', backend='dogpile.cache.null') self.assertFalse(caching.symptom_caching_enabled_without_a_backend()) # Failure Case 2: Caching disabled and backend configured self.config_fixture.config(group='cache', enabled=False) self.config_fixture.config(group='cache', backend='dogpile.cache.memory') self.assertFalse(caching.symptom_caching_enabled_without_a_backend()) # Failure Case 3: Caching enabled and backend configured self.config_fixture.config(group='cache', enabled=True) self.config_fixture.config(group='cache', backend='dogpile.cache.memory') self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
[docs]class CredentialDoctorTests(unit.TestCase):
[docs] def test_credential_and_fernet_key_repositories_match(self): # Symptom Detected: Key repository paths are not unique directory = self.useFixture(fixtures.TempDir()).path self.config_fixture.config(group='credential', key_repository=directory) self.config_fixture.config(group='fernet_tokens', key_repository=directory) self.assertTrue(credential.symptom_unique_key_repositories())
[docs] def test_credential_and_fernet_key_repositories_are_unique(self): # No Symptom Detected: Key repository paths are unique self.config_fixture.config(group='credential', key_repository='/etc/keystone/cred-repo') self.config_fixture.config(group='fernet_tokens', key_repository='/etc/keystone/fernet-repo') self.assertFalse(credential.symptom_unique_key_repositories())
[docs] @mock.patch('keystone.cmd.doctor.credential.utils') def test_usability_of_cred_fernet_key_repo_raised(self, mock_utils): # Symptom Detected: credential fernet key repository is world readable self.config_fixture.config(group='credential', provider='fernet') mock_utils.TokenUtils().validate_key_repository.return_value = False self.assertTrue( credential.symptom_usability_of_credential_fernet_key_repository())
[docs] @mock.patch('keystone.cmd.doctor.credential.utils') def test_usability_of_cred_fernet_key_repo_not_raised(self, mock_utils): # No Symptom Detected: Custom driver is used self.config_fixture.config(group='credential', provider='my-driver') mock_utils.TokenUtils().validate_key_repository.return_value = True self.assertFalse( credential.symptom_usability_of_credential_fernet_key_repository()) # No Symptom Detected: key repository is not world readable self.config_fixture.config(group='credential', provider='fernet') mock_utils.TokenUtils().validate_key_repository.return_value = True self.assertFalse( credential.symptom_usability_of_credential_fernet_key_repository())
[docs] @mock.patch('keystone.cmd.doctor.credential.utils') def test_keys_in_credential_fernet_key_repository_raised(self, mock_utils): # Symptom Detected: Key repo is empty self.config_fixture.config(group='credential', provider='fernet') mock_utils.TokenUtils().load_keys.return_value = False self.assertTrue( credential.symptom_keys_in_credential_fernet_key_repository())
[docs] @mock.patch('keystone.cmd.doctor.credential.utils') def test_keys_in_credential_fernet_key_repository_not_raised( self, mock_utils): # No Symptom Detected: Custom driver is used self.config_fixture.config(group='credential', provider='my-driver') mock_utils.TokenUtils().load_keys.return_value = True self.assertFalse( credential.symptom_keys_in_credential_fernet_key_repository()) # No Symptom Detected: Key repo is not empty, fernet is current driver self.config_fixture.config(group='credential', provider='fernet') mock_utils.TokenUtils().load_keys.return_value = True self.assertFalse( credential.symptom_keys_in_credential_fernet_key_repository())
[docs]class DatabaseDoctorTests(unit.TestCase):
[docs] def test_symptom_is_raised_if_database_connection_is_SQLite(self): # Symptom Detected: Database connection is sqlite self.config_fixture.config( group='database', connection='sqlite:///mydb') self.assertTrue( doc_database.symptom_database_connection_is_not_SQLite()) # No Symptom Detected: Database connection is MySQL self.config_fixture.config( group='database', connection='mysql+mysqlconnector://admin:secret@localhost/mydb') self.assertFalse( doc_database.symptom_database_connection_is_not_SQLite())
[docs]class DebugDoctorTests(unit.TestCase):
[docs] def test_symptom_debug_mode_is_enabled(self): # Symptom Detected: Debug mode is enabled self.config_fixture.config(debug=True) self.assertTrue(debug.symptom_debug_mode_is_enabled()) # No Symptom Detected: Debug mode is disabled self.config_fixture.config(debug=False) self.assertFalse(debug.symptom_debug_mode_is_enabled())
[docs]class FederationDoctorTests(unit.TestCase):
[docs] def test_symptom_comma_in_SAML_public_certificate_path(self): # Symptom Detected: There is a comma in path to public cert file self.config_fixture.config(group='saml', certfile='file,cert.pem') self.assertTrue( federation.symptom_comma_in_SAML_public_certificate_path()) # No Symptom Detected: There is no comma in the path self.config_fixture.config(group='saml', certfile='signing_cert.pem') self.assertFalse( federation.symptom_comma_in_SAML_public_certificate_path())
[docs] def test_symptom_comma_in_SAML_private_key_file_path(self): # Symptom Detected: There is a comma in path to private key file self.config_fixture.config(group='saml', keyfile='file,key.pem') self.assertTrue( federation.symptom_comma_in_SAML_private_key_file_path()) # No Symptom Detected: There is no comma in the path self.config_fixture.config(group='saml', keyfile='signing_key.pem') self.assertFalse( federation.symptom_comma_in_SAML_private_key_file_path())
[docs]class LdapDoctorTests(unit.TestCase):
[docs] def test_user_enabled_emulation_dn_ignored_raised(self): # Symptom when user_enabled_emulation_dn is being ignored because the # user did not enable the user_enabled_emulation self.config_fixture.config(group='ldap', user_enabled_emulation=False) self.config_fixture.config( group='ldap', user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com') self.assertTrue( ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
[docs] def test_user_enabled_emulation_dn_ignored_not_raised(self): # No symptom when configuration set properly self.config_fixture.config(group='ldap', user_enabled_emulation=True) self.config_fixture.config( group='ldap', user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com') self.assertFalse( ldap.symptom_LDAP_user_enabled_emulation_dn_ignored()) # No symptom when both configurations disabled self.config_fixture.config(group='ldap', user_enabled_emulation=False) self.config_fixture.config(group='ldap', user_enabled_emulation_dn=None) self.assertFalse( ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
[docs] def test_user_enabled_emulation_use_group_config_ignored_raised(self): # Symptom when user enabled emulation isn't enabled but group_config is # enabled self.config_fixture.config(group='ldap', user_enabled_emulation=False) self.config_fixture.config( group='ldap', user_enabled_emulation_use_group_config=True) self.assertTrue( ldap. symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
[docs] def test_user_enabled_emulation_use_group_config_ignored_not_raised(self): # No symptom when configuration deactivated self.config_fixture.config(group='ldap', user_enabled_emulation=False) self.config_fixture.config( group='ldap', user_enabled_emulation_use_group_config=False) self.assertFalse( ldap. symptom_LDAP_user_enabled_emulation_use_group_config_ignored()) # No symptom when configurations set properly self.config_fixture.config(group='ldap', user_enabled_emulation=True) self.config_fixture.config( group='ldap', user_enabled_emulation_use_group_config=True) self.assertFalse( ldap. symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
[docs] def test_group_members_are_ids_disabled_raised(self): # Symptom when objectclass is set to posixGroup but members_are_ids are # not enabled self.config_fixture.config(group='ldap', group_objectclass='posixGroup') self.config_fixture.config(group='ldap', group_members_are_ids=False) self.assertTrue(ldap.symptom_LDAP_group_members_are_ids_disabled())
[docs] def test_group_members_are_ids_disabled_not_raised(self): # No symptom when the configurations are set properly self.config_fixture.config(group='ldap', group_objectclass='posixGroup') self.config_fixture.config(group='ldap', group_members_are_ids=True) self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled()) # No symptom when configuration deactivated self.config_fixture.config(group='ldap', group_objectclass='groupOfNames') self.config_fixture.config(group='ldap', group_members_are_ids=False) self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
[docs] @mock.patch('os.listdir') @mock.patch('os.path.isdir') def test_file_based_domain_specific_configs_raised(self, mocked_isdir, mocked_listdir): self.config_fixture.config( group='identity', domain_specific_drivers_enabled=True) self.config_fixture.config( group='identity', domain_configurations_from_database=False) # Symptom if there is no existing directory mocked_isdir.return_value = False self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs()) # Symptom if there is an invalid filename inside the domain directory mocked_isdir.return_value = True mocked_listdir.return_value = ['openstack.domains.conf'] self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
[docs] @mock.patch('os.listdir') @mock.patch('os.path.isdir') def test_file_based_domain_specific_configs_not_raised(self, mocked_isdir, mocked_listdir): # No symptom if both configurations deactivated self.config_fixture.config( group='identity', domain_specific_drivers_enabled=False) self.config_fixture.config( group='identity', domain_configurations_from_database=False) self.assertFalse( ldap.symptom_LDAP_file_based_domain_specific_configs()) # No symptom if directory exists with no invalid filenames self.config_fixture.config( group='identity', domain_specific_drivers_enabled=True) self.config_fixture.config( group='identity', domain_configurations_from_database=False) mocked_isdir.return_value = True mocked_listdir.return_value = ['keystone.domains.conf'] self.assertFalse( ldap.symptom_LDAP_file_based_domain_specific_configs())
[docs] @mock.patch('os.listdir') @mock.patch('os.path.isdir') @mock.patch('keystone.cmd.doctor.ldap.configparser.ConfigParser') def test_file_based_domain_specific_configs_formatted_correctly_raised( self, mocked_parser, mocked_isdir, mocked_listdir): symptom = ('symptom_LDAP_file_based_domain_specific_configs' '_formatted_correctly') # Symptom Detected: Ldap domain specific configuration files are not # formatted correctly self.config_fixture.config( group='identity', domain_specific_drivers_enabled=True) self.config_fixture.config( group='identity', domain_configurations_from_database=False) mocked_isdir.return_value = True mocked_listdir.return_value = ['keystone.domains.conf'] mock_instance = mock.MagicMock() mock_instance.read.side_effect = configparser.Error('No Section') mocked_parser.return_value = mock_instance self.assertTrue(getattr(ldap, symptom)())
[docs] @mock.patch('os.listdir') @mock.patch('os.path.isdir') def test_file_based_domain_specific_configs_formatted_correctly_not_raised( self, mocked_isdir, mocked_listdir): symptom = ('symptom_LDAP_file_based_domain_specific_configs' '_formatted_correctly') # No Symptom Detected: Domain_specific drivers is not enabled self.config_fixture.config( group='identity', domain_specific_drivers_enabled=False) self.assertFalse(getattr(ldap, symptom)()) # No Symptom Detected: Domain configuration from database is enabled self.config_fixture.config( group='identity', domain_specific_drivers_enabled=True) self.assertFalse(getattr(ldap, symptom)()) self.config_fixture.config( group='identity', domain_configurations_from_database=True) self.assertFalse(getattr(ldap, symptom)()) # No Symptom Detected: The directory in domain_config_dir doesn't exist mocked_isdir.return_value = False self.assertFalse(getattr(ldap, symptom)()) # No Symptom Detected: domain specific drivers are enabled, domain # configurations from database are disabled, directory exists, and no # exceptions found. self.config_fixture.config( group='identity', domain_configurations_from_database=False) mocked_isdir.return_value = True # An empty directory should not raise this symptom self.assertFalse(getattr(ldap, symptom)()) # Test again with a file inside the directory mocked_listdir.return_value = ['keystone.domains.conf'] self.assertFalse(getattr(ldap, symptom)())
[docs]class SecurityComplianceDoctorTests(unit.TestCase):
[docs] def test_minimum_password_age_greater_than_password_expires_days(self): # Symptom Detected: Minimum password age is greater than the password # expires days. Both values are positive integers greater than zero. self.config_fixture.config(group='security_compliance', minimum_password_age=2) self.config_fixture.config(group='security_compliance', password_expires_days=1) self.assertTrue( security_compliance. symptom_minimum_password_age_greater_than_expires_days())
[docs] def test_minimum_password_age_equal_to_password_expires_days(self): # Symptom Detected: Minimum password age is equal to the password # expires days. Both values are positive integers greater than zero. self.config_fixture.config(group='security_compliance', minimum_password_age=1) self.config_fixture.config(group='security_compliance', password_expires_days=1) self.assertTrue( security_compliance. symptom_minimum_password_age_greater_than_expires_days())
[docs] def test_minimum_password_age_less_than_password_expires_days(self): # No Symptom Detected: Minimum password age is less than password # expires days. Both values are positive integers greater than zero. self.config_fixture.config(group='security_compliance', minimum_password_age=1) self.config_fixture.config(group='security_compliance', password_expires_days=2) self.assertFalse( security_compliance. symptom_minimum_password_age_greater_than_expires_days())
[docs] def test_minimum_password_age_and_password_expires_days_deactivated(self): # No Symptom Detected: when minimum_password_age's default value is 0 # and password_expires_days' default value is None self.assertFalse( security_compliance. symptom_minimum_password_age_greater_than_expires_days())
[docs] def test_invalid_password_regular_expression(self): # Symptom Detected: Regular expression is invalid self.config_fixture.config( group='security_compliance', password_regex='^^(??=.*\d)$') self.assertTrue( security_compliance.symptom_invalid_password_regular_expression())
[docs] def test_valid_password_regular_expression(self): # No Symptom Detected: Regular expression is valid self.config_fixture.config( group='security_compliance', password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$') self.assertFalse( security_compliance.symptom_invalid_password_regular_expression())
[docs] def test_password_regular_expression_deactivated(self): # No Symptom Detected: Regular expression deactivated to None self.config_fixture.config( group='security_compliance', password_regex=None) self.assertFalse( security_compliance.symptom_invalid_password_regular_expression())
[docs] def test_password_regular_expression_description_not_set(self): # Symptom Detected: Regular expression is set but description is not self.config_fixture.config( group='security_compliance', password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$') self.config_fixture.config( group='security_compliance', password_regex_description=None) self.assertTrue( security_compliance. symptom_password_regular_expression_description_not_set())
[docs] def test_password_regular_expression_description_set(self): # No Symptom Detected: Regular expression and description are set desc = '1 letter, 1 digit, and a minimum length of 7 is required' self.config_fixture.config( group='security_compliance', password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$') self.config_fixture.config( group='security_compliance', password_regex_description=desc) self.assertFalse( security_compliance. symptom_password_regular_expression_description_not_set())
[docs] def test_password_regular_expression_description_deactivated(self): # No Symptom Detected: Regular expression and description are # deactivated to None self.config_fixture.config( group='security_compliance', password_regex=None) self.config_fixture.config( group='security_compliance', password_regex_description=None) self.assertFalse( security_compliance. symptom_password_regular_expression_description_not_set())
[docs]class TokensDoctorTests(unit.TestCase):
[docs] def test_unreasonable_max_token_size_raised(self): # Symptom Detected: the max_token_size for uuid is not 32 self.config_fixture.config(group='token', provider='uuid') self.config_fixture.config(max_token_size=33) self.assertTrue(tokens.symptom_unreasonable_max_token_size()) # Symptom Detected: the max_token_size for fernet is greater than 255 self.config_fixture.config(group='token', provider='fernet') self.config_fixture.config(max_token_size=256) self.assertTrue(tokens.symptom_unreasonable_max_token_size())
[docs] def test_unreasonable_max_token_size_not_raised(self): # No Symptom Detected: the max_token_size for uuid is 32 self.config_fixture.config(group='token', provider='uuid') self.config_fixture.config(max_token_size=32) self.assertFalse(tokens.symptom_unreasonable_max_token_size()) # No Symptom Detected: the max_token_size for fernet is 255 or less self.config_fixture.config(group='token', provider='fernet') self.config_fixture.config(max_token_size=255) self.assertFalse(tokens.symptom_unreasonable_max_token_size())
[docs]class TokenFernetDoctorTests(unit.TestCase):
[docs] @mock.patch('keystone.cmd.doctor.tokens_fernet.utils') def test_usability_of_Fernet_key_repository_raised(self, mock_utils): # Symptom Detected: Fernet key repo is world readable self.config_fixture.config(group='token', provider='fernet') mock_utils.TokenUtils().validate_key_repository.return_value = False self.assertTrue( tokens_fernet.symptom_usability_of_Fernet_key_repository())
[docs] @mock.patch('keystone.cmd.doctor.tokens_fernet.utils') def test_usability_of_Fernet_key_repository_not_raised(self, mock_utils): # No Symptom Detected: UUID is used instead of fernet self.config_fixture.config(group='token', provider='uuid') mock_utils.TokenUtils().validate_key_repository.return_value = False self.assertFalse( tokens_fernet.symptom_usability_of_Fernet_key_repository()) # No Symptom Detected: configs set properly, key repo is not world # readable but is user readable self.config_fixture.config(group='token', provider='fernet') mock_utils.TokenUtils().validate_key_repository.return_value = True self.assertFalse( tokens_fernet.symptom_usability_of_Fernet_key_repository())
[docs] @mock.patch('keystone.cmd.doctor.tokens_fernet.utils') def test_keys_in_Fernet_key_repository_raised(self, mock_utils): # Symptom Detected: Fernet key repository is empty self.config_fixture.config(group='token', provider='fernet') mock_utils.TokenUtils().load_keys.return_value = False self.assertTrue( tokens_fernet.symptom_keys_in_Fernet_key_repository())
[docs] @mock.patch('keystone.cmd.doctor.tokens_fernet.utils') def test_keys_in_Fernet_key_repository_not_raised(self, mock_utils): # No Symptom Detected: UUID is used instead of fernet self.config_fixture.config(group='token', provider='uuid') mock_utils.TokenUtils().load_keys.return_value = True self.assertFalse( tokens_fernet.symptom_usability_of_Fernet_key_repository()) # No Symptom Detected: configs set properly, key repo has been # populated with keys self.config_fixture.config(group='token', provider='fernet') mock_utils.TokenUtils().load_keys.return_value = True self.assertFalse( tokens_fernet.symptom_usability_of_Fernet_key_repository())
[docs]class TestMappingPurge(unit.SQLDriverOverrides, unit.BaseTestCase):
[docs] class FakeConfCommand(object): def __init__(self, parent): self.extension = False self.all = parent.command_all self.type = parent.command_type self.domain_name = parent.command_domain_name self.local_id = parent.command_local_id self.public_id = parent.command_public_id
[docs] def setUp(self): # Set up preset cli options and a parser super(TestMappingPurge, self).setUp() self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF)) self.config_fixture.register_cli_opt(cli.command_opt) # For unit tests that should not throw any erorrs, # Use the argument parser to test that the combinations work parser_test = argparse.ArgumentParser() subparsers = parser_test.add_subparsers() self.parser = cli.MappingPurge.add_argument_parser(subparsers)
[docs] def test_mapping_purge_with_no_arguments_fails(self): # Make sure the logic in main() actually catches no argument error self.command_type = None self.command_all = False self.command_domain_name = None self.command_local_id = None self.command_public_id = None self.useFixture(fixtures.MockPatchObject( CONF, 'command', self.FakeConfCommand(self))) self.assertRaises(ValueError, cli.MappingPurge.main)
[docs] def test_mapping_purge_with_all_and_other_argument_fails(self): # Make sure the logic in main() actually catches invalid combinations self.command_type = 'user' self.command_all = True self.command_domain_name = None self.command_local_id = None self.command_public_id = None self.useFixture(fixtures.MockPatchObject( CONF, 'command', self.FakeConfCommand(self))) self.assertRaises(ValueError, cli.MappingPurge.main)
[docs] def test_mapping_purge_with_only_all_passes(self): args = (['--all']) res = self.parser.parse_args(args) self.assertTrue(vars(res)['all'])
[docs] def test_mapping_purge_with_domain_name_argument_succeeds(self): args = (['--domain-name', uuid.uuid4().hex]) self.parser.parse_args(args)
[docs] def test_mapping_purge_with_public_id_argument_succeeds(self): args = (['--public-id', uuid.uuid4().hex]) self.parser.parse_args(args)
[docs] def test_mapping_purge_with_local_id_argument_succeeds(self): args = (['--local-id', uuid.uuid4().hex]) self.parser.parse_args(args)
[docs] def test_mapping_purge_with_type_argument_succeeds(self): args = (['--type', 'user']) self.parser.parse_args(args) args = (['--type', 'group']) self.parser.parse_args(args)
[docs] def test_mapping_purge_with_invalid_argument_fails(self): args = (['--invalid-option', 'some value']) self.assertRaises(unit.UnexpectedExit, self.parser.parse_args, args)
[docs] def test_mapping_purge_with_all_other_combinations_passes(self): args = (['--type', 'user', '--local-id', uuid.uuid4().hex]) self.parser.parse_args(args) args.append('--domain-name') args.append('test') self.parser.parse_args(args) args.append('--public-id') args.append(uuid.uuid4().hex) self.parser.parse_args(args)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.