# Source code for octavia.controller.worker.v2.tasks.network_tasks

# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import time

from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from taskflow import task
from taskflow.types import failure
import tenacity

from octavia.common import constants
from octavia.common import data_models
from octavia.common import utils
from octavia.controller.worker import task_utils
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.network import base
from octavia.network import data_models as n_data_models

LOG = logging.getLogger(__name__)
CONF = cfg.CONF


class BaseNetworkTask(task.Task):
    """Common base for network tasks.

    Sets up the repositories and helper utilities shared by all network
    tasks and exposes the network driver through a lazily-initialized
    property.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Helpers and repositories shared by every subclass.
        self.task_utils = task_utils.TaskUtils()
        self.loadbalancer_repo = repo.LoadBalancerRepository()
        self.amphora_repo = repo.AmphoraRepository()
        # The driver is loaded on first access, not at construction time.
        self._network_driver = None

    @property
    def network_driver(self):
        """Return the network driver, loading it on first use."""
        if self._network_driver is None:
            self._network_driver = utils.get_network_driver()
        return self._network_driver
class CalculateAmphoraDelta(BaseNetworkTask):
    """Compute the network/subnet delta for a single amphora.

    Compares what is currently plugged into the amphora's compute instance
    against what the load balancer requires (management network(s), VIP
    network, member subnets) and returns a Delta of NICs and subnets to
    add/remove.
    """

    default_provides = constants.DELTA

    # TODO(gthiemonge) ensure we no longer need vrrp_port
    def execute(self, loadbalancer, amphora, availability_zone):
        """Build and return the Delta dict for one amphora.

        :param loadbalancer: provider load balancer dict
        :param amphora: provider amphora dict
        :param availability_zone: availability zone metadata dict (may
                                  override the management network)
        :returns: octavia.network.data_models.Delta serialized via to_dict
        """
        LOG.debug("Calculating network delta for amphora id: %s",
                  amphora.get(constants.ID))

        # The VIP subnet is always required.
        vip_subnet_to_net_map = {
            loadbalancer[constants.VIP_SUBNET_ID]:
                loadbalancer[constants.VIP_NETWORK_ID]
        }

        # Figure out what networks we want
        # seed with lb network(s)
        if (availability_zone and
                availability_zone.get(constants.MANAGEMENT_NETWORK)):
            # AZ metadata overrides the globally configured boot networks.
            management_nets = [
                availability_zone.get(constants.MANAGEMENT_NETWORK)]
        else:
            management_nets = CONF.controller_worker.amp_boot_network_list

        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])

        # Desired state: every subnet of each management network...
        desired_subnet_to_net_map = {}
        for mgmt_net_id in management_nets:
            for subnet_id in self.network_driver.get_network(
                    mgmt_net_id).subnets:
                desired_subnet_to_net_map[subnet_id] = mgmt_net_id
        # ...plus the VIP subnet...
        desired_subnet_to_net_map.update(vip_subnet_to_net_map)

        # ...plus the subnet of every member that is not being deleted.
        for pool in db_lb.pools:
            for member in pool.members:
                if (member.subnet_id and
                        member.provisioning_status !=
                        constants.PENDING_DELETE):
                    member_network = self.network_driver.get_subnet(
                        member.subnet_id).network_id
                    desired_subnet_to_net_map[member.subnet_id] = (
                        member_network)

        desired_network_ids = set(desired_subnet_to_net_map.values())
        desired_subnet_ids = set(desired_subnet_to_net_map)

        # Calculate Network deltas
        nics = self.network_driver.get_plugged_networks(
            amphora[constants.COMPUTE_ID])
        # we don't have two nics in the same network
        network_to_nic_map = {nic.network_id: nic for nic in nics}

        plugged_network_ids = set(network_to_nic_map)

        # NICs plugged but no longer desired must be removed.
        del_ids = plugged_network_ids - desired_network_ids
        delete_nics = [n_data_models.Interface(
            network_id=net_id,
            port_id=network_to_nic_map[net_id].port_id)
            for net_id in del_ids]

        # NICs desired but not plugged must be added, carrying all the
        # desired subnets that belong to that network as fixed IPs.
        add_ids = desired_network_ids - plugged_network_ids
        add_nics = [n_data_models.Interface(
            network_id=add_net_id,
            fixed_ips=[
                n_data_models.FixedIP(
                    subnet_id=subnet_id)
                for subnet_id, net_id in desired_subnet_to_net_map.items()
                if net_id == add_net_id])
            for add_net_id in add_ids]

        # Calculate member Subnet deltas
        plugged_subnets = {}
        for nic in network_to_nic_map.values():
            for fixed_ip in nic.fixed_ips or []:
                plugged_subnets[fixed_ip.subnet_id] = nic.network_id

        plugged_subnet_ids = set(plugged_subnets)
        del_subnet_ids = plugged_subnet_ids - desired_subnet_ids
        add_subnet_ids = desired_subnet_ids - plugged_subnet_ids

        def _subnet_updates(subnet_ids, subnets):
            # Build the per-subnet update dicts; port_id is None when the
            # network has no NIC yet (the port will come from an add_nic).
            updates = []
            for s in subnet_ids:
                network_id = subnets[s]
                nic = network_to_nic_map.get(network_id)
                port_id = nic.port_id if nic else None
                updates.append({
                    constants.SUBNET_ID: s,
                    constants.NETWORK_ID: network_id,
                    constants.PORT_ID: port_id
                })
            return updates

        add_subnets = _subnet_updates(add_subnet_ids,
                                      desired_subnet_to_net_map)
        del_subnets = _subnet_updates(del_subnet_ids,
                                      plugged_subnets)

        delta = n_data_models.Delta(
            amphora_id=amphora[constants.ID],
            compute_id=amphora[constants.COMPUTE_ID],
            add_nics=add_nics, delete_nics=delete_nics,
            add_subnets=add_subnets,
            delete_subnets=del_subnets)
        return delta.to_dict(recurse=True)
class CalculateDelta(BaseNetworkTask):
    """Compute the NIC delta for every allocated amphora of a load balancer.

    Delegates the per-amphora work to CalculateAmphoraDelta and collects
    the results keyed by amphora id.
    """

    default_provides = constants.DELTAS

    def execute(self, loadbalancer, availability_zone):
        """Compute which NICs need to be plugged

        for the amphora to become operational.

        :param loadbalancer: the loadbalancer to calculate deltas for all
                             amphorae
        :param availability_zone: availability zone metadata dict
        :returns: dict of octavia.network.data_models.Delta keyed off amphora
                  id
        """
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])

        amp_delta_task = CalculateAmphoraDelta()
        deltas = {}
        # Only amphorae that are actually allocated get a delta.
        for db_amp in db_lb.amphorae:
            if db_amp.status != constants.AMPHORA_ALLOCATED:
                continue
            deltas[db_amp.id] = amp_delta_task.execute(
                loadbalancer, db_amp.to_dict(), availability_zone)
        return deltas
class GetPlumbedNetworks(BaseNetworkTask):
    """Task to figure out the NICS on an amphora.

    This will likely move into the amphora driver
    :returns: Array of networks
    """

    default_provides = constants.NICS

    def execute(self, amphora):
        """Get plumbed networks for the amphora."""
        compute_id = amphora[constants.COMPUTE_ID]
        LOG.debug("Getting plumbed networks for amphora id: %s",
                  amphora[constants.ID])
        # The driver inspects the compute instance's attached interfaces.
        return self.network_driver.get_plugged_networks(compute_id)
class PlugNetworks(BaseNetworkTask):
    """Task to plug the networks.

    Uses the computed delta to attach every missing NIC to the amphora.
    """

    def execute(self, amphora, delta):
        """Update the amphora networks for the delta."""
        LOG.debug("Plug or unplug networks for amphora id: %s",
                  amphora[constants.ID])

        if not delta:
            LOG.debug("No network deltas for amphora id: %s",
                      amphora[constants.ID])
            return

        compute_id = amphora[constants.COMPUTE_ID]
        # Attach each NIC the delta says is missing.
        for new_nic in delta[constants.ADD_NICS]:
            self.network_driver.plug_network(compute_id,
                                             new_nic[constants.NETWORK_ID])

    def revert(self, amphora, delta, *args, **kwargs):
        """Handle a failed network plug by removing all nics added."""
        LOG.warning("Unable to plug networks for amp id %s",
                    amphora[constants.ID])
        if not delta:
            return

        compute_id = amphora[constants.COMPUTE_ID]
        for new_nic in delta[constants.ADD_NICS]:
            try:
                self.network_driver.unplug_network(
                    compute_id, new_nic[constants.NETWORK_ID])
            except base.NetworkNotFound:
                # Already gone; nothing to undo for this NIC.
                pass
class UnPlugNetworks(BaseNetworkTask):
    """Task to unplug the networks

    Loop over all nics and unplug them
    based on delta
    """

    def execute(self, amphora, delta):
        """Unplug the networks listed in the delta's DELETE_NICS.

        :param amphora: provider amphora dict
        :param delta: delta dict describing NICs to remove; may be falsy
        """
        LOG.debug("Unplug network for amphora")
        if not delta:
            LOG.debug("No network deltas for amphora id: %s",
                      amphora[constants.ID])
            return

        for nic in delta[constants.DELETE_NICS]:
            try:
                self.network_driver.unplug_network(
                    amphora[constants.COMPUTE_ID], nic[constants.NETWORK_ID])
            except base.NetworkNotFound:
                # Bug fix: the network ID is a UUID string, so the original
                # '%d' format specifier raised a formatting error inside this
                # handler. Use '%s' as the other tasks in this module do.
                LOG.debug("Network %s not found", nic[constants.NETWORK_ID])
            except Exception:
                # Best-effort: keep unplugging the remaining NICs.
                LOG.exception("Unable to unplug network")
# TODO(xgerman) follow up if that makes sense
class GetMemberPorts(BaseNetworkTask):
    """Collect the amphora ports that face members.

    Skips the VIP-network port and the port holding the amphora's
    management (lb network) IP; everything else is returned.
    """

    def execute(self, loadbalancer, amphora):
        """Return the member-facing ports plugged into the amphora.

        :param loadbalancer: provider load balancer dict
        :param amphora: provider amphora dict
        :returns: list of port data models (with network/subnet filled in)
        """
        vip_port = self.network_driver.get_port(loadbalancer['vip_port_id'])
        member_ports = []
        interfaces = self.network_driver.get_plugged_networks(
            amphora[constants.COMPUTE_ID])
        for interface in interfaces:
            port = self.network_driver.get_port(interface.port_id)
            # Skip the VIP network entirely.
            if vip_port.network_id == port.network_id:
                continue
            port.network = self.network_driver.get_network(port.network_id)
            for fixed_ip in port.fixed_ips:
                # A break here means this port carries the amphora's
                # management IP, so the for/else 'else' below is skipped
                # and the port is NOT added to the result.
                if amphora['lb_network_ip'] == fixed_ip.ip_address:
                    break
                fixed_ip.subnet = self.network_driver.get_subnet(
                    fixed_ip.subnet_id)
            # Only add the port to the list if the IP wasn't the mgmt IP
            else:
                member_ports.append(port)
        return member_ports
class HandleNetworkDelta(BaseNetworkTask):
    """Task to plug and unplug networks

    Plug or unplug networks based on delta
    """

    def _fill_port_info(self, port):
        # Resolve the port's network and each fixed IP's subnet so the
        # returned port dict is fully populated for later tasks.
        port.network = self.network_driver.get_network(port.network_id)
        for fixed_ip in port.fixed_ips:
            fixed_ip.subnet = self.network_driver.get_subnet(
                fixed_ip.subnet_id)

    def execute(self, amphora, delta):
        """Handle network plugging based off deltas.

        :param amphora: provider amphora dict
        :param delta: delta dict (ADD_NICS/DELETE_NICS/ADD_SUBNETS/
                      DELETE_SUBNETS)
        :returns: {amphora_id: [updated port dicts]}
        """
        db_amp = self.amphora_repo.get(db_apis.get_session(),
                                       id=amphora.get(constants.ID))
        updated_ports = {}
        for nic in delta[constants.ADD_NICS]:
            subnet_id = nic[constants.FIXED_IPS][0][constants.SUBNET_ID]
            interface = self.network_driver.plug_network(
                db_amp.compute_id, nic[constants.NETWORK_ID])
            port = self.network_driver.get_port(interface.port_id)
            # nova may plugged undesired subnets (it plugs one of the subnets
            # of the network), we can safely unplug the subnets we don't need,
            # the desired subnet will be added in the 'ADD_SUBNETS' loop.
            extra_subnets = [
                fixed_ip.subnet_id
                for fixed_ip in port.fixed_ips
                if fixed_ip.subnet_id != subnet_id]
            for subnet_id in extra_subnets:
                port = self.network_driver.unplug_fixed_ip(
                    port_id=interface.port_id, subnet_id=subnet_id)
            self._fill_port_info(port)
            updated_ports[port.network_id] = port.to_dict(recurse=True)

        for update in delta.get(constants.ADD_SUBNETS, []):
            network_id = update[constants.NETWORK_ID]
            # Get already existing port from Deltas or
            # newly created port from updated_ports dict
            port_id = (update[constants.PORT_ID] or
                       updated_ports[network_id][constants.ID])
            subnet_id = update[constants.SUBNET_ID]
            # Avoid duplicated subnets
            has_subnet = False
            if network_id in updated_ports:
                has_subnet = any(
                    fixed_ip[constants.SUBNET_ID] == subnet_id
                    for fixed_ip in updated_ports[network_id][
                        constants.FIXED_IPS])
            if not has_subnet:
                port = self.network_driver.plug_fixed_ip(
                    port_id=port_id, subnet_id=subnet_id)
                self._fill_port_info(port)
                updated_ports[network_id] = (
                    port.to_dict(recurse=True))

        for update in delta.get(constants.DELETE_SUBNETS, []):
            network_id = update[constants.NETWORK_ID]
            port_id = update[constants.PORT_ID]
            subnet_id = update[constants.SUBNET_ID]
            port = self.network_driver.unplug_fixed_ip(
                port_id=port_id, subnet_id=subnet_id)
            self._fill_port_info(port)
            # In neutron, when removing an ipv6 subnet (with slaac) from a
            # port, it just ignores it.
            # https://bugs.launchpad.net/neutron/+bug/1945156
            # When it happens, don't add the port to the updated_ports dict
            has_subnet = any(
                fixed_ip.subnet_id == subnet_id
                for fixed_ip in port.fixed_ips)
            if not has_subnet:
                updated_ports[network_id] = (
                    port.to_dict(recurse=True))

        for nic in delta[constants.DELETE_NICS]:
            network_id = nic[constants.NETWORK_ID]
            try:
                self.network_driver.unplug_network(
                    db_amp.compute_id, network_id)
            except base.NetworkNotFound:
                LOG.debug("Network %s not found", network_id)
            except Exception:
                # Best-effort: keep going so the port delete still runs.
                LOG.exception("Unable to unplug network")

            port_id = nic[constants.PORT_ID]
            try:
                self.network_driver.delete_port(port_id)
            except Exception:
                LOG.exception("Unable to delete the port")

            updated_ports.pop(network_id, None)
        return {amphora[constants.ID]: list(updated_ports.values())}

    def revert(self, result, amphora, delta, *args, **kwargs):
        """Handle a network plug or unplug failures.

        Best-effort removal of everything the ADD_NICS phase created.
        """
        if isinstance(result, failure.Failure):
            return

        if not delta:
            return

        LOG.warning("Unable to plug networks for amp id %s",
                    delta['amphora_id'])

        for nic in delta[constants.ADD_NICS]:
            try:
                self.network_driver.unplug_network(delta[constants.COMPUTE_ID],
                                                   nic[constants.NETWORK_ID])
            except Exception:
                LOG.exception("Unable to unplug network %s",
                              nic[constants.NETWORK_ID])

            port_id = nic[constants.PORT_ID]
            try:
                self.network_driver.delete_port(port_id)
            except Exception:
                LOG.exception("Unable to delete port %s", port_id)
class HandleNetworkDeltas(BaseNetworkTask):
    """Task to plug and unplug networks

    Applies HandleNetworkDelta to every amphora delta in the mapping.
    """

    def execute(self, deltas, loadbalancer):
        """Handle network plugging based off deltas."""
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        amps_by_id = {amp.id: amp for amp in db_lb.amphorae}

        delta_task = HandleNetworkDelta()
        updated_ports = {}
        for amp_id, delta in deltas.items():
            amp_dict = amps_by_id[amp_id].to_dict()
            updated_ports.update(delta_task.execute(amp_dict, delta))
        return updated_ports

    def revert(self, result, deltas, *args, **kwargs):
        """Handle a network plug or unplug failures."""
        if isinstance(result, failure.Failure):
            return
        if not deltas:
            return

        for amp_id, delta in deltas.items():
            LOG.warning("Unable to plug networks for amp id %s",
                        delta[constants.AMPHORA_ID])
            compute_id = delta[constants.COMPUTE_ID]
            for nic in delta[constants.ADD_NICS]:
                net_id = nic[constants.NETWORK_ID]
                try:
                    self.network_driver.unplug_network(compute_id, net_id)
                except Exception:
                    LOG.exception("Unable to unplug network %s", net_id)
                port_id = nic[constants.PORT_ID]
                try:
                    self.network_driver.delete_port(port_id)
                except Exception:
                    LOG.exception("Unable to delete port %s", port_id)
class PlugVIP(BaseNetworkTask):
    """Task to plumb a VIP."""

    def execute(self, loadbalancer):
        """Plumb a vip to an amphora.

        :param loadbalancer: provider load balancer dict
        :returns: list of amphora dicts as updated by the network driver
        """
        LOG.debug("Plumbing VIP for loadbalancer id: %s",
                  loadbalancer[constants.LOADBALANCER_ID])
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        amps_data = self.network_driver.plug_vip(db_lb, db_lb.vip)
        return [amp.to_dict() for amp in amps_data]

    def revert(self, result, loadbalancer, *args, **kwargs):
        """Handle a failure to plumb a vip.

        Copies the port IDs from the partial result back onto the DB
        amphorae so the driver can clean up the right ports, then unplugs.
        """
        if isinstance(result, failure.Failure):
            return
        LOG.warning("Unable to plug VIP for loadbalancer id %s",
                    loadbalancer[constants.LOADBALANCER_ID])
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        try:
            # Make sure we have the current port IDs for cleanup
            for amp_data in result:
                for amphora in filter(
                        # pylint: disable=cell-var-from-loop
                        lambda amp: amp.id == amp_data['id'],
                        db_lb.amphorae):
                    amphora.vrrp_port_id = amp_data['vrrp_port_id']
                    amphora.ha_port_id = amp_data['ha_port_id']

            self.network_driver.unplug_vip(db_lb, db_lb.vip)
        except Exception as e:
            # Best-effort revert: log and continue rather than fail the
            # revert chain.
            LOG.error("Failed to unplug VIP. Resources may still "
                      "be in use from vip: %(vip)s due to error: %(except)s",
                      {'vip': loadbalancer['vip_address'], 'except': str(e)})
class UpdateVIPSecurityGroup(BaseNetworkTask):
    """Task to setup SG for LB."""

    def execute(self, loadbalancer_id):
        """Task to setup SG for LB."""
        LOG.debug("Setting up VIP SG for load balancer id: %s",
                  loadbalancer_id)

        lb = self.loadbalancer_repo.get(db_apis.get_session(),
                                        id=loadbalancer_id)
        # The driver returns the SG id, or a falsy value when none applies.
        sg_id = self.network_driver.update_vip_sg(lb, lb.vip)
        LOG.info("Set up VIP SG %s for load balancer %s complete",
                 sg_id if sg_id else "None", loadbalancer_id)
        return sg_id
class GetSubnetFromVIP(BaseNetworkTask):
    """Task to plumb a VIP."""

    def execute(self, loadbalancer):
        """Plumb a vip to an amphora."""
        lb_id = loadbalancer[constants.LOADBALANCER_ID]
        LOG.debug("Getting subnet for LB: %s", lb_id)

        subnet_id = loadbalancer['vip_subnet_id']
        subnet = self.network_driver.get_subnet(subnet_id)
        LOG.info("Got subnet %s for load balancer %s",
                 subnet_id if subnet else "None", lb_id)
        return subnet.to_dict()
class PlugVIPAmphora(BaseNetworkTask):
    """Task to plumb a VIP."""

    def execute(self, loadbalancer, amphora, subnet):
        """Plumb a vip to an amphora.

        :param loadbalancer: provider load balancer dict
        :param amphora: provider amphora dict
        :param subnet: VIP subnet dict
        :returns: the updated amphora as a dict
        """
        LOG.debug("Plumbing VIP for amphora id: %s",
                  amphora.get(constants.ID))
        db_amp = self.amphora_repo.get(db_apis.get_session(),
                                       id=amphora.get(constants.ID))
        db_subnet = self.network_driver.get_subnet(subnet[constants.ID])
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        amp_data = self.network_driver.plug_aap_port(
            db_lb, db_lb.vip, db_amp, db_subnet)
        return amp_data.to_dict()

    def revert(self, result, loadbalancer, amphora, subnet, *args, **kwargs):
        """Handle a failure to plumb a vip.

        Best-effort unplug of the AAP port execute() may have created.
        """
        if isinstance(result, failure.Failure):
            return
        lb_id = loadbalancer[constants.LOADBALANCER_ID]
        LOG.warning("Unable to plug VIP for amphora id %s "
                    "load balancer id %s",
                    amphora.get(constants.ID), lb_id)
        try:
            db_amp = self.amphora_repo.get(db_apis.get_session(),
                                           id=amphora.get(constants.ID))
            db_amp.vrrp_port_id = result[constants.VRRP_PORT_ID]
            db_amp.ha_port_id = result[constants.HA_PORT_ID]
            db_subnet = self.network_driver.get_subnet(subnet[constants.ID])
            db_lb = self.loadbalancer_repo.get(
                db_apis.get_session(), id=lb_id)
            self.network_driver.unplug_aap_port(db_lb.vip, db_amp, db_subnet)
        except Exception as e:
            # Bug fix: the original logged db_lb.vip here, but db_lb is
            # unbound if the exception was raised earlier in the try block,
            # masking the real error with a NameError. Log the always
            # available load balancer id instead.
            LOG.error('Failed to unplug AAP port for load balancer: %s due '
                      'to error: %s', lb_id, str(e))
class UnplugVIP(BaseNetworkTask):
    """Task to unplug the vip."""

    def execute(self, loadbalancer):
        """Unplug the vip."""
        LOG.debug("Unplug vip on amphora")
        lb_id = loadbalancer[constants.LOADBALANCER_ID]
        try:
            db_lb = self.loadbalancer_repo.get(db_apis.get_session(),
                                               id=lb_id)
            self.network_driver.unplug_vip(db_lb, db_lb.vip)
        except Exception:
            # Best-effort: an unplug failure must not abort the flow.
            LOG.exception("Unable to unplug vip from load balancer %s",
                          lb_id)
class AllocateVIP(BaseNetworkTask):
    """Task to allocate a VIP."""

    def execute(self, loadbalancer):
        """Allocate a vip to the loadbalancer.

        :param loadbalancer: provider load balancer dict
        :returns: tuple of (vip dict, list of additional vip dicts)
        """
        LOG.debug("Allocating vip with port id %s, subnet id %s, "
                  "ip address %s for load balancer %s",
                  loadbalancer[constants.VIP_PORT_ID],
                  loadbalancer[constants.VIP_SUBNET_ID],
                  loadbalancer[constants.VIP_ADDRESS],
                  loadbalancer[constants.LOADBALANCER_ID])
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(), id=loadbalancer[constants.LOADBALANCER_ID])
        vip, additional_vips = self.network_driver.allocate_vip(db_lb)
        LOG.info("Allocated vip with port id %s, subnet id %s, ip address %s "
                 "for load balancer %s",
                 loadbalancer[constants.VIP_PORT_ID],
                 loadbalancer[constants.VIP_SUBNET_ID],
                 loadbalancer[constants.VIP_ADDRESS],
                 loadbalancer[constants.LOADBALANCER_ID])
        for add_vip in additional_vips:
            LOG.debug('Allocated an additional VIP: subnet=%(subnet)s '
                      'ip_address=%(ip)s', {'subnet': add_vip.subnet_id,
                                            'ip': add_vip.ip_address})
        return (vip.to_dict(),
                [additional_vip.to_dict()
                 for additional_vip in additional_vips])

    def revert(self, result, loadbalancer, *args, **kwargs):
        """Handle a failure to allocate vip.

        Deallocates the VIP port if one was actually created.
        """
        if isinstance(result, failure.Failure):
            LOG.exception("Unable to allocate VIP")
            return
        vip, additional_vips = result
        # Rehydrate the dict into a Vip model for the driver call.
        vip = data_models.Vip(**vip)
        LOG.warning("Deallocating vip %s", vip.ip_address)
        try:
            self.network_driver.deallocate_vip(vip)
        except Exception as e:
            # Best-effort revert: log and continue.
            LOG.error("Failed to deallocate VIP. Resources may still "
                      "be in use from vip: %(vip)s due to error: %(except)s",
                      {'vip': vip.ip_address, 'except': str(e)})
class AllocateVIPforFailover(AllocateVIP):
    """Task to allocate/validate the VIP for a failover flow."""

    def revert(self, result, loadbalancer, *args, **kwargs):
        """Handle a failure to allocate vip."""
        if isinstance(result, failure.Failure):
            LOG.exception("Unable to allocate VIP")
            return
        vip_dict, additional_vips = result
        # During failover the VIP must be kept, so this revert only logs.
        vip = data_models.Vip(**vip_dict)
        LOG.info("Failover revert is not deallocating vip %s because this is "
                 "a failover.", vip.ip_address)
class DeallocateVIP(BaseNetworkTask):
    """Task to deallocate a VIP."""

    def execute(self, loadbalancer):
        """Deallocate a VIP."""
        LOG.debug("Deallocating a VIP %s",
                  loadbalancer[constants.VIP_ADDRESS])

        # The driver needs the load balancer the VIP belongs to, but the
        # serialized VIP carries no backref when accessed through the
        # load balancer, so re-attach it explicitly before the call.
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        vip = db_lb.vip
        vip.load_balancer = db_lb
        self.network_driver.deallocate_vip(vip)
class UpdateVIP(BaseNetworkTask):
    """Task to update a VIP."""

    def execute(self, listeners):
        """Push the current VIP configuration to the network driver."""
        lb_id = listeners[0][constants.LOADBALANCER_ID]
        loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
                                                  id=lb_id)

        LOG.debug("Updating VIP of load_balancer %s.", loadbalancer.id)

        self.network_driver.update_vip(loadbalancer)
class UpdateVIPForDelete(BaseNetworkTask):
    """Task to update a VIP for listener delete flows."""

    def execute(self, loadbalancer_id):
        """Update the VIP with for_delete semantics for a listener removal."""
        loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
                                                  id=loadbalancer_id)
        LOG.debug("Updating VIP for listener delete on load_balancer %s.",
                  loadbalancer.id)
        self.network_driver.update_vip(loadbalancer, for_delete=True)
class GetAmphoraNetworkConfigs(BaseNetworkTask):
    """Task to retrieve amphora network details."""

    def execute(self, loadbalancer, amphora=None):
        """Return per-amphora network configs as plain dicts."""
        LOG.debug("Retrieving vip network details.")
        db_amp = self.amphora_repo.get(db_apis.get_session(),
                                       id=amphora.get(constants.ID))
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        db_configs = self.network_driver.get_network_configs(
            db_lb, amphora=db_amp)
        # Serialize every config for the provider boundary.
        return {amp_id: amp_conf.to_dict(recurse=True)
                for amp_id, amp_conf in db_configs.items()}
class GetAmphoraNetworkConfigsByID(BaseNetworkTask):
    """Task to retrieve amphora network details."""

    def execute(self, loadbalancer_id, amphora_id=None):
        """Return per-amphora network configs looked up by IDs."""
        LOG.debug("Retrieving vip network details.")
        loadbalancer = self.loadbalancer_repo.get(db_apis.get_session(),
                                                  id=loadbalancer_id)
        amphora = self.amphora_repo.get(db_apis.get_session(),
                                        id=amphora_id)
        db_configs = self.network_driver.get_network_configs(
            loadbalancer, amphora=amphora)
        # Serialize every config for the provider boundary.
        return {amp_id: amp_conf.to_dict(recurse=True)
                for amp_id, amp_conf in db_configs.items()}
class GetAmphoraeNetworkConfigs(BaseNetworkTask):
    """Task to retrieve amphorae network details."""

    def execute(self, loadbalancer_id):
        """Return network configs for all amphorae of the load balancer."""
        LOG.debug("Retrieving vip network details.")
        db_lb = self.loadbalancer_repo.get(db_apis.get_session(),
                                           id=loadbalancer_id)
        db_configs = self.network_driver.get_network_configs(db_lb)
        # Serialize every config for the provider boundary.
        return {amp_id: amp_conf.to_dict(recurse=True)
                for amp_id, amp_conf in db_configs.items()}
class FailoverPreparationForAmphora(BaseNetworkTask):
    """Task to prepare an amphora for failover."""

    def execute(self, amphora):
        """Run driver-side failover preparation for the given amphora."""
        amp_id = amphora[constants.ID]
        db_amp = self.amphora_repo.get(db_apis.get_session(), id=amp_id)
        LOG.debug("Prepare amphora %s for failover.", amp_id)

        self.network_driver.failover_preparation(db_amp)
class RetrievePortIDsOnAmphoraExceptLBNetwork(BaseNetworkTask):
    """Task retrieving all the port ids on an amphora, except lb network."""

    def execute(self, amphora):
        """Return the amphora's ports except the management-network port.

        :param amphora: provider amphora dict
        :returns: list of port data models
        """
        LOG.debug("Retrieve all but the lb network port id on amphora %s.",
                  amphora[constants.ID])

        interfaces = self.network_driver.get_plugged_networks(
            compute_id=amphora[constants.COMPUTE_ID])

        ports = []
        # Bug fix: the original checked 'interface_.port_id not in ports',
        # but 'ports' holds Port objects, not IDs, so the membership test
        # was always true and duplicate port IDs were never skipped. Track
        # the seen IDs in a separate set.
        seen_port_ids = set()
        for interface_ in interfaces:
            if interface_.port_id in seen_port_ids:
                continue
            seen_port_ids.add(interface_.port_id)
            port = self.network_driver.get_port(port_id=interface_.port_id)
            # Skip the port carrying the amphora's management (lb network) IP.
            lb_network = any(
                ip.ip_address == amphora[constants.LB_NETWORK_IP]
                for ip in port.fixed_ips)
            if not lb_network:
                ports.append(port)
        return ports
class PlugPorts(BaseNetworkTask):
    """Task to plug neutron ports into a compute instance."""

    def execute(self, amphora, ports):
        """Plug each given port into the amphora's compute instance."""
        db_amp = self.amphora_repo.get(db_apis.get_session(),
                                       id=amphora[constants.ID])
        compute_id = amphora[constants.COMPUTE_ID]
        for port in ports:
            LOG.debug('Plugging port ID: %(port_id)s into compute instance: '
                      '%(compute_id)s.',
                      {constants.PORT_ID: port.id,
                       constants.COMPUTE_ID: compute_id})
            self.network_driver.plug_port(db_amp, port)
class ApplyQos(BaseNetworkTask):
    """Apply Quality of Services to the VIP"""

    def _apply_qos_on_vrrp_ports(self, loadbalancer, amps_data, qos_policy_id,
                                 is_revert=False, request_qos_id=None):
        """Call network driver to apply QoS Policy on the vrrp ports.

        When no amphora list is supplied, fall back to the DB state and
        apply only to allocated amphorae. Delegates the per-port work to
        ApplyQosAmphora.
        """
        if not amps_data:
            db_lb = self.loadbalancer_repo.get(
                db_apis.get_session(),
                id=loadbalancer[constants.LOADBALANCER_ID])
            amps_data = db_lb.amphorae

        amps_data = [amp
                     for amp in amps_data
                     if amp.status == constants.AMPHORA_ALLOCATED]

        apply_qos = ApplyQosAmphora()
        for amp_data in amps_data:
            # NOTE(review): is_revert/request_qos_id are accepted but not
            # forwarded here — presumably intentional, TODO confirm.
            apply_qos._apply_qos_on_vrrp_port(loadbalancer,
                                              amp_data.to_dict(),
                                              qos_policy_id)

    def execute(self, loadbalancer, amps_data=None, update_dict=None):
        """Apply qos policy on the vrrp ports which are related with vip."""
        db_lb = self.loadbalancer_repo.get(
            db_apis.get_session(),
            id=loadbalancer[constants.LOADBALANCER_ID])
        qos_policy_id = db_lb.vip.qos_policy_id
        # Nothing to do when no policy exists and the update does not
        # touch the VIP's qos_policy_id.
        if not qos_policy_id and (
            not update_dict or (
                'vip' not in update_dict or
                'qos_policy_id' not in update_dict[constants.VIP])):
            return
        # An explicit qos_policy_id in the update takes precedence over
        # the value currently stored on the VIP.
        if update_dict and update_dict.get(constants.VIP):
            vip_dict = update_dict[constants.VIP]
            if vip_dict.get(constants.QOS_POLICY_ID):
                qos_policy_id = vip_dict[constants.QOS_POLICY_ID]
        self._apply_qos_on_vrrp_ports(loadbalancer, amps_data, qos_policy_id)

    def revert(self, result, loadbalancer, amps_data=None, update_dict=None,
               *args, **kwargs):
        """Handle a failure to apply QoS to VIP

        Re-applies the original policy only when the requested policy
        actually differed from it.
        """
        request_qos_id = loadbalancer['vip_qos_policy_id']
        orig_lb = self.task_utils.get_current_loadbalancer_from_db(
            loadbalancer[constants.LOADBALANCER_ID])
        orig_qos_id = orig_lb.vip.qos_policy_id
        if request_qos_id != orig_qos_id:
            self._apply_qos_on_vrrp_ports(loadbalancer, amps_data,
                                          orig_qos_id, is_revert=True,
                                          request_qos_id=request_qos_id)
class ApplyQosAmphora(BaseNetworkTask):
    """Apply Quality of Services to the VIP"""

    def _apply_qos_on_vrrp_port(self, loadbalancer, amp_data, qos_policy_id,
                                is_revert=False, request_qos_id=None):
        """Call network driver to apply QoS Policy on the vrrp ports.

        :param amp_data: a single provider amphora dict
        :raises: re-raises driver errors unless called from a revert
        """
        try:
            self.network_driver.apply_qos_on_port(
                qos_policy_id, amp_data[constants.VRRP_PORT_ID])
        except Exception:
            if not is_revert:
                raise
            # Bug fix: amp_data is a single amphora dict; the original list
            # comprehension iterated it (yielding its string keys) and
            # called .get() on each key, raising AttributeError while
            # reporting the failure. Log the amphora id directly.
            LOG.warning('Failed to undo qos policy %(qos_id)s '
                        'on vrrp port: %(port)s from '
                        'amphorae: %(amp)s',
                        {'qos_id': request_qos_id,
                         'port': amp_data[constants.VRRP_PORT_ID],
                         'amp': amp_data.get(constants.ID)})

    def execute(self, loadbalancer, amp_data=None, update_dict=None):
        """Apply qos policy on the vrrp ports which are related with vip."""
        qos_policy_id = loadbalancer['vip_qos_policy_id']
        # Nothing to do when no policy exists and the update does not
        # touch the VIP's qos_policy_id.
        if not qos_policy_id and (
            update_dict and (
                'vip' not in update_dict or
                'qos_policy_id' not in update_dict[constants.VIP])):
            return
        self._apply_qos_on_vrrp_port(loadbalancer, amp_data, qos_policy_id)

    def revert(self, result, loadbalancer, amp_data=None, update_dict=None,
               *args, **kwargs):
        """Handle a failure to apply QoS to VIP"""
        # Initialized up front so the except block below can never hit an
        # unbound name when the failure happens before the DB lookup.
        orig_qos_id = None
        try:
            request_qos_id = loadbalancer['vip_qos_policy_id']
            orig_lb = self.task_utils.get_current_loadbalancer_from_db(
                loadbalancer[constants.LOADBALANCER_ID])
            orig_qos_id = orig_lb.vip.qos_policy_id
            if request_qos_id != orig_qos_id:
                self._apply_qos_on_vrrp_port(loadbalancer, amp_data,
                                             orig_qos_id, is_revert=True,
                                             request_qos_id=request_qos_id)
        except Exception as e:
            LOG.error('Failed to remove QoS policy: %s from port: %s due '
                      'to error: %s', orig_qos_id,
                      amp_data[constants.VRRP_PORT_ID], str(e))
class DeletePort(BaseNetworkTask):
    """Task to delete a network port."""

    @tenacity.retry(retry=tenacity.retry_if_exception_type(),
                    stop=tenacity.stop_after_attempt(
                        CONF.networking.max_retries),
                    wait=tenacity.wait_exponential(
                        multiplier=CONF.networking.retry_backoff,
                        min=CONF.networking.retry_interval,
                        max=CONF.networking.retry_max),
                    reraise=True)
    def execute(self, port_id, passive_failure=False):
        """Delete the network port.

        Retries with exponential backoff (via tenacity). On the final
        attempt, behavior depends on passive_failure: abandon the port
        (after trying to admin-down it) or re-raise and revert.

        :param port_id: ID of the port to delete; None is a no-op
        :param passive_failure: when True, a terminal failure is logged
                                and swallowed instead of raised
        """
        if port_id is None:
            return

        # attempt_number is absent on the very first call, hence the default.
        if self.execute.retry.statistics.get(constants.ATTEMPT_NUMBER, 1) == 1:
            LOG.debug("Deleting network port %s", port_id)
        else:
            LOG.warning('Retrying network port %s delete attempt %s of %s.',
                        port_id,
                        self.execute.retry.statistics[
                            constants.ATTEMPT_NUMBER],
                        self.execute.retry.stop.max_attempt_number)
        # Let the Taskflow engine know we are working and alive
        # Don't use get with a default for 'attempt_number', we need to fail
        # if that number is missing.
        self.update_progress(
            self.execute.retry.statistics[constants.ATTEMPT_NUMBER] /
            self.execute.retry.stop.max_attempt_number)

        try:
            self.network_driver.delete_port(port_id)
        except Exception:
            if (self.execute.retry.statistics[constants.ATTEMPT_NUMBER] !=
                    self.execute.retry.stop.max_attempt_number):
                # Not the last attempt yet: re-raise so tenacity retries.
                LOG.warning('Network port delete for port id: %s failed. '
                            'Retrying.', port_id)
                raise
            if passive_failure:
                LOG.exception('Network port delete for port ID: %s failed. '
                              'This resource will be abandoned and should '
                              'manually be cleaned up once the '
                              'network service is functional.', port_id)
                # Let's at least attempt to disable it so if the instance
                # comes back from the dead it doesn't conflict with anything.
                try:
                    self.network_driver.admin_down_port(port_id)
                    LOG.info('Successfully disabled (admin down) network '
                             'port %s that failed to delete.', port_id)
                except Exception:
                    LOG.warning('Attempt to disable (admin down) network '
                                'port %s failed. The network service has '
                                'failed. Continuing.', port_id)
            else:
                LOG.exception('Network port delete for port ID: %s failed. '
                              'The network service has failed. '
                              'Aborting and reverting.', port_id)
                raise
class CreateVIPBasePort(BaseNetworkTask):
    """Task to create the VIP base port for an amphora."""

    @tenacity.retry(retry=tenacity.retry_if_exception_type(),
                    stop=tenacity.stop_after_attempt(
                        CONF.networking.max_retries),
                    wait=tenacity.wait_exponential(
                        multiplier=CONF.networking.retry_backoff,
                        min=CONF.networking.retry_interval,
                        max=CONF.networking.retry_max),
                    reraise=True)
    def execute(self, vip, vip_sg_id, amphora_id, additional_vips):
        """Create the base (VRRP) port for an amphora.

        :param vip: VIP dict (network, subnet, address, qos policy)
        :param vip_sg_id: VIP security group ID, or a falsy value
        :param amphora_id: ID used to derive the port name
        :param additional_vips: extra VIP dicts riding on the same port
        :returns: the created port serialized via to_dict(recurse=True)
        """
        port_name = constants.AMP_BASE_PORT_PREFIX + amphora_id
        fixed_ips = [{constants.SUBNET_ID: vip[constants.SUBNET_ID]}]
        sg_id = []
        if vip_sg_id:
            sg_id = [vip_sg_id]
        # The VIP address(es) are carried as secondary IPs on the base port.
        secondary_ips = [vip[constants.IP_ADDRESS]]
        for add_vip in additional_vips:
            secondary_ips.append(add_vip[constants.IP_ADDRESS])
        port = self.network_driver.create_port(
            vip[constants.NETWORK_ID], name=port_name, fixed_ips=fixed_ips,
            secondary_ips=secondary_ips,
            security_group_ids=sg_id,
            qos_policy_id=vip[constants.QOS_POLICY_ID])
        LOG.info('Created port %s with ID %s for amphora %s',
                 port_name, port.id, amphora_id)
        return port.to_dict(recurse=True)

    def revert(self, result, vip, vip_sg_id, amphora_id, additional_vips,
               *args, **kwargs):
        """Delete the port created by execute() when the flow fails."""
        if isinstance(result, failure.Failure):
            return
        # Hoisted out of the try so the except log can always name it.
        port_name = constants.AMP_BASE_PORT_PREFIX + amphora_id
        try:
            # Bug fix: execute() returns a single port dict, not a list of
            # port objects, so the original 'for port in result' iterated
            # the dict's keys and 'port.id' raised AttributeError. Delete
            # the one port by its ID instead.
            port_id = result[constants.ID]
            self.network_driver.delete_port(port_id)
            LOG.info('Deleted port %s with ID %s for amphora %s due to a '
                     'revert.', port_name, port_id, amphora_id)
        except Exception as e:
            LOG.error('Failed to delete port %s. Resources may still be in '
                      'use for a port intended for amphora %s due to error '
                      '%s. Search for a port named %s', result, amphora_id,
                      str(e), port_name)
class AdminDownPort(BaseNetworkTask):
    """Task that disables a port and waits for it to report DOWN."""

    def execute(self, port_id):
        """Set the port admin-state down and poll until it goes DOWN."""
        try:
            self.network_driver.set_port_admin_state_up(port_id, False)
        except base.PortNotFound:
            # Nothing to disable.
            return
        for _ in range(CONF.networking.max_retries):
            port = self.network_driver.get_port(port_id)
            if port.status == constants.DOWN:
                LOG.debug('Disabled port: %s', port_id)
                return
            LOG.debug('Port %s is %s instead of DOWN, waiting.',
                      port_id, port.status)
            time.sleep(CONF.networking.retry_interval)
        # Gave up waiting; continue the flow anyway.
        LOG.error('Port %s failed to go DOWN. Port status is still %s. '
                  'Ignoring and continuing.', port_id, port.status)

    def revert(self, result, port_id, *args, **kwargs):
        """Re-enable the port if the task had disabled it."""
        if isinstance(result, failure.Failure):
            return
        try:
            self.network_driver.set_port_admin_state_up(port_id, True)
        except Exception as e:
            LOG.error('Failed to bring port %s admin up on revert due to: %s.',
                      port_id, str(e))
class GetVIPSecurityGroupID(BaseNetworkTask):
    """Task that looks up the VIP security group ID for a load balancer."""

    def execute(self, loadbalancer_id):
        """Return the VIP SG ID, or None when no SG applies."""
        sg_name = utils.get_vip_security_group_name(loadbalancer_id)
        try:
            sec_grp = self.network_driver.get_security_group(sg_name)
        except base.SecurityGroupNotFound:
            with excutils.save_and_reraise_exception() as ctxt:
                # Missing SG is only an error when security groups are
                # enabled; otherwise swallow it and fall through to None.
                if self.network_driver.sec_grp_enabled:
                    LOG.error('VIP security group %s was not found.',
                              sg_name)
                else:
                    ctxt.reraise = False
        else:
            if sec_grp:
                return sec_grp.id
        return None