keystone.tests.unit.limit.test_backends

Source code for keystone.tests.unit.limit.test_backends

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import uuid

from keystone.common import driver_hints
from keystone.common import provider_api
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import utils as test_utils

PROVIDERS = provider_api.ProviderAPIs


class RegisteredLimitTests(object):
    """Backend-agnostic test cases for registered limit operations.

    This class is a mixin. It reads ``self.service_one``,
    ``self.region_one``, ``self.region_two``, ``self.tenant_bar`` and
    ``self.config_fixture`` and calls the ``assert*`` helpers, so it must be
    combined with a concrete test case that provides all of them.
    """

    def _new_registered_limit(self, **overrides):
        """Build a registered limit ref using this suite's usual values.

        Unless overridden by keyword, the ref targets ``service_one`` in
        ``region_one`` for the ``volume`` resource with a default limit of
        10 and a freshly generated id.

        :param overrides: attributes replacing or extending the defaults.
        :returns: whatever ``unit.new_registered_limit_ref`` returns for
                  the merged attributes.
        """
        attrs = {
            'service_id': self.service_one['id'],
            'region_id': self.region_one['id'],
            'resource_name': 'volume',
            'default_limit': 10,
            'id': uuid.uuid4().hex,
        }
        attrs.update(overrides)
        return unit.new_registered_limit_ref(**attrs)

    def test_create_registered_limit_crud(self):
        """Each create call hands back the registered limits stored so far."""
        # create one, return all registered_limits
        registered_limit_1 = self._new_registered_limit()
        res1 = PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1])
        self.assertDictEqual(registered_limit_1, res1[0])

        # create another, return all registered_limits
        registered_limit_2 = self._new_registered_limit(
            resource_name='snapshot', default_limit=5)
        res2 = PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_2])
        self.assertEqual(2, len(res2))
        for ref in res2:
            if ref['id'] == registered_limit_1['id']:
                self.assertDictEqual(registered_limit_1, ref)
            if ref['id'] == registered_limit_2['id']:
                self.assertDictEqual(registered_limit_2, ref)

    def test_create_registered_limit_duplicate(self):
        """A second limit for the same target is rejected with Conflict."""
        registered_limit_1 = self._new_registered_limit()
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1])

        # Same service, region and resource name; only the id differs.
        registered_limit_2 = self._new_registered_limit()
        self.assertRaises(exception.Conflict,
                          PROVIDERS.unified_limit_api.create_registered_limits,
                          [registered_limit_2])

    def test_create_multi_registered_limits_duplicate(self):
        """A batch containing one duplicate creates none of its entries."""
        registered_limit_1 = self._new_registered_limit()
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1])

        # Create with a duplicated one and a normal one. Both of them will
        # not be created.
        registered_limit_2 = self._new_registered_limit()
        registered_limit_3 = self._new_registered_limit(
            resource_name='snapshot')
        self.assertRaises(exception.Conflict,
                          PROVIDERS.unified_limit_api.create_registered_limits,
                          [registered_limit_2, registered_limit_3])

        # Only the limit created before the failed batch is listed.
        reg_limits = PROVIDERS.unified_limit_api.list_registered_limits()
        self.assertEqual(1, len(reg_limits))
        self.assertEqual(registered_limit_1['id'], reg_limits[0]['id'])

    def test_create_registered_limit_invalid_service(self):
        """A random service id is rejected with ValidationError."""
        registered_limit_1 = self._new_registered_limit(
            service_id=uuid.uuid4().hex)
        self.assertRaises(exception.ValidationError,
                          PROVIDERS.unified_limit_api.create_registered_limits,
                          [registered_limit_1])

    def test_create_registered_limit_invalid_region(self):
        """A random region id is rejected with ValidationError."""
        registered_limit_1 = self._new_registered_limit(
            region_id=uuid.uuid4().hex)
        self.assertRaises(exception.ValidationError,
                          PROVIDERS.unified_limit_api.create_registered_limits,
                          [registered_limit_1])

    def test_update_registered_limit(self):
        """Updating one registered limit leaves the other one untouched."""
        # create two registered limits
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            resource_name='snapshot', default_limit=5)
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # update one, return all registered_limits
        registered_limit_update = {'id': registered_limit_1['id'],
                                   'region_id': 'region_two'}
        res = PROVIDERS.unified_limit_api.update_registered_limits(
            [registered_limit_update])
        for ref in res:
            if ref['id'] == registered_limit_1['id']:
                self.assertEqual('region_two', ref['region_id'])
            if ref['id'] == registered_limit_2['id']:
                self.assertDictEqual(registered_limit_2, ref)

    def test_update_registered_limit_invalid_input_return_bad_request(self):
        """Updates naming an unknown service or region fail validation."""
        registered_limit_1 = self._new_registered_limit()
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1])

        update_ref = {'id': registered_limit_1['id'],
                      'service_id': uuid.uuid4().hex}
        self.assertRaises(exception.ValidationError,
                          PROVIDERS.unified_limit_api.update_registered_limits,
                          [update_ref])

        update_ref = {'id': registered_limit_1['id'],
                      'region_id': 'fake_id'}
        self.assertRaises(exception.ValidationError,
                          PROVIDERS.unified_limit_api.update_registered_limits,
                          [update_ref])

    def test_update_registered_limit_duplicate(self):
        """An update that would collide with another limit raises Conflict."""
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            region_id=self.region_two['id'], resource_name='snapshot')
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # Update registered_limit1 to registered_limit2
        update_ref = {'id': registered_limit_1['id'],
                      'region_id': self.region_two['id'],
                      'resource_name': 'snapshot'}
        self.assertRaises(exception.Conflict,
                          PROVIDERS.unified_limit_api.update_registered_limits,
                          [update_ref])

    @test_utils.wip("Skipped until Bug 1744195 is resolved")
    def test_update_registered_limit_when_reference_limit_exist(self):
        """Updating a registered limit that a project limit uses must fail.

        Covered twice: once with a region-scoped pair and once with a pair
        created without any ``region_id``.
        """
        registered_limit_1 = self._new_registered_limit()
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1])
        limit_1 = unit.new_limit_ref(
            project_id=self.tenant_bar['id'],
            service_id=self.service_one['id'],
            region_id=self.region_one['id'],
            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
        PROVIDERS.unified_limit_api.create_limits([limit_1])

        registered_limit_update = {'id': registered_limit_1['id'],
                                   'region_id': 'region_two'}
        self.assertRaises(exception.RegisteredLimitError,
                          PROVIDERS.unified_limit_api.update_registered_limits,
                          [registered_limit_update])

        # Same scenario again, this time without passing a region_id.
        registered_limit_2 = unit.new_registered_limit_ref(
            service_id=self.service_one['id'],
            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_2])
        limit_2 = unit.new_limit_ref(
            project_id=self.tenant_bar['id'],
            service_id=self.service_one['id'],
            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
        PROVIDERS.unified_limit_api.create_limits([limit_2])

        registered_limit_update = {'id': registered_limit_2['id'],
                                   'region_id': 'region_two'}
        self.assertRaises(exception.RegisteredLimitError,
                          PROVIDERS.unified_limit_api.update_registered_limits,
                          [registered_limit_update])

    def test_list_registered_limits(self):
        """Listing returns the same entries, in order, that create returned."""
        # create two registered limits
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            resource_name='snapshot', default_limit=5)
        reg_limits_1 = PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # list
        reg_limits_2 = PROVIDERS.unified_limit_api.list_registered_limits()
        self.assertEqual(2, len(reg_limits_2))
        self.assertDictEqual(reg_limits_1[0], reg_limits_2[0])
        self.assertDictEqual(reg_limits_1[1], reg_limits_2[1])

    def test_list_registered_limit_by_limit(self):
        """With ``list_limit=1`` configured only one entry is listed."""
        self.config_fixture.config(list_limit=1)
        # create two registered limits
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            resource_name='snapshot', default_limit=5)
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # list, limit is 1
        hints = driver_hints.Hints()
        reg_limits = PROVIDERS.unified_limit_api.list_registered_limits(
            hints=hints)
        self.assertEqual(1, len(reg_limits))
        # Either entry may be the one returned; it must match what was
        # created.
        if reg_limits[0]['id'] == registered_limit_1['id']:
            self.assertDictEqual(registered_limit_1, reg_limits[0])
        else:
            self.assertDictEqual(registered_limit_2, reg_limits[0])

    def test_list_registered_limit_by_filter(self):
        """Filters on service, region and resource name narrow the list."""
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            region_id=self.region_two['id'], resource_name='snapshot')
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # Both limits belong to service_one.
        hints = driver_hints.Hints()
        hints.add_filter('service_id', self.service_one['id'])
        res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
        self.assertEqual(2, len(res))

        # Only the first limit is in region_one.
        hints = driver_hints.Hints()
        hints.add_filter('region_id', self.region_one['id'])
        res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
        self.assertEqual(1, len(res))

        # Neither limit uses the 'backup' resource name.
        hints = driver_hints.Hints()
        hints.add_filter('resource_name', 'backup')
        res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
        self.assertEqual(0, len(res))

    def test_get_registered_limit(self):
        """Fetching by id returns exactly the ref that was created."""
        # create two registered limits
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            resource_name='snapshot', default_limit=5)
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # show one
        res = PROVIDERS.unified_limit_api.get_registered_limit(
            registered_limit_2['id'])
        self.assertDictEqual(registered_limit_2, res)

    def test_get_registered_limit_returns_not_found(self):
        """Fetching an unknown id raises RegisteredLimitNotFound."""
        self.assertRaises(exception.RegisteredLimitNotFound,
                          PROVIDERS.unified_limit_api.get_registered_limit,
                          uuid.uuid4().hex)

    def test_delete_registered_limit(self):
        """Deleting one registered limit removes it and keeps the other."""
        # create two registered limits
        registered_limit_1 = self._new_registered_limit()
        registered_limit_2 = self._new_registered_limit(
            resource_name='snapshot', default_limit=5)
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1, registered_limit_2])

        # delete one
        PROVIDERS.unified_limit_api.delete_registered_limit(
            registered_limit_1['id'])
        self.assertRaises(exception.RegisteredLimitNotFound,
                          PROVIDERS.unified_limit_api.get_registered_limit,
                          registered_limit_1['id'])
        reg_limits = PROVIDERS.unified_limit_api.list_registered_limits()
        self.assertEqual(1, len(reg_limits))
        self.assertEqual(registered_limit_2['id'], reg_limits[0]['id'])

    def test_delete_registered_limit_returns_not_found(self):
        """Deleting an unknown id raises RegisteredLimitNotFound."""
        self.assertRaises(exception.RegisteredLimitNotFound,
                          PROVIDERS.unified_limit_api.delete_registered_limit,
                          uuid.uuid4().hex)

    @test_utils.wip("Skipped until Bug 1744195 is resolved")
    def test_delete_registered_limit_when_reference_limit_exist(self):
        """Deleting a registered limit that a project limit uses must fail.

        Covered twice: once with a region-scoped pair and once with a pair
        created without any ``region_id``.
        """
        registered_limit_1 = self._new_registered_limit()
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_1])
        limit_1 = unit.new_limit_ref(
            project_id=self.tenant_bar['id'],
            service_id=self.service_one['id'],
            region_id=self.region_one['id'],
            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
        PROVIDERS.unified_limit_api.create_limits([limit_1])

        self.assertRaises(exception.RegisteredLimitError,
                          PROVIDERS.unified_limit_api.delete_registered_limit,
                          registered_limit_1['id'])

        # Same scenario again, this time without passing a region_id.
        registered_limit_2 = unit.new_registered_limit_ref(
            service_id=self.service_one['id'],
            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
        PROVIDERS.unified_limit_api.create_registered_limits(
            [registered_limit_2])
        limit_2 = unit.new_limit_ref(
            project_id=self.tenant_bar['id'],
            service_id=self.service_one['id'],
            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
        PROVIDERS.unified_limit_api.create_limits([limit_2])

        self.assertRaises(exception.RegisteredLimitError,
                          PROVIDERS.unified_limit_api.delete_registered_limit,
                          registered_limit_2['id'])


class LimitTests(object):
    """Backend-agnostic test cases for project limit operations.

    This class is a mixin. It reads ``self.tenant_bar``,
    ``self.service_one``, ``self.region_one``, ``self.region_two`` and
    ``self.config_fixture`` and calls the ``assert*`` helpers, so it must be
    combined with a concrete test case that provides all of them.
    """

    def _new_limit(self, **overrides):
        """Build a project limit ref using this suite's usual values.

        Unless overridden by keyword, the ref belongs to ``tenant_bar`` and
        targets ``service_one`` in ``region_one`` for the ``volume``
        resource with a resource limit of 10 and a freshly generated id.

        :param overrides: attributes replacing or extending the defaults.
        :returns: whatever ``unit.new_limit_ref`` returns for the merged
                  attributes.
        """
        attrs = {
            'project_id': self.tenant_bar['id'],
            'service_id': self.service_one['id'],
            'region_id': self.region_one['id'],
            'resource_name': 'volume',
            'resource_limit': 10,
            'id': uuid.uuid4().hex,
        }
        attrs.update(overrides)
        return unit.new_limit_ref(**attrs)

    def test_create_limit(self):
        """Created limits are handed back unchanged by the create call."""
        # create one, return all limits
        limit_1 = self._new_limit()
        res1 = PROVIDERS.unified_limit_api.create_limits([limit_1])
        self.assertDictEqual(limit_1, res1[0])

        # create another, return all limits
        limit_2 = self._new_limit(
            region_id=self.region_two['id'],
            resource_name='snapshot', resource_limit=5)
        res2 = PROVIDERS.unified_limit_api.create_limits([limit_2])
        for ref in res2:
            if ref['id'] == limit_1['id']:
                self.assertDictEqual(limit_1, ref)
            if ref['id'] == limit_2['id']:
                self.assertDictEqual(limit_2, ref)

    def test_create_limit_duplicate(self):
        """A second limit for the same target is rejected with Conflict."""
        limit_1 = self._new_limit()
        PROVIDERS.unified_limit_api.create_limits([limit_1])

        # use different id but the same project_id, service_id and region_id
        limit_1 = self._new_limit()
        self.assertRaises(exception.Conflict,
                          PROVIDERS.unified_limit_api.create_limits,
                          [limit_1])

    def test_create_limit_with_invalid_service_raises_validation_error(self):
        """A random service id is rejected with ValidationError."""
        limit = self._new_limit(service_id=uuid.uuid4().hex)
        self.assertRaises(exception.ValidationError,
                          PROVIDERS.unified_limit_api.create_limits,
                          [limit])

    def test_create_limit_with_invalid_region_raises_validation_error(self):
        """A random region id is rejected with ValidationError."""
        limit = self._new_limit(region_id=uuid.uuid4().hex)
        self.assertRaises(exception.ValidationError,
                          PROVIDERS.unified_limit_api.create_limits,
                          [limit])

    @test_utils.wip("Skipped until Bug 1744195 is resolved")
    def test_create_limit_without_reference_registered_limit(self):
        """Creating a limit with no registered limit must raise an error."""
        # No registered limit is created first, so there is nothing for
        # this limit to reference.
        limit_1 = self._new_limit()
        self.assertRaises(exception.NoLimitReference,
                          PROVIDERS.unified_limit_api.create_limits,
                          [limit_1])

    def test_update_limits(self):
        """Updating one limit leaves the other one untouched."""
        # create two limits
        limit_1 = self._new_limit()
        limit_2 = self._new_limit(
            region_id=self.region_two['id'],
            resource_name='snapshot', resource_limit=5)
        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])

        # update one, return all limits
        limit_update = {'id': limit_1['id'],
                        'resource_limit': 8}
        res = PROVIDERS.unified_limit_api.update_limits([limit_update])
        for ref in res:
            if ref['id'] == limit_1['id']:
                self.assertEqual(8, ref['resource_limit'])
            if ref['id'] == limit_2['id']:
                self.assertDictEqual(limit_2, ref)

    def test_list_limits(self):
        """Listing by project returns both limits exactly as created."""
        # create two limits
        limit_1 = self._new_limit()
        limit_2 = self._new_limit(
            region_id=self.region_two['id'],
            resource_name='snapshot', resource_limit=5)
        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])

        # list
        hints = driver_hints.Hints()
        hints.add_filter('project_id', self.tenant_bar['id'])
        limits = PROVIDERS.unified_limit_api.list_limits(hints)
        self.assertEqual(2, len(limits))
        for ref in limits:
            if ref['id'] == limit_1['id']:
                self.assertDictEqual(limit_1, ref)
            if ref['id'] == limit_2['id']:
                self.assertDictEqual(limit_2, ref)

    def test_list_limit_by_limit(self):
        """With ``list_limit=1`` configured only one entry is listed."""
        self.config_fixture.config(list_limit=1)
        # create two limits
        limit_1 = self._new_limit()
        limit_2 = self._new_limit(
            region_id=self.region_two['id'],
            resource_name='snapshot', resource_limit=5)
        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])

        # list, limit is 1
        hints = driver_hints.Hints()
        limits = PROVIDERS.unified_limit_api.list_limits(hints=hints)
        self.assertEqual(1, len(limits))
        # Either entry may be the one returned; it must match what was
        # created.
        if limits[0]['id'] == limit_1['id']:
            self.assertDictEqual(limit_1, limits[0])
        else:
            self.assertDictEqual(limit_2, limits[0])

    def test_list_limit_by_filter(self):
        """Filters on service, region and resource name narrow the list."""
        limit_1 = self._new_limit()
        limit_2 = self._new_limit(
            region_id=self.region_two['id'], resource_name='snapshot')
        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])

        # Both limits belong to service_one.
        hints = driver_hints.Hints()
        hints.add_filter('service_id', self.service_one['id'])
        hints.add_filter('project_id', self.tenant_bar['id'])
        res = PROVIDERS.unified_limit_api.list_limits(hints)
        self.assertEqual(2, len(res))

        # Only the first limit is in region_one.
        hints = driver_hints.Hints()
        hints.add_filter('region_id', self.region_one['id'])
        hints.add_filter('project_id', self.tenant_bar['id'])
        res = PROVIDERS.unified_limit_api.list_limits(hints)
        self.assertEqual(1, len(res))
        self.assertDictEqual(limit_1, res[0])

        # Neither limit uses the 'backup' resource name.
        hints = driver_hints.Hints()
        hints.add_filter('resource_name', 'backup')
        hints.add_filter('project_id', self.tenant_bar['id'])
        res = PROVIDERS.unified_limit_api.list_limits(hints)
        self.assertEqual(0, len(res))

    def test_get_limit(self):
        """Fetching by id returns exactly the ref that was created."""
        # create two limits
        limit_1 = self._new_limit()
        limit_2 = self._new_limit(
            region_id=self.region_two['id'],
            resource_name='snapshot', resource_limit=5)
        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])

        # show one
        res = PROVIDERS.unified_limit_api.get_limit(limit_2['id'])
        self.assertDictEqual(limit_2, res)

    def test_get_limit_returns_not_found(self):
        """Fetching an unknown id raises LimitNotFound."""
        self.assertRaises(exception.LimitNotFound,
                          PROVIDERS.unified_limit_api.get_limit,
                          uuid.uuid4().hex)

    def test_delete_limit(self):
        """A deleted limit can no longer be fetched."""
        # create two limits
        limit_1 = self._new_limit()
        limit_2 = self._new_limit(
            region_id=self.region_two['id'],
            resource_name='snapshot', resource_limit=5)
        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])

        # delete one
        PROVIDERS.unified_limit_api.delete_limit(limit_1['id'])
        # fetching the deleted limit must now fail
        self.assertRaises(exception.LimitNotFound,
                          PROVIDERS.unified_limit_api.get_limit,
                          limit_1['id'])

    def test_delete_limit_returns_not_found(self):
        """Deleting an unknown id raises LimitNotFound."""
        self.assertRaises(exception.LimitNotFound,
                          PROVIDERS.unified_limit_api.delete_limit,
                          uuid.uuid4().hex)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.