# Source code for volume.base

# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from tempest.common import compute
from tempest.common import object_storage
from tempest.common import waiters
from tempest import config
from tempest.lib.common import api_version_utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib.decorators import cleanup_order
from tempest.lib import exceptions as lib_exc
import tempest.test

CONF = config.CONF


class BaseVolumeTest(api_version_utils.BaseMicroversionTest,
                     tempest.test.BaseTestCase):
    """Base test case class for all Cinder API tests.

    Provides client aliases plus ``create_*`` / ``delete_*`` helpers that
    register their own cleanups.

    NOTE(review): helpers decorated with ``cleanup_order`` use
    ``self.cleanup`` and ``self.__name__``; neither is defined in this
    module, so they are presumably provided by that decorator -- confirm in
    ``tempest.lib.decorators``.
    """

    # Set this to True in subclasses to create a default network. See
    # https://bugs.launchpad.net/tempest/+bug/1844568
    create_default_network = False
    credentials = ['primary']

    @classmethod
    def skip_checks(cls):
        """Skip the class when a required service or microversion is missing.

        :raises cls.skipException: if Cinder is not available, or if the
            class requests a default network and Neutron is not available.
        """
        super(BaseVolumeTest, cls).skip_checks()

        if not CONF.service_available.cinder:
            skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
            raise cls.skipException(skip_msg)
        if cls.create_default_network and not CONF.service_available.neutron:
            skip_msg = (
                "%s skipped as Neutron is not available" % cls.__name__)
            raise cls.skipException(skip_msg)

        api_version_utils.check_skip_with_microversion(
            cls.volume_min_microversion, cls.volume_max_microversion,
            CONF.volume.min_microversion, CONF.volume.max_microversion)

    @classmethod
    def setup_credentials(cls):
        """Request network resources according to create_default_network."""
        cls.set_network_resources(
            network=cls.create_default_network,
            router=cls.create_default_network,
            dhcp=cls.create_default_network,
            subnet=cls.create_default_network)
        super(BaseVolumeTest, cls).setup_credentials()

    @classmethod
    def setup_clients(cls):
        """Alias the primary-credential service clients on the class."""
        super(BaseVolumeTest, cls).setup_clients()

        cls.servers_client = cls.os_primary.servers_client

        # images_client only exists when Glance is available.
        if CONF.service_available.glance:
            cls.images_client = cls.os_primary.image_client_v2

        cls.container_client = cls.os_primary.container_client
        cls.object_client = cls.os_primary.object_client

        cls.backups_client = cls.os_primary.backups_client_latest
        cls.volumes_client = cls.os_primary.volumes_client_latest
        cls.messages_client = cls.os_primary.volume_messages_client_latest
        cls.versions_client = cls.os_primary.volume_versions_client_latest
        cls.groups_client = cls.os_primary.groups_client_latest
        cls.group_snapshots_client = (
            cls.os_primary.group_snapshots_client_latest)
        cls.snapshots_client = cls.os_primary.snapshots_client_latest
        cls.volumes_extension_client = (
            cls.os_primary.volumes_extension_client_latest)
        cls.availability_zone_client = (
            cls.os_primary.volume_availability_zone_client_latest)
        cls.volume_limits_client = cls.os_primary.volume_limits_client_latest

    @classmethod
    def resource_setup(cls):
        """Select request microversions and cache common config values."""
        super(BaseVolumeTest, cls).resource_setup()

        cls.volume_request_microversion = (
            api_version_utils.select_request_microversion(
                cls.volume_min_microversion,
                CONF.volume.min_microversion))
        cls.compute_request_microversion = (
            api_version_utils.select_request_microversion(
                cls.min_microversion,
                CONF.compute.min_microversion))
        cls.setup_api_microversion_fixture(
            compute_microversion=cls.compute_request_microversion,
            volume_microversion=cls.volume_request_microversion)

        cls.image_ref = CONF.compute.image_ref
        cls.flavor_ref = CONF.compute.flavor_ref
        cls.build_interval = CONF.volume.build_interval
        cls.build_timeout = CONF.volume.build_timeout

    @cleanup_order
    def create_volume(self, wait_until='available', **kwargs):
        """Wrapper utility that returns a test volume.

        Fills in ``size``, ``name``, ``volume_type`` and
        ``availability_zone`` from configuration when the caller did not
        pass them, and registers deletion of the volume as a cleanup.

        :param wait_until: wait till volume status, None means no wait.
        :param kwargs: forwarded to ``volumes_client.create_volume``.
        :returns: the ``volume`` element of the create response.
        """
        if 'size' not in kwargs:
            kwargs['size'] = CONF.volume.volume_size

        # NOTE(review): nesting was lost in the flattened source; this check
        # is reconstructed as independent of the size default -- confirm.
        if 'imageRef' in kwargs:
            image = self.images_client.show_image(kwargs['imageRef'])
            min_disk = image['min_disk']
            kwargs['size'] = max(kwargs['size'], min_disk)

        if 'name' not in kwargs:
            name = data_utils.rand_name(
                prefix=CONF.resource_name_prefix,
                name=self.__name__ + '-Volume')
            kwargs['name'] = name

        if CONF.volume.volume_type and 'volume_type' not in kwargs:
            # If volume_type is not provided in config then no need to
            # add a volume type and
            # if volume_type has already been added by child class then
            # no need to override.
            kwargs['volume_type'] = CONF.volume.volume_type

        if CONF.compute.compute_volume_common_az:
            kwargs.setdefault('availability_zone',
                              CONF.compute.compute_volume_common_az)

        volume = self.volumes_client.create_volume(**kwargs)['volume']
        # Register the cleanup before waiting, so the volume is removed even
        # if the wait below fails.
        self.cleanup(test_utils.call_and_ignore_notfound_exc,
                     self._delete_volume_for_cleanup,
                     self.volumes_client, volume['id'])
        if wait_until:
            waiters.wait_for_volume_resource_status(self.volumes_client,
                                                    volume['id'], wait_until)
        return volume

    @staticmethod
    def _delete_volume_for_cleanup(volumes_client, volume_id):
        """Delete a volume (only) for cleanup.

        If it is attached to a server, wait for it to become available,
        assuming we have already deleted the server and just need nova to
        complete the delete operation before it is available to be deleted.
        Otherwise proceed to the regular delete_volume().
        """
        try:
            vol = volumes_client.show_volume(volume_id)['volume']
            if vol['status'] == 'in-use':
                waiters.wait_for_volume_resource_status(volumes_client,
                                                        volume_id,
                                                        'available')
        except lib_exc.NotFound:
            # Already gone; delete_volume() below is still attempted.
            pass
        BaseVolumeTest.delete_volume(volumes_client, volume_id)

    @cleanup_order
    def create_snapshot(self, volume_id=1, **kwargs):
        """Wrapper utility that returns a test snapshot.

        :param volume_id: id of the volume to snapshot.
        :param kwargs: forwarded to ``snapshots_client.create_snapshot``; a
            random ``name`` is generated when none is given.
        :returns: the ``snapshot`` element of the create response, after
            the snapshot reached the ``available`` status.
        """
        if 'name' not in kwargs:
            name = data_utils.rand_name(
                prefix=CONF.resource_name_prefix,
                name=self.__name__ + '-Snapshot')
            kwargs['name'] = name

        snapshot = self.snapshots_client.create_snapshot(
            volume_id=volume_id, **kwargs)['snapshot']
        self.cleanup(test_utils.call_and_ignore_notfound_exc,
                     self.delete_snapshot, snapshot['id'])
        waiters.wait_for_volume_resource_status(self.snapshots_client,
                                                snapshot['id'], 'available')
        return snapshot

    def create_backup(self, volume_id, backup_client=None,
                      object_client=None, container_client=None, **kwargs):
        """Wrapper utility that returns a test backup.

        :param volume_id: id of the volume to back up.
        :param backup_client: defaults to ``self.backups_client``.
        :param object_client: defaults to ``self.object_client``.
        :param container_client: defaults to ``self.container_client``.
        :param kwargs: forwarded to ``backup_client.create_backup``; a
            random ``name`` is generated when none is given.
        :returns: the ``backup`` element of the create response, after the
            backup reached the ``available`` status.
        """
        if backup_client is None:
            backup_client = self.backups_client
        if container_client is None:
            container_client = self.container_client
        if object_client is None:
            object_client = self.object_client
        if 'name' not in kwargs:
            name = data_utils.rand_name(
                prefix=CONF.resource_name_prefix,
                name=self.__class__.__name__ + '-Backup')
            kwargs['name'] = name

        if CONF.volume.backup_driver == "swift":
            if 'container' not in kwargs:
                cont_name = self.__class__.__name__ + '-backup-container'
                cont = data_utils.rand_name(
                    prefix=CONF.resource_name_prefix,
                    name=cont_name)
                kwargs['container'] = cont.lower()

            self.addCleanup(object_storage.delete_containers,
                            kwargs['container'], container_client,
                            object_client)

        backup = backup_client.create_backup(
            volume_id=volume_id, **kwargs)['backup']
        # addCleanup uses list pop to cleanup. Wait should be added before
        # the backup is deleted
        self.addCleanup(backup_client.wait_for_resource_deletion,
                        backup['id'])
        self.addCleanup(backup_client.delete_backup, backup['id'])
        waiters.wait_for_volume_resource_status(backup_client, backup['id'],
                                                'available')
        return backup

    # NOTE(afazekas): these create_* and clean_* could be defined
    # only in a single location in the source, and could be more general.

    @staticmethod
    def delete_volume(client, volume_id):
        """Delete volume by the given client"""
        client.delete_volume(volume_id)
        client.wait_for_resource_deletion(volume_id)

    @cleanup_order
    def delete_snapshot(self, snapshot_id, snapshots_client=None):
        """Delete snapshot by the given client

        :param snapshot_id: id of the snapshot to delete.
        :param snapshots_client: defaults to ``self.snapshots_client``.
        """
        if snapshots_client is None:
            snapshots_client = self.snapshots_client
        snapshots_client.delete_snapshot(snapshot_id)
        snapshots_client.wait_for_resource_deletion(snapshot_id)

    def attach_volume(self, server_id, volume_id, wait_for_detach=True):
        """Attach a volume to a server

        Waits for the volume to become ``in-use`` and registers a cleanup
        that detaches it again.

        :param server_id: id of the server to attach to.
        :param volume_id: id of the volume to attach.
        :param wait_for_detach: when true, also register a cleanup that
            waits for the volume to return to ``available``.
        """
        self.servers_client.attach_volume(
            server_id, volumeId=volume_id,
            device='/dev/%s' % CONF.compute.volume_device_name)
        waiters.wait_for_volume_resource_status(self.volumes_client,
                                                volume_id, 'in-use')
        # Cleanups run last-in first-out, so the wait is registered before
        # the detach it has to follow.
        if wait_for_detach:
            self.addCleanup(waiters.wait_for_volume_resource_status,
                            self.volumes_client, volume_id, 'available',
                            server_id, self.servers_client)
        self.addCleanup(self.servers_client.detach_volume, server_id,
                        volume_id)

    def create_server(self, wait_until='ACTIVE', **kwargs):
        """Create a test server and register its deletion as a cleanup.

        :param wait_until: forwarded to ``compute.create_test_server``.
        :param kwargs: forwarded to ``compute.create_test_server``; ``name``
            is popped and replaced by a random name when absent.
        :returns: the first element of the ``create_test_server`` result.
        """
        name = kwargs.pop(
            'name',
            data_utils.rand_name(
                prefix=CONF.resource_name_prefix,
                name=self.__class__.__name__ + '-instance'))

        if wait_until == 'SSHABLE' and not kwargs.get('validation_resources'):
            # If we were asked for SSHABLE but were not provided with the
            # required validation_resources and validatable flag, ensure we
            # pass them to create_test_server() so that it will actually wait.
            kwargs['validation_resources'] = (
                self.get_test_validation_resources(self.os_primary))
            kwargs['validatable'] = True

        tenant_network = self.get_tenant_network()
        body, _ = compute.create_test_server(
            self.os_primary,
            tenant_network=tenant_network,
            name=name,
            wait_until=wait_until,
            **kwargs)

        # Registered in reverse of execution order: delete first, then wait
        # for termination.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        waiters.wait_for_server_termination,
                        self.servers_client, body['id'])
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.servers_client.delete_server, body['id'])
        return body

    def create_group(self, **kwargs):
        """Create a test group and register its deletion as a cleanup.

        :param kwargs: forwarded to ``groups_client.create_group``; a random
            ``name`` is generated when none is given.
        :returns: the ``group`` element of the create response, after the
            group reached the ``available`` status.
        """
        if 'name' not in kwargs:
            kwargs['name'] = data_utils.rand_name(
                prefix=CONF.resource_name_prefix,
                name=self.__class__.__name__ + '-Group')

        group = self.groups_client.create_group(**kwargs)['group']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.delete_group, group['id'])
        waiters.wait_for_volume_resource_status(
            self.groups_client, group['id'], 'available')
        return group

    def delete_group(self, group_id, delete_volumes=True):
        """Delete a group and wait until it is gone.

        :param group_id: id of the group to delete.
        :param delete_volumes: forwarded to ``groups_client.delete_group``;
            when true, the volumes belonging to the group are also waited
            on until they are deleted.
        """
        group_vols = []
        if delete_volumes:
            # Collect member volumes before the delete so they can be waited
            # on afterwards.
            vols = self.volumes_client.list_volumes(detail=True)['volumes']
            group_vols = [vol['id'] for vol in vols
                          if vol['group_id'] == group_id]
        self.groups_client.delete_group(group_id, delete_volumes)
        for vol in group_vols:
            self.volumes_client.wait_for_resource_deletion(vol)
        self.groups_client.wait_for_resource_deletion(group_id)


class BaseVolumeAdminTest(BaseVolumeTest):
    """Base test case class for all Volume Admin API tests.

    Adds admin-credential client aliases and helpers for QoS specs, volume
    types, encryption types and group types.

    NOTE(review): helpers decorated with ``cleanup_order`` use
    ``self.cleanup`` and ``self.__name__``; neither is defined in this
    module, so they are presumably provided by that decorator -- confirm in
    ``tempest.lib.decorators``.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Alias the admin-credential volume service clients on the class."""
        super(BaseVolumeAdminTest, cls).setup_clients()

        cls.admin_volume_qos_client = cls.os_admin.volume_qos_client_latest
        cls.admin_volume_services_client = (
            cls.os_admin.volume_services_client_latest)
        cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest
        cls.admin_volume_manage_client = (
            cls.os_admin.volume_manage_client_latest)
        cls.admin_volume_client = cls.os_admin.volumes_client_latest
        cls.admin_groups_client = cls.os_admin.groups_client_latest
        cls.admin_messages_client = cls.os_admin.volume_messages_client_latest
        cls.admin_group_snapshots_client = (
            cls.os_admin.group_snapshots_client_latest)
        cls.admin_group_types_client = cls.os_admin.group_types_client_latest
        cls.admin_hosts_client = cls.os_admin.volume_hosts_client_latest
        cls.admin_snapshot_manage_client = (
            cls.os_admin.snapshot_manage_client_latest)
        cls.admin_snapshots_client = cls.os_admin.snapshots_client_latest
        cls.admin_backups_client = cls.os_admin.backups_client_latest
        cls.admin_encryption_types_client = (
            cls.os_admin.encryption_types_client_latest)
        cls.admin_quota_classes_client = (
            cls.os_admin.volume_quota_classes_client_latest)
        cls.admin_quotas_client = cls.os_admin.volume_quotas_client_latest
        cls.admin_volume_limits_client = (
            cls.os_admin.volume_limits_client_latest)
        cls.admin_capabilities_client = (
            cls.os_admin.volume_capabilities_client_latest)
        cls.admin_scheduler_stats_client = (
            cls.os_admin.volume_scheduler_stats_client_latest)

    @cleanup_order
    def create_test_qos_specs(self, name=None, consumer=None, **kwargs):
        """create a test Qos-Specs.

        :param name: QoS specs name; a random one is generated when falsy.
        :param consumer: defaults to ``'front-end'`` when falsy.
        :param kwargs: forwarded to ``admin_volume_qos_client.create_qos``.
        :returns: the ``qos_specs`` element of the create response.
        """
        name = name or data_utils.rand_name(
            prefix=CONF.resource_name_prefix,
            name=self.__name__ + '-QoS')
        consumer = consumer or 'front-end'
        qos_specs = self.admin_volume_qos_client.create_qos(
            name=name, consumer=consumer, **kwargs)['qos_specs']
        self.cleanup(self.clear_qos_spec, qos_specs['id'])
        return qos_specs

    @cleanup_order
    def create_volume_type(self, name=None, **kwargs):
        """Create a test volume-type

        :param name: volume type name; a random one is generated when falsy.
        :param kwargs: forwarded to
            ``admin_volume_types_client.create_volume_type``.
        :returns: the ``volume_type`` element of the create response.
        """
        name = name or data_utils.rand_name(
            prefix=CONF.resource_name_prefix,
            name=self.__name__ + '-volume-type')
        volume_type = self.admin_volume_types_client.create_volume_type(
            name=name, **kwargs)['volume_type']
        self.cleanup(self.clear_volume_type, volume_type['id'])
        return volume_type

    def create_encryption_type(self, type_id=None, provider=None,
                               key_size=None, cipher=None,
                               control_location=None):
        """Create an encryption type for a volume type.

        A new volume type is created when ``type_id`` is falsy. The
        response of the create call is discarded; nothing is returned.

        :param type_id: id of the volume type to attach the encryption to.
        :param provider: forwarded to ``create_encryption_type``.
        :param key_size: forwarded to ``create_encryption_type``.
        :param cipher: forwarded to ``create_encryption_type``.
        :param control_location: forwarded to ``create_encryption_type``.
        """
        if not type_id:
            volume_type = self.create_volume_type()
            type_id = volume_type['id']
        self.admin_encryption_types_client.create_encryption_type(
            type_id, provider=provider, key_size=key_size, cipher=cipher,
            control_location=control_location)

    def create_encrypted_volume(self, encryption_provider, key_size=256,
                                cipher='aes-xts-plain64',
                                control_location='front-end'):
        """Create a volume of a freshly created encrypted volume type.

        :param encryption_provider: passed as ``provider`` to
            ``create_encryption_type``.
        :param key_size: passed to ``create_encryption_type``.
        :param cipher: passed to ``create_encryption_type``.
        :param control_location: passed to ``create_encryption_type``.
        :returns: the result of ``create_volume`` for the new type.
        """
        volume_type = self.create_volume_type()
        self.create_encryption_type(type_id=volume_type['id'],
                                    provider=encryption_provider,
                                    key_size=key_size,
                                    cipher=cipher,
                                    control_location=control_location)
        return self.create_volume(volume_type=volume_type['name'])

    def create_group_type(self, name=None, **kwargs):
        """Create a test group-type

        :param name: group type name; a random one is generated when falsy.
        :param kwargs: forwarded to
            ``admin_group_types_client.create_group_type``.
        :returns: the ``group_type`` element of the create response.
        """
        name = name or data_utils.rand_name(
            prefix=CONF.resource_name_prefix,
            name=self.__class__.__name__ + '-group-type')
        group_type = self.admin_group_types_client.create_group_type(
            name=name, **kwargs)['group_type']
        self.addCleanup(self.admin_group_types_client.delete_group_type,
                        group_type['id'])
        return group_type

    @cleanup_order
    def clear_qos_spec(self, qos_id):
        """Delete a QoS spec and wait for it to go, ignoring NotFound."""
        test_utils.call_and_ignore_notfound_exc(
            self.admin_volume_qos_client.delete_qos, qos_id)
        test_utils.call_and_ignore_notfound_exc(
            self.admin_volume_qos_client.wait_for_resource_deletion, qos_id)

    @cleanup_order
    def clear_volume_type(self, vol_type_id):
        """Delete a volume type and wait for it to go, ignoring NotFound."""
        test_utils.call_and_ignore_notfound_exc(
            self.admin_volume_types_client.delete_volume_type, vol_type_id)
        test_utils.call_and_ignore_notfound_exc(
            self.admin_volume_types_client.wait_for_resource_deletion,
            vol_type_id)