Source code for volume.admin.test_volumes_backup

# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from oslo_serialization import base64
from oslo_serialization import jsonutils as json

from tempest.api.volume import base
from tempest.common import waiters
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators

CONF = config.CONF


class VolumesBackupsAdminTest(base.BaseVolumeAdminTest):
    """Admin-side volume backup tests: export/import and status reset."""

    @classmethod
    def skip_checks(cls):
        """Skip every test in this class when backups are not enabled."""
        super(VolumesBackupsAdminTest, cls).skip_checks()
        if CONF.volume_feature_enabled.backup:
            return
        raise cls.skipException("Cinder backup feature disabled")

    def _delete_backup(self, backup_id):
        """Delete ``backup_id`` with the admin client and wait until gone."""
        client = self.admin_backups_client
        client.delete_backup(backup_id)
        client.wait_for_resource_deletion(backup_id)

    def _decode_url(self, backup_url):
        """Return the JSON object stored in a base64-encoded backup URL."""
        decoded_text = base64.decode_as_text(backup_url)
        return json.loads(decoded_text)

    def _encode_backup(self, backup):
        """Serialize ``backup`` to JSON and return it base64-encoded."""
        return base64.encode_as_text(json.dumps(backup))

    def _modify_backup_url(self, backup_url, changes):
        """Return ``backup_url`` re-encoded with ``changes`` merged in.

        The URL is decoded, updated in place with the ``changes`` mapping
        and encoded again.
        """
        record = self._decode_url(backup_url)
        record.update(changes)
        return self._encode_backup(record)

    @decorators.idempotent_id('a99c54a1-dd80-4724-8a13-13bf58d4068d')
    def test_volume_backup_export_import(self):
        """Test backup export import functionality.

        Cinder allows exporting DB backup information through its API so it
        can be imported back in case of a DB loss.
        """
        volume = self.create_volume()

        # Create the backup and wait for the source volume to settle.
        backup_name = data_utils.rand_name(
            prefix=CONF.resource_name_prefix,
            name=self.__class__.__name__ + '-Backup')
        volume_id = volume['id']
        backup = self.create_backup(volume_id=volume_id, name=backup_name)
        waiters.wait_for_volume_resource_status(
            self.volumes_client, volume_id, 'available')
        self.assertEqual(backup_name, backup['name'])

        # Export the backup record.
        backup_id = backup['id']
        exported = self.admin_backups_client.export_backup(
            backup_id)['backup-record']
        service = exported['backup_service']
        self.assertTrue(service.startswith('cinder.backup.drivers'))

        # NOTE(geguileo): Backups are imported with the same backup id
        # (important for incremental backups among other things), so we cannot
        # import the exported backup information as it is, because that Backup
        # ID already exists. So we'll fake the data by changing the backup id
        # in the exported backup DB info we have retrieved before importing it
        # back.
        new_id = data_utils.rand_uuid()
        new_url = self._modify_backup_url(
            exported['backup_url'], {'id': new_id})

        # Import the modified record.
        imported = self.admin_backups_client.import_backup(
            backup_service=service,
            backup_url=new_url)['backup']

        # NOTE(geguileo): We delete both backups, but only one of those
        # deletions will delete data from the backup back-end because they
        # were both pointing to the same backend data.
        self.addCleanup(self._delete_backup, new_id)
        self.assertEqual(new_id, imported['id'])
        waiters.wait_for_volume_resource_status(
            self.admin_backups_client, imported['id'], 'available')

        # The imported backup must show up in the admin backup listing.
        listed = self.admin_backups_client.list_backups()['backups']
        self.assertIn(new_id, [entry['id'] for entry in listed])

        # Restore from the original backup.
        restore = self.backups_client.restore_backup(backup_id)['restore']
        restored_volume_id = restore['volume_id']
        self.addCleanup(self.volumes_client.delete_volume,
                        restored_volume_id)
        self.assertEqual(backup_id, restore['backup_id'])

        # While the restore runs the backup sits in 'restoring', so wait for
        # the backup first and then for the restored volume.
        waiters.wait_for_volume_resource_status(
            self.backups_client, backup_id, 'available')
        waiters.wait_for_volume_resource_status(
            self.volumes_client, restored_volume_id, 'available')

        # The restored volume must be present in the volume listing.
        all_volumes = self.volumes_client.list_volumes()['volumes']
        self.assertIn(restored_volume_id,
                      [entry['id'] for entry in all_volumes])

    @decorators.idempotent_id('47a35425-a891-4e13-961c-c45deea21e94')
    def test_volume_backup_reset_status(self):
        """Test resetting volume backup status to error"""
        # Create a volume to back up.
        volume = self.create_volume()

        # Create the backup and wait for the source volume to settle.
        backup_name = data_utils.rand_name(
            prefix=CONF.resource_name_prefix,
            name=self.__class__.__name__ + '-Backup')
        volume_id = volume['id']
        backup = self.create_backup(volume_id=volume_id, name=backup_name)
        waiters.wait_for_volume_resource_status(
            self.volumes_client, volume_id, 'available')
        self.assertEqual(backup_name, backup['name'])

        # Force the status to 'error' as admin, then confirm it through the
        # non-admin backups client.
        backup_id = backup['id']
        self.admin_backups_client.reset_backup_status(
            backup_id=backup_id, status="error")
        waiters.wait_for_volume_resource_status(
            self.backups_client, backup_id, 'error')