
Source code for watcher.decision_engine.strategy.strategies.zone_migration

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dateutil.parser import parse
import six

from oslo_log import log

from cinderclient.v2.volumes import Volume
from novaclient.v2.servers import Server
from watcher._i18n import _
from watcher.common import cinder_helper
from watcher.common import exception as wexc
from watcher.common import nova_helper
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base

# Module-level logger; the strategy, the action counter and the filters in
# this file all emit their debug/warning output through it.
LOG = log.getLogger(__name__)

# Keys of the "targets" dict handled by the strategy and the filters.
INSTANCE = "instance"
VOLUME = "volume"

# Values compared against an instance's 'OS-EXT-STS:vm_state' attribute.
ACTIVE = "active"
PAUSED = 'paused'
STOPPED = "stopped"

# Values compared against an instance's 'status' attribute.
status_ACTIVE = 'ACTIVE'
status_PAUSED = 'PAUSED'
# Referenced by ZoneMigration.is_cold(); without this definition that method
# fails with NameError as soon as it is called.
status_SHUTOFF = 'SHUTOFF'

# Values compared against a volume's 'status' attribute.
AVAILABLE = "available"
IN_USE = "in-use"

class ZoneMigration(base.ZoneMigrationBaseStrategy):
    """Zone migration using instance and volume migration

    This is zone migration strategy to migrate many instances and volumes
    efficiently with minimum downtime for hardware maintenance.
    """

    def __init__(self, config, osc=None):
        """Initialize helper slots and the efficacy counters

        :param config: strategy configuration, forwarded to the base class
        :param osc: forwarded to the base class
        """
        super(ZoneMigration, self).__init__(config, osc)
        # Helpers are created on first use by the nova / cinder properties.
        self._nova = None
        self._cinder = None

        # "<kind>_count" is filled by set_migration_count() with the number
        # of candidates; "planned_<kind>_count" is incremented each time an
        # action of that kind is added to the solution.
        self.live_count = 0
        self.planned_live_count = 0
        self.cold_count = 0
        self.planned_cold_count = 0
        self.volume_count = 0
        self.planned_volume_count = 0
        self.volume_update_count = 0
        self.planned_volume_update_count = 0

    @classmethod
    def get_name(cls):
        """Return the internal name of the strategy"""
        return "zone_migration"

    @classmethod
    def get_display_name(cls):
        """Return the translated display name of the strategy"""
        return _("Zone migration")

    @classmethod
    def get_translatable_display_name(cls):
        """Return the untranslated display name of the strategy"""
        return "Zone migration"

    @classmethod
    def get_schema(cls):
        """Return the JSON schema describing the input parameters"""
        return {
            "properties": {
                "compute_nodes": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "src_node": {
                                "description": "Compute node from which"
                                               " instances migrate",
                                "type": "string"
                            },
                            "dst_node": {
                                "description": "Compute node to which"
                                               " instances migrate",
                                "type": "string"
                            }
                        },
                        "required": ["src_node"],
                        "additionalProperties": False
                    }
                },
                "storage_pools": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "src_pool": {
                                "description": "Storage pool from which"
                                               " volumes migrate",
                                "type": "string"
                            },
                            "dst_pool": {
                                "description": "Storage pool to which"
                                               " volumes migrate",
                                "type": "string"
                            },
                            "src_type": {
                                "description": "Volume type from which"
                                               " volumes migrate",
                                "type": "string"
                            },
                            "dst_type": {
                                "description": "Volume type to which"
                                               " volumes migrate",
                                "type": "string"
                            }
                        },
                        "required": ["src_pool", "src_type", "dst_type"],
                        "additionalProperties": False
                    }
                },
                "parallel_total": {
                    "description": "The number of actions to be run in"
                                   " parallel in total",
                    "type": "integer", "minimum": 0, "default": 6
                },
                "parallel_per_node": {
                    "description": "The number of actions to be run in"
                                   " parallel per compute node",
                    "type": "integer", "minimum": 0, "default": 2
                },
                "parallel_per_pool": {
                    "description": "The number of actions to be run in"
                                   " parallel per storage host",
                    "type": "integer", "minimum": 0, "default": 2
                },
                "priority": {
                    "description": "List prioritizes instances and volumes",
                    "type": "object",
                    "properties": {
                        "project": {
                            "type": "array", "items": {"type": "string"}
                        },
                        "compute_node": {
                            "type": "array", "items": {"type": "string"}
                        },
                        "storage_pool": {
                            "type": "array", "items": {"type": "string"}
                        },
                        "compute": {
                            "enum": ["vcpu_num", "mem_size", "disk_size",
                                     "created_at"]
                        },
                        "storage": {
                            "enum": ["size", "created_at"]
                        }
                    },
                    "additionalProperties": False
                },
                "with_attached_volume": {
                    "description": "instance migrates just after attached"
                                   " volumes or not",
                    "type": "boolean",
                    "default": False
                },
            },
            "additionalProperties": False
        }

    @property
    def migrate_compute_nodes(self):
        """Get compute nodes from input_parameters

        :returns: compute nodes
                  e.g. [{"src_node": "w012", "dst_node": "w022"},
                        {"src_node": "w013", "dst_node": "w023"}]
        """
        return self.input_parameters.get('compute_nodes')

    @property
    def migrate_storage_pools(self):
        """Get storage pools from input_parameters

        :returns: storage pools
                  e.g. [
                      {"src_pool": "src1@back1#pool1",
                       "dst_pool": "dst1@back1#pool1",
                       "src_type": "src1_type",
                       "dst_type": "dst1_type"},
                      {"src_pool": "src1@back2#pool1",
                       "dst_pool": "dst1@back2#pool1",
                       "src_type": "src1_type",
                       "dst_type": "dst1_type"}
                  ]
        """
        return self.input_parameters.get('storage_pools')

    @property
    def parallel_total(self):
        """Get the 'parallel_total' input parameter"""
        return self.input_parameters.get('parallel_total')

    @property
    def parallel_per_node(self):
        """Get the 'parallel_per_node' input parameter"""
        return self.input_parameters.get('parallel_per_node')

    @property
    def parallel_per_pool(self):
        """Get the 'parallel_per_pool' input parameter"""
        return self.input_parameters.get('parallel_per_pool')

    @property
    def priority(self):
        """Get priority from input_parameters

        :returns: priority map
                  e.g.
                  {
                      "project": ["pj1"],
                      "compute_node": ["compute1", "compute2"],
                      "compute": ["vcpu_num"],
                      "storage_pool": ["pool1", "pool2"],
                      "storage": ["size", "created_at"]
                  }
        """
        return self.input_parameters.get('priority')

    @property
    def with_attached_volume(self):
        """Get the 'with_attached_volume' input parameter"""
        return self.input_parameters.get('with_attached_volume')

    @property
    def nova(self):
        """Nova helper, created on first access and then reused"""
        if self._nova is None:
            self._nova = nova_helper.NovaHelper(osc=self.osc)
        return self._nova

    @property
    def cinder(self):
        """Cinder helper, created on first access and then reused"""
        if self._cinder is None:
            self._cinder = cinder_helper.CinderHelper(osc=self.osc)
        return self._cinder

    def get_available_compute_nodes(self):
        """Get compute nodes that are online, whether enabled or disabled

        :returns: dict mapping uuid to compute node, taken from
                  compute_model
        """
        default_node_scope = [element.ServiceState.ENABLED.value,
                              element.ServiceState.DISABLED.value]
        return {uuid: cn for uuid, cn in
                self.compute_model.get_all_compute_nodes().items()
                if cn.state == element.ServiceState.ONLINE.value and
                cn.status in default_node_scope}

    def get_available_storage_nodes(self):
        """Get storage nodes that are online, whether enabled or disabled

        :returns: dict mapping uuid to storage node, taken from
                  storage_model
        """
        default_node_scope = [element.ServiceState.ENABLED.value,
                              element.ServiceState.DISABLED.value]
        return {uuid: cn for uuid, cn in
                self.storage_model.get_all_storage_nodes().items()
                if cn.state == element.ServiceState.ONLINE.value and
                cn.status in default_node_scope}

    def pre_execute(self):
        """Pre-execution phase

        This can be used to fetch some pre-requisites or data.

        :raises: ComputeClusterEmpty if no compute node is available
        :raises: StorageClusterEmpty if no storage node is available
        """
        LOG.info("Initializing zone migration Strategy")

        if not self.get_available_compute_nodes():
            raise wexc.ComputeClusterEmpty()

        if not self.get_available_storage_nodes():
            raise wexc.StorageClusterEmpty()

        LOG.debug(self.compute_model.to_string())
        LOG.debug(self.storage_model.to_string())

    def do_execute(self):
        """Strategy execution phase

        Collects the prioritized targets, records how many of each kind
        there are, then schedules volume and instance actions within the
        parallelization limits.
        """
        filtered_targets = self.filtered_targets()
        self.set_migration_count(filtered_targets)

        total_limit = self.parallel_total
        per_node_limit = self.parallel_per_node
        per_pool_limit = self.parallel_per_pool
        action_counter = ActionCounter(total_limit,
                                       per_pool_limit, per_node_limit)

        for k, targets in six.iteritems(filtered_targets):
            if k == VOLUME:
                self.volumes_migration(targets, action_counter)
            elif k == INSTANCE:
                # Instances are scheduled here only when there is no volume
                # to migrate or update.
                if self.volume_count == 0 and self.volume_update_count == 0:
                    # if with_attached_volume is true,
                    # instance having attached volumes already migrated,
                    # migrate instances which does not have attached volumes
                    if self.with_attached_volume:
                        targets = self.instances_no_attached(targets)
                    self.instances_migration(targets, action_counter)

        LOG.debug("action total: %s, pools: %s, nodes %s ",
                  action_counter.total_count,
                  action_counter.per_pool_count,
                  action_counter.per_node_count)

    def post_execute(self):
        """Post-execution phase

        This can be used to compute the global efficacy
        """
        self.solution.set_efficacy_indicators(
            live_migrate_instance_count=self.live_count,
            planned_live_migrate_instance_count=self.planned_live_count,
            cold_migrate_instance_count=self.cold_count,
            planned_cold_migrate_instance_count=self.planned_cold_count,
            volume_migrate_count=self.volume_count,
            planned_volume_migrate_count=self.planned_volume_count,
            volume_update_count=self.volume_update_count,
            planned_volume_update_count=self.planned_volume_update_count
        )

    def set_migration_count(self, targets):
        """Set migration count

        :param targets: dict of instance object and volume object list
                        keys of dict are instance and volume
        """
        for instance in targets.get('instance', []):
            if self.is_live(instance):
                self.live_count += 1
            elif self.is_cold(instance):
                self.cold_count += 1
        for volume in targets.get('volume', []):
            if self.is_available(volume):
                self.volume_count += 1
            elif self.is_in_use(volume):
                self.volume_update_count += 1

    def is_live(self, instance):
        """Check whether instance is a live migration candidate

        :param instance: instance object
        :returns: True if status/vm_state is ACTIVE/active or
                  PAUSED/paused, False otherwise
        """
        status = getattr(instance, 'status')
        state = getattr(instance, 'OS-EXT-STS:vm_state')
        return (status == status_ACTIVE and state == ACTIVE
                ) or (status == status_PAUSED and state == PAUSED)

    def is_cold(self, instance):
        """Check whether instance is a cold migration candidate

        :param instance: instance object
        :returns: True if status is the module-level status_SHUTOFF value
                  and vm_state is stopped, False otherwise
        """
        status = getattr(instance, 'status')
        state = getattr(instance, 'OS-EXT-STS:vm_state')
        return status == status_SHUTOFF and state == STOPPED

    def is_available(self, volume):
        """Check whether volume status is available"""
        return getattr(volume, 'status') == AVAILABLE

    def is_in_use(self, volume):
        """Check whether volume status is in-use"""
        return getattr(volume, 'status') == IN_USE

    @staticmethod
    def instances_no_attached(instances):
        """Keep only the instances that have no attached volume

        Declared as a static method so that the bound call
        ``self.instances_no_attached(targets)`` made by do_execute() works
        while a direct call through the class keeps working too.

        :param instances: instance objects
        :returns: list of instances whose
                  'os-extended-volumes:volumes_attached' attribute is empty
        """
        return [i for i in instances
                if not getattr(i, "os-extended-volumes:volumes_attached")]

    def get_host_by_pool(self, pool):
        """Get host name from pool name

        Utility method to get host name from pool name
        which is formatted as host@backend#pool.

        :param pool: pool name
        :returns: host name
        """
        return pool.split('@')[0]

    def get_dst_node(self, src_node):
        """Get destination node from self.migrate_compute_nodes

        :param src_node: compute node name
        :returns: destination node name of the first entry whose src_node
                  matches; None if that entry has no dst_node or if no
                  entry matches
        """
        for node in self.migrate_compute_nodes:
            if node.get("src_node") == src_node:
                return node.get("dst_node")

    def get_dst_pool_and_type(self, src_pool, src_type):
        """Get destination pool and type from self.migrate_storage_pools

        Only src_pool is used to select the entry; src_type is accepted
        but not compared.

        :param src_pool: storage pool name
        :param src_type: storage volume type
        :returns: tuple of storage pool name and volume type name for the
                  first entry whose src_pool matches; None if no entry
                  matches
        """
        for pool in self.migrate_storage_pools:
            if pool.get("src_pool") == src_pool:
                return (pool.get("dst_pool", None),
                        pool.get("dst_type"))

    def volumes_migration(self, volumes, action_counter):
        """Add volume actions to the solution within the parallel limits

        :param volumes: volume objects to process, in priority order
        :param action_counter: ActionCounter tracking the parallel limits
        """
        for volume in volumes:
            if action_counter.is_total_max():
                LOG.debug('total reached limit')
                break

            pool = getattr(volume, 'os-vol-host-attr:host')
            if action_counter.is_pool_max(pool):
                LOG.debug("%s has objects to be migrated, but it has"
                          " reached the limit of parallelization.", pool)
                continue

            src_type = volume.volume_type
            dst_pool, dst_type = self.get_dst_pool_and_type(pool, src_type)
            LOG.debug(src_type)
            LOG.debug("%s %s", dst_pool, dst_type)

            if self.is_available(volume):
                # Same type: plain migration to the destination pool;
                # different type: retype to the destination type.
                if src_type == dst_type:
                    self._volume_migrate(volume.id, dst_pool)
                else:
                    self._volume_retype(volume.id, dst_type)
            elif self.is_in_use(volume):
                self._volume_update(volume.id, dst_type)

                # if with_attached_volume is True, migrate attaching instances
                if self.with_attached_volume:
                    instances = [self.nova.find_instance(dic.get('server_id'))
                                 for dic in volume.attachments]
                    self.instances_migration(instances, action_counter)

            action_counter.add_pool(pool)

    def instances_migration(self, instances, action_counter):
        """Add instance actions to the solution within the parallel limits

        :param instances: instance objects to process, in priority order
        :param action_counter: ActionCounter tracking the parallel limits
        """
        for instance in instances:
            src_node = getattr(instance, 'OS-EXT-SRV-ATTR:host')

            if action_counter.is_total_max():
                LOG.debug('total reached limit')
                break

            if action_counter.is_node_max(src_node):
                LOG.debug("%s has objects to be migrated, but it has"
                          " reached the limit of parallelization.", src_node)
                continue

            dst_node = self.get_dst_node(src_node)
            if self.is_live(instance):
                self._live_migration(instance.id, src_node, dst_node)
            elif self.is_cold(instance):
                self._cold_migration(instance.id, src_node, dst_node)

            action_counter.add_node(src_node)

    def _live_migration(self, resource_id, src_node, dst_node):
        """Add a live "migrate" action and count it as planned"""
        parameters = {"migration_type": "live",
                      "destination_node": dst_node,
                      "source_node": src_node}
        self.solution.add_action(
            action_type="migrate",
            resource_id=resource_id,
            input_parameters=parameters)
        self.planned_live_count += 1

    def _cold_migration(self, resource_id, src_node, dst_node):
        """Add a cold "migrate" action and count it as planned"""
        parameters = {"migration_type": "cold",
                      "destination_node": dst_node,
                      "source_node": src_node}
        self.solution.add_action(
            action_type="migrate",
            resource_id=resource_id,
            input_parameters=parameters)
        self.planned_cold_count += 1

    def _volume_update(self, resource_id, dst_type):
        """Add a "swap" volume_migrate action and count it as planned"""
        parameters = {"migration_type": "swap",
                      "destination_type": dst_type}
        self.solution.add_action(
            action_type="volume_migrate",
            resource_id=resource_id,
            input_parameters=parameters)
        self.planned_volume_update_count += 1

    def _volume_migrate(self, resource_id, dst_pool):
        """Add a "migrate" volume_migrate action and count it as planned"""
        parameters = {"migration_type": "migrate",
                      "destination_node": dst_pool}
        self.solution.add_action(
            action_type="volume_migrate",
            resource_id=resource_id,
            input_parameters=parameters)
        self.planned_volume_count += 1

    def _volume_retype(self, resource_id, dst_type):
        """Add a "retype" volume_migrate action and count it as planned"""
        parameters = {"migration_type": "retype",
                      "destination_type": dst_type}
        self.solution.add_action(
            action_type="volume_migrate",
            resource_id=resource_id,
            input_parameters=parameters)
        self.planned_volume_count += 1

    def get_src_node_list(self):
        """Get src nodes from migrate_compute_nodes

        :returns: src node name list, or None if migrate_compute_nodes is
                  empty or unset
        """
        if not self.migrate_compute_nodes:
            return None

        return [v for dic in self.migrate_compute_nodes
                for k, v in dic.items() if k == "src_node"]

    def get_src_pool_list(self):
        """Get src pools from migrate_storage_pools

        :returns: src pool name list
        """
        return [v for dic in self.migrate_storage_pools
                for k, v in dic.items() if k == "src_pool"]

    def get_instances(self):
        """Get migrate target instances

        :returns: instance list on src nodes and compute scope, or None if
                  there is no src node
        """
        src_node_list = self.get_src_node_list()

        if not src_node_list:
            return None

        # An instance is kept only if it is also found in compute_model.
        return [i for i in self.nova.get_instance_list()
                if getattr(i, 'OS-EXT-SRV-ATTR:host') in src_node_list and
                self.compute_model.get_instance_by_uuid(i.id)]

    def get_volumes(self):
        """Get migrate target volumes

        :returns: volume list on src pools and storage scope
        """
        src_pool_list = self.get_src_pool_list()

        # A volume is kept only if it is also found in storage_model.
        return [i for i in self.cinder.get_volume_list()
                if getattr(i, 'os-vol-host-attr:host') in src_pool_list and
                self.storage_model.get_volume_by_uuid(i.id)]

    def filtered_targets(self):
        """Filter targets

        prioritize instances and volumes based on priorities
        from input parameters.

        :returns: prioritized targets
        """
        result = {}

        if self.migrate_compute_nodes:
            result["instance"] = self.get_instances()

        if self.migrate_storage_pools:
            result["volume"] = self.get_volumes()

        if not self.priority:
            return result

        filter_actions = self.get_priority_filter_list()
        LOG.debug(filter_actions)

        # apply all filters set in input parameter, last one first
        for action in list(reversed(filter_actions)):
            LOG.debug(action)
            result = action.apply_filter(result)

        return result

    def get_priority_filter_list(self):
        """Get priority filters

        :returns: list of filter object with arguments in self.priority
        """
        filter_list = []
        priority_filter_map = self.get_priority_filter_map()

        # Keys of self.priority without a matching filter class are skipped.
        for k, v in six.iteritems(self.priority):
            if k in priority_filter_map:
                filter_list.append(priority_filter_map[k](v))

        return filter_list

    def get_priority_filter_map(self):
        """Get priority filter map

        :returns: filter map
                  key is the key in priority input parameters.
                  value is filter class for prioritizing.
        """
        return {
            "project": ProjectSortFilter,
            "compute_node": ComputeHostSortFilter,
            "storage_pool": StorageHostSortFilter,
            "compute": ComputeSpecSortFilter,
            "storage": StorageSpecSortFilter,
        }
class ActionCounter(object):
    """Keep track of how many actions are scheduled to run in parallel"""

    def __init__(self, total_limit=6, per_pool_limit=2, per_node_limit=2):
        """Set the limits and start every counter at zero

        :param total_limit: total number of actions
        :param per_pool_limit: the number of migrate actions per storage pool
        :param per_node_limit: the number of migrate actions per compute node
        """
        self.total_count = 0
        self.per_node_count = {}
        self.per_pool_count = {}
        self.per_node_limit = per_node_limit
        self.per_pool_limit = per_pool_limit
        self.total_limit = total_limit

    def add_pool(self, pool):
        """Count one more action on a storage pool, limits permitting

        :param pool: storage pool
        :returns: True if incremented, False otherwise
        """
        self.per_pool_count.setdefault(pool, 0)

        if self.is_total_max() or self.is_pool_max(pool):
            return False

        self.per_pool_count[pool] += 1
        self.total_count += 1
        LOG.debug("total: %s, per_pool: %s",
                  self.total_count, self.per_pool_count)
        return True

    def add_node(self, node):
        """Count one more action on a compute node, limits permitting

        :param node: compute node
        :returns: True if incremented, False otherwise
        """
        self.per_node_count.setdefault(node, 0)

        if self.is_total_max() or self.is_node_max(node):
            return False

        self.per_node_count[node] += 1
        self.total_count += 1
        LOG.debug("total: %s, per_node: %s",
                  self.total_count, self.per_node_count)
        return True

    def is_total_max(self):
        """Tell whether the total count has reached the total limit

        :returns: True if total count reached limit, False otherwise
        """
        return not self.total_count < self.total_limit

    def is_pool_max(self, pool):
        """Tell whether a storage pool has reached the per pool limit

        A pool seen for the first time is registered with a count of zero.

        :param pool: storage pool
        :returns: True if count reached limit, False otherwise
        """
        current = self.per_pool_count.setdefault(pool, 0)
        LOG.debug("the number of parallel per pool %s is %s ",
                  pool, current)
        LOG.debug("per pool limit is %s", self.per_pool_limit)
        return current >= self.per_pool_limit

    def is_node_max(self, node):
        """Tell whether a compute node has reached the per node limit

        A node seen for the first time is registered with a count of zero.

        :param node: compute node
        :returns: True if count reached limit, False otherwise
        """
        current = self.per_node_count.setdefault(node, 0)
        return current >= self.per_node_limit
class BaseFilter(object):
    """Base class for Filter"""

    # Target keys this filter applies to; 'ALL' matches every key.
    apply_targets = ('ALL',)

    def __init__(self, values=[], **kwargs):
        """initialization

        :param values: priority value, a list or a single value; a single
                       value is wrapped in a one-element list
        """
        if not isinstance(values, list):
            values = [values]
        # Store a copy: keeping the list itself would make every filter
        # built without arguments share the one default list object.
        self.condition = list(values)

    def apply_filter(self, targets):
        """apply filter to targets

        Conditions are applied last one first, and each one replaces the
        item list of every allowed key in place.

        :param targets: dict of instance object and volume object list
                        keys of dict are instance and volume
        :returns: the same dict with filtered lists, or an empty dict if
                  targets is empty or None
        """
        if not targets:
            return {}

        for cond in list(reversed(self.condition)):
            for k, v in six.iteritems(targets):
                if not self.is_allowed(k):
                    continue
                LOG.debug("filter:%s with the key: %s", cond, k)
                targets[k] = self.exec_filter(v, cond)

        LOG.debug(targets)
        return targets

    def is_allowed(self, key):
        """Check whether the filter applies to the given target key

        :param key: key of the targets dict
        :returns: True if key is in apply_targets or apply_targets
                  contains 'ALL', False otherwise
        """
        return (key in self.apply_targets) or ('ALL' in self.apply_targets)

    def exec_filter(self, items, sort_key):
        """This is implemented by sub class"""
        return items
class SortMovingToFrontFilter(BaseFilter):
    """Filter that moves the items satisfying a condition to the front"""

    def exec_filter(self, items, sort_key):
        """Reorder items using this filter's own compare_func"""
        comparator = self.compare_func
        return self.sort_moving_to_front(items, sort_key, comparator)

    def sort_moving_to_front(self, items, sort_key=None, compare_func=None):
        """Move every item accepted by compare_func to the head of items

        The list is modified in place. Walking a reversed snapshot keeps
        the moved items in their original relative order.

        :param items: list to reorder
        :param sort_key: value handed to compare_func with each item
        :param compare_func: callable(item, sort_key) returning a truth value
        :returns: the same list object; untouched if sort_key or
                  compare_func is missing
        """
        if compare_func and sort_key:
            snapshot = list(reversed(items))
            for candidate in snapshot:
                if not compare_func(candidate, sort_key):
                    continue
                items.remove(candidate)
                items.insert(0, candidate)
        return items

    def compare_func(self, item, sort_key):
        """Default comparison: accept every item"""
        return True
class ProjectSortFilter(SortMovingToFrontFilter):
    """Move instances and volumes of the given projects to the front"""

    apply_targets = ('instance', 'volume')

    def __init__(self, values=[], **kwargs):
        super(ProjectSortFilter, self).__init__(values, **kwargs)

    def compare_func(self, item, sort_key):
        """Tell whether item belongs to the project named by sort_key

        :param item: instance object or volume object
        :param sort_key: project id
        :returns: True if the project id of item equals sort_key,
                  False otherwise
        """
        owner = self.get_project_id(item)
        LOG.debug("project_id: %s, sort_key: %s", owner, sort_key)
        matches = owner == sort_key
        return matches

    def get_project_id(self, item):
        """Read the project id of an instance or a volume

        :param item: instance object or volume object
        :returns: project id, or None when item is neither a Volume nor a
                  Server
        """
        if isinstance(item, Volume):
            return getattr(item, 'os-vol-tenant-attr:tenant_id')
        if isinstance(item, Server):
            return item.tenant_id
        return None
class ComputeHostSortFilter(SortMovingToFrontFilter):
    """Move instances running on the given compute hosts to the front"""

    apply_targets = ('instance',)

    def __init__(self, values=[], **kwargs):
        super(ComputeHostSortFilter, self).__init__(values, **kwargs)

    def compare_func(self, item, sort_key):
        """Tell whether item runs on the compute host named by sort_key

        :param item: instance object
        :param sort_key: compute host name
        :returns: True if the host of the instance equals sort_key,
                  False otherwise
        """
        current_host = self.get_host(item)
        LOG.debug("host: %s, sort_key: %s", current_host, sort_key)
        matches = current_host == sort_key
        return matches

    def get_host(self, item):
        """Read the 'OS-EXT-SRV-ATTR:host' attribute of an instance

        :param item: instance object
        :returns: hostname on which item is
        """
        host_attribute = 'OS-EXT-SRV-ATTR:host'
        return getattr(item, host_attribute)
class StorageHostSortFilter(SortMovingToFrontFilter):
    """Move volumes located on the given storage pools to the front"""

    apply_targets = ('volume',)

    def compare_func(self, item, sort_key):
        """Tell whether item is located on the pool named by sort_key

        :param item: volume object
        :param sort_key: storage pool name
        :returns: True if the host attribute of the volume equals sort_key,
                  False otherwise
        """
        current_host = self.get_host(item)
        LOG.debug("host: %s, sort_key: %s", current_host, sort_key)
        matches = current_host == sort_key
        return matches

    def get_host(self, item):
        """Read the 'os-vol-host-attr:host' attribute of a volume

        :param item: volume object
        :returns: value of the host attribute
        """
        host_attribute = 'os-vol-host-attr:host'
        return getattr(item, host_attribute)
class ComputeSpecSortFilter(BaseFilter):
    """Sort instances by a flavor attribute or by creation time"""

    apply_targets = ('instance',)
    accept_keys = ['vcpu_num', 'mem_size', 'disk_size', 'created_at']

    def __init__(self, values=[], **kwargs):
        """initialization

        :param values: sort keys, see accept_keys
        """
        super(ComputeSpecSortFilter, self).__init__(values, **kwargs)
        # Created on first access by the nova property.
        self._nova = None

    @property
    def nova(self):
        """Nova helper, created on first access and then reused"""
        if self._nova is None:
            self._nova = nova_helper.NovaHelper()
        return self._nova

    def exec_filter(self, items, sort_key):
        """Sort items by sort_key if it is an accepted key

        :param items: instances
        :param sort_key: one of accept_keys
        :returns: sorted items, or items unchanged (after logging a
                  warning) if sort_key is not accepted
        """
        if sort_key not in self.accept_keys:
            LOG.warning("Invalid key is specified: %s", sort_key)
            return items
        return self.get_sorted_items(items, sort_key)

    def get_sorted_items(self, items, sort_key):
        """Sort items by sort_key

        Flavor based keys sort from the largest value to the smallest;
        'created_at' sorts from the oldest to the newest. The flavor list
        is fetched only for flavor based keys.

        :param items: instances
        :param sort_key: sort_key
        :returns: items sorted by sort_key, or items unchanged if sort_key
                  is not handled
        """
        if sort_key == 'created_at':
            return sorted(items,
                          key=lambda x: parse(getattr(x, sort_key)),
                          reverse=False)

        getters = {
            'mem_size': self.get_mem_size,
            'vcpu_num': self.get_vcpu_num,
            'disk_size': self.get_disk_size,
        }
        getter = getters.get(sort_key)
        if getter is None:
            return items

        flavors = self.nova.get_flavor_list()
        return sorted(items,
                      key=lambda x: float(getter(x, flavors)),
                      reverse=True)

    def _get_flavor_attr(self, item, flavors, attr):
        """Return attribute attr of the flavor whose id matches item's

        :param item: instance
        :param flavors: flavors
        :param attr: flavor attribute name, e.g. 'ram'
        :returns: attribute value, or None if no flavor id matches
        """
        LOG.debug("item: %s, flavors: %s", item, flavors)
        for flavor in flavors:
            LOG.debug("item.flavor: %s, flavor: %s", item.flavor, flavor)
            if item.flavor.get('id') == flavor.id:
                value = getattr(flavor, attr)
                LOG.debug("flavor.%s: %s", attr, value)
                return value
        return None

    def get_mem_size(self, item, flavors):
        """Get memory size of item

        :param item: instance
        :param flavors: flavors
        :returns: memory size of item
        """
        return self._get_flavor_attr(item, flavors, 'ram')

    def get_vcpu_num(self, item, flavors):
        """Get vcpu number of item

        :param item: instance
        :param flavors: flavors
        :returns: vcpu number of item
        """
        return self._get_flavor_attr(item, flavors, 'vcpus')

    def get_disk_size(self, item, flavors):
        """Get disk size of item

        :param item: instance
        :param flavors: flavors
        :returns: disk size of item
        """
        return self._get_flavor_attr(item, flavors, 'disk')
class StorageSpecSortFilter(BaseFilter):
    """Sort volumes by size or by creation time"""

    apply_targets = ('volume',)
    accept_keys = ['size', 'created_at']

    def exec_filter(self, items, sort_key):
        """Sort items by sort_key if it is an accepted key

        'created_at' sorts from the oldest to the newest; any other
        accepted key sorts numerically from the largest to the smallest.

        :param items: volumes
        :param sort_key: one of accept_keys
        :returns: new sorted list, or items unchanged (after logging a
                  warning) if sort_key is not accepted
        """
        if sort_key not in self.accept_keys:
            LOG.warning("Invalid key is specified: %s", sort_key)
            return items

        by_date = sort_key == 'created_at'
        if by_date:
            def key_of(volume):
                return parse(getattr(volume, sort_key))
        else:
            def key_of(volume):
                return float(getattr(volume, sort_key))

        ordered = sorted(items, key=key_of, reverse=not by_date)
        LOG.debug(ordered)
        return ordered
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.