# Source code for compute.admin.test_volumes_negative

# Copyright 2016 NEC Corporation.  All rights reserved.
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from tempest.api.compute import base
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc

CONF = config.CONF

class VolumesAdminNegativeTest(base.BaseV2ComputeAdminTest):
    """Negative tests of volume swapping.

    Each test calls ``update_attached_volume`` on the admin servers client
    with a randomly generated UUID standing in for one of the two volume
    ids and asserts that the request is rejected.
    """

    create_default_network = True

    @classmethod
    def setup_credentials(cls):
        """Prepare the instance network, then set up the credentials."""
        cls.prepare_instance_network()
        super(VolumesAdminNegativeTest, cls).setup_credentials()

    @classmethod
    def skip_checks(cls):
        """Skip every test in this class when Cinder is not available.

        :raises: ``cls.skipException`` if
            ``CONF.service_available.cinder`` is false.
        """
        super(VolumesAdminNegativeTest, cls).skip_checks()
        if not CONF.service_available.cinder:
            skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
            raise cls.skipException(skip_msg)

    @decorators.attr(type=['negative'])
    @decorators.idempotent_id('309b5ecd-0585-4a7e-a36f-d2b2bf55259d')
    def test_update_attached_volume_with_nonexistent_volume_in_uri(self):
        """Test swapping non existent volume should fail

        The volume id passed positionally (the one identifying the
        attachment to update) is a random UUID, while the ``volumeId``
        keyword refers to a volume created by the test. ``NotFound`` is
        expected.
        """
        self.server = self.create_test_server(wait_until="ACTIVE")
        volume = self.create_volume()
        nonexistent_volume = data_utils.rand_uuid()
        self.assertRaises(lib_exc.NotFound,
                          self.admin_servers_client.update_attached_volume,
                          self.server['id'], nonexistent_volume,
                          volumeId=volume['id'])

    @decorators.related_bug('1629110', status_code=400)
    @decorators.attr(type=['negative'])
    @decorators.idempotent_id('7dcac15a-b107-46d3-a5f6-cb863f4e454a')
    def test_update_attached_volume_with_nonexistent_volume_in_body(self):
        """Test swapping volume to a nonexistent volume should fail

        A real volume is attached to the server first; the swap request
        then names a random UUID as the ``volumeId`` target.
        ``BadRequest`` is expected.
        """
        validation_resources = self.get_class_validation_resources(
            self.os_primary)
        # NOTE(gibi): We need to wait for the guest to fully boot as
        # test_update_attached_volume_with_nonexistent_volume_in_body case
        # will attach a volume to it and therefore cleanup will try to detach
        # it. See bug 1960346 for details.
        self.server = self.create_test_server(
            validatable=True,
            validation_resources=validation_resources,
            wait_until="SSHABLE")

        volume = self.create_volume()
        self.attach_volume(self.server, volume)

        nonexistent_volume = data_utils.rand_uuid()
        self.assertRaises(lib_exc.BadRequest,
                          self.admin_servers_client.update_attached_volume,
                          self.server['id'], volume['id'],
                          volumeId=nonexistent_volume)
class UpdateMultiattachVolumeNegativeTest(base.BaseV2ComputeAdminTest):
    """Negative tests of swapping volume attached to multiple servers

    Negative tests of swapping volume attached to multiple servers
    with compute microversion greater than 2.59 and volume microversion
    greater than 3.26
    """

    min_microversion = '2.60'
    volume_min_microversion = '3.27'
    create_default_network = True

    @classmethod
    def setup_credentials(cls):
        """Prepare the instance network, then set up the credentials."""
        cls.prepare_instance_network()
        super(UpdateMultiattachVolumeNegativeTest, cls).setup_credentials()

    @classmethod
    def skip_checks(cls):
        """Skip every test in this class when multi-attach is disabled.

        :raises: ``cls.skipException`` if
            ``CONF.compute_feature_enabled.volume_multiattach`` is false.
        """
        super(UpdateMultiattachVolumeNegativeTest, cls).skip_checks()
        if not CONF.compute_feature_enabled.volume_multiattach:
            raise cls.skipException('Volume multi-attach is not available.')

    @decorators.attr(type=['negative'])
    @decorators.idempotent_id('7576d497-b7c6-44bd-9cc5-c5b4e50fec71')
    @utils.services('volume')
    def test_multiattach_rw_volume_update_failure(self):
        """Test swapping volume attached to multi-servers with read-write mode

        1. Create two volumes "vol1" and "vol2"
        2. Create two instances "server1" and "server2"
        3. Attach "vol1" to both of these instances
        4. By default both of these attachments should have an attach_mode of
           read-write, so trying to swap "vol1" to "vol2" should fail
        5. Check "vol1" is still attached to both servers
        6. Check "vol2" is not attached to any server
        """
        multiattach_vol_type = CONF.volume.volume_type_multiattach

        # Create two multiattach capable volumes.
        vol1 = self.create_volume(volume_type=multiattach_vol_type)
        vol2 = self.create_volume(volume_type=multiattach_vol_type)

        # Create two instances.
        validation_resources = self.get_class_validation_resources(
            self.os_primary)
        # NOTE(gibi): We need to wait for the guests to fully boot as the test
        # will attach volumes to the servers and therefore cleanup will try to
        # detach them. See bug 1960346 for details.
        server1 = self.create_test_server(
            validatable=True,
            validation_resources=validation_resources,
            wait_until='SSHABLE'
        )
        server2 = self.create_test_server(
            validatable=True,
            validation_resources=validation_resources,
            wait_until='SSHABLE'
        )

        # Attach vol1 to both of these instances.
        vol1_attachment1 = self.attach_volume(server1, vol1)
        vol1_attachment2 = self.attach_volume(server2, vol1)

        # Assert that we now have two attachments.
        vol1 = self.volumes_client.show_volume(vol1['id'])['volume']
        self.assertEqual(2, len(vol1['attachments']))

        # By default both of these attachments should have an attach_mode of
        # read-write, assert that here to ensure the following calls to update
        # the volume will be rejected.
        for volume_attachment in vol1['attachments']:
            attachment_id = volume_attachment['attachment_id']
            attachment = self.attachments_client.show_attachment(
                attachment_id)['attachment']
            self.assertEqual('rw', attachment['attach_mode'])

        # Assert that a BadRequest is raised when we attempt to update volume1
        # to volume2 on server1 or server2.
        self.assertRaises(lib_exc.BadRequest,
                          self.admin_servers_client.update_attached_volume,
                          server1['id'], vol1['id'], volumeId=vol2['id'])
        self.assertRaises(lib_exc.BadRequest,
                          self.admin_servers_client.update_attached_volume,
                          server2['id'], vol1['id'], volumeId=vol2['id'])

        # Fetch the volume 1 to check the current attachments.
        vol1 = self.volumes_client.show_volume(vol1['id'])['volume']
        vol1_attachment_ids = [a['id'] for a in vol1['attachments']]

        # Assert that volume 1 is still attached to both server 1 and 2.
        self.assertIn(vol1_attachment1['id'], vol1_attachment_ids)
        self.assertIn(vol1_attachment2['id'], vol1_attachment_ids)

        # Assert that volume 2 has no attachments.
        vol2 = self.volumes_client.show_volume(vol2['id'])['volume']
        self.assertEqual([], vol2['attachments'])