octavia.network.drivers.neutron.allowed_address_pairs

Source code for octavia.network.drivers.neutron.allowed_address_pairs

#    Copyright 2014 Rackspace
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import time

from neutronclient.common import exceptions as neutron_client_exceptions
from novaclient import exceptions as nova_client_exceptions
from oslo_config import cfg
from oslo_log import log as logging
import six

from octavia.common import clients
from octavia.common import constants
from octavia.common import data_models
from octavia.network import base
from octavia.network import data_models as n_data_models
from octavia.network.drivers.neutron import base as neutron_base
from octavia.network.drivers.neutron import utils

import ipaddress


LOG = logging.getLogger(__name__)
AAP_EXT_ALIAS = 'allowed-address-pairs'
VIP_SECURITY_GRP_PREFIX = 'lb-'
OCTAVIA_OWNER = 'Octavia'

CONF = cfg.CONF


[docs]class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver): def __init__(self): super(AllowedAddressPairsDriver, self).__init__() self._check_aap_loaded() self.nova_client = clients.NovaAuth.get_nova_client( endpoint=CONF.nova.endpoint, region=CONF.nova.region_name, endpoint_type=CONF.nova.endpoint_type, service_name=CONF.nova.service_name, insecure=CONF.nova.insecure, cacert=CONF.nova.ca_certificates_file ) def _check_aap_loaded(self): if not self._check_extension_enabled(AAP_EXT_ALIAS): raise base.NetworkException( 'The {alias} extension is not enabled in neutron. This ' 'driver cannot be used with the {alias} extension ' 'disabled.'.format(alias=AAP_EXT_ALIAS)) def _get_interfaces_to_unplug(self, interfaces, network_id, ip_address=None): ret = [] for interface in interfaces: if interface.network_id == network_id: if ip_address: for fixed_ip in interface.fixed_ips: if ip_address == fixed_ip.ip_address: ret.append(interface) else: ret.append(interface) return ret def _get_plugged_interface(self, compute_id, network_id, lb_network_ip): interfaces = self.get_plugged_networks(compute_id) for interface in interfaces: is_correct_interface = interface.network_id == network_id for ip in interface.fixed_ips: if ip.ip_address == lb_network_ip: is_correct_interface = False if is_correct_interface: return interface def _plug_amphora_vip(self, amphora, subnet): # We need a vip port owned by Octavia for Act/Stby and failover try: port = {'port': {'name': 'octavia-lb-vrrp-' + amphora.id, 'network_id': subnet.network_id, 'fixed_ips': [{'subnet_id': subnet.id}], 'admin_state_up': True, 'device_owner': OCTAVIA_OWNER}} new_port = self.neutron_client.create_port(port) new_port = utils.convert_port_dict_to_model(new_port) LOG.debug('Created vip port: %(port_id)s for amphora: %(amp)s', {'port_id': new_port.id, 'amp': amphora.id}) interface = self.plug_port(amphora, new_port) except Exception: message = _('Error plugging amphora (compute_id: {compute_id}) ' 'into vip network {network_id}.').format( compute_id=amphora.compute_id, network_id=subnet.network_id) LOG.exception(message) raise base.PlugVIPException(message) return interface def _add_vip_address_pair(self, port_id, vip_address): try: self._add_allowed_address_pair_to_port(port_id, vip_address) except neutron_client_exceptions.PortNotFoundClient as e: raise base.PortNotFound(e.message) except Exception: message = _('Error adding allowed address pair {ip} ' 'to port {port_id}.').format(ip=vip_address, port_id=port_id) LOG.exception(message) raise base.PlugVIPException(message) def _get_lb_security_group(self, load_balancer_id): sec_grp_name = VIP_SECURITY_GRP_PREFIX + load_balancer_id sec_grps = self.neutron_client.list_security_groups(name=sec_grp_name) if sec_grps and sec_grps.get('security_groups'): return sec_grps.get('security_groups')[0] def _get_ethertype_for_ip(self, ip): address = ipaddress.ip_address( ip if isinstance(ip, six.text_type) else six.u(ip)) return 'IPv6' if address.version is 6 else 'IPv4' def _update_security_group_rules(self, load_balancer, sec_grp_id): rules = self.neutron_client.list_security_group_rules( security_group_id=sec_grp_id) updated_ports = [ listener.protocol_port for listener in load_balancer.listeners if listener.provisioning_status != constants.PENDING_DELETE and listener.provisioning_status != constants.DELETED] peer_ports = [ listener.peer_port for listener in load_balancer.listeners if listener.provisioning_status != constants.PENDING_DELETE and listener.provisioning_status != constants.DELETED] updated_ports.extend(peer_ports) # Just going to use port_range_max for now because we can assume that # port_range_max and min will be the same since this driver is # responsible for creating these rules old_ports = [rule.get('port_range_max') for rule in rules.get('security_group_rules', []) # Don't remove egress rules and don't # confuse other protocols with None ports # with the egress rules. VRRP uses protocol # 51 and 112 if rule.get('direction') != 'egress' and rule.get('protocol', '').lower() == 'tcp'] add_ports = set(updated_ports) - set(old_ports) del_ports = set(old_ports) - set(updated_ports) for rule in rules.get('security_group_rules', []): if rule.get('port_range_max') in del_ports: rule_id = rule.get('id') try: self.neutron_client.delete_security_group_rule(rule_id) except neutron_client_exceptions.NotFound: LOG.info("Security group rule %s not found, will assume " "it is already deleted.", rule_id) ethertype = self._get_ethertype_for_ip(load_balancer.vip.ip_address) for port in add_ports: self._create_security_group_rule(sec_grp_id, 'TCP', port_min=port, port_max=port, ethertype=ethertype) # Currently we are using the VIP network for VRRP # so we need to open up the protocols for it if (CONF.controller_worker.loadbalancer_topology == constants.TOPOLOGY_ACTIVE_STANDBY): try: self._create_security_group_rule( sec_grp_id, constants.VRRP_PROTOCOL_NUM, direction='ingress', ethertype=ethertype) except neutron_client_exceptions.Conflict: # It's ok if this rule already exists pass except Exception as e: raise base.PlugVIPException(str(e)) try: self._create_security_group_rule( sec_grp_id, constants.AUTH_HEADER_PROTOCOL_NUMBER, direction='ingress', ethertype=ethertype) except neutron_client_exceptions.Conflict: # It's ok if this rule already exists pass except Exception as e: raise base.PlugVIPException(str(e)) def _update_vip_security_group(self, load_balancer, vip): sec_grp = self._get_lb_security_group(load_balancer.id) if not sec_grp: sec_grp_name = VIP_SECURITY_GRP_PREFIX + load_balancer.id sec_grp = self._create_security_group(sec_grp_name) self._update_security_group_rules(load_balancer, sec_grp.get('id')) self._add_vip_security_group_to_port(load_balancer.id, vip.port_id, sec_grp.get('id')) def _add_vip_security_group_to_port(self, load_balancer_id, port_id, sec_grp_id=None): sec_grp_id = (sec_grp_id or self._get_lb_security_group(load_balancer_id).get('id')) try: self._add_security_group_to_port(sec_grp_id, port_id) except base.PortNotFound: raise except base.NetworkException as e: raise base.PlugVIPException(str(e)) def _delete_vip_security_group(self, sec_grp): """Deletes a security group in neutron. Retries upon an exception because removing a security group from a neutron port does not happen immediately. """ attempts = 0 while attempts <= CONF.networking.max_retries: try: self.neutron_client.delete_security_group(sec_grp) LOG.info("Deleted security group %s", sec_grp) return except neutron_client_exceptions.NotFound: LOG.info("Security group %s not found, will assume it is " "already deleted", sec_grp) return except Exception: LOG.warning("Attempt %(attempt)s to remove security group " "%(sg)s failed.", {'attempt': attempts + 1, 'sg': sec_grp}) attempts += 1 time.sleep(CONF.networking.retry_interval) message = _("All attempts to remove security group {0} have " "failed.").format(sec_grp) LOG.exception(message) raise base.DeallocateVIPException(message) @staticmethod def _filter_amphora(amp): return amp.status == constants.AMPHORA_ALLOCATED def _delete_security_group(self, vip, port): if self.sec_grp_enabled: sec_grp = self._get_lb_security_group(vip.load_balancer.id) if sec_grp: sec_grp = sec_grp.get('id') LOG.info( "Removing security group %(sg)s from port %(port)s", {'sg': sec_grp, 'port': vip.port_id}) raw_port = None try: if port: raw_port = self.neutron_client.show_port(port.id) except Exception: LOG.warning('Unable to get port information for port ' ' %s. Continuing to delete the security ' 'group.', port.id) if raw_port: sec_grps = raw_port.get( 'port', {}).get('security_groups', []) if sec_grp in sec_grps: sec_grps.remove(sec_grp) port_update = {'port': {'security_groups': sec_grps}} self.neutron_client.update_port(port.id, port_update) self._delete_vip_security_group(sec_grp)
[docs] def deallocate_vip(self, vip): """Delete the vrrp_port (instance port) in case nova didn't This can happen if a failover has occurred. """ for amphora in six.moves.filter(self._filter_amphora, vip.load_balancer.amphorae): try: self.neutron_client.delete_port(amphora.vrrp_port_id) except (neutron_client_exceptions.NotFound, neutron_client_exceptions.PortNotFoundClient): LOG.debug('VIP instance port %s already deleted. Skipping.', amphora.vrrp_port_id) try: port = self.get_port(vip.port_id) except base.PortNotFound: LOG.warning("Can't deallocate VIP because the vip port {0} " "cannot be found in neutron. " "Continuing cleanup.".format(vip.port_id)) port = None self._delete_security_group(vip, port) if port and port.device_owner == OCTAVIA_OWNER: try: self.neutron_client.delete_port(vip.port_id) except (neutron_client_exceptions.NotFound, neutron_client_exceptions.PortNotFoundClient): LOG.debug('VIP port %s already deleted. Skipping.', vip.port_id) except Exception: message = _('Error deleting VIP port_id {port_id} from ' 'neutron').format(port_id=vip.port_id) LOG.exception(message) raise base.DeallocateVIPException(message) elif port: LOG.info("Port %s will not be deleted by Octavia as it was " "not created by Octavia.", vip.port_id)
[docs] def plug_vip(self, load_balancer, vip): if self.sec_grp_enabled: self._update_vip_security_group(load_balancer, vip) plugged_amphorae = [] subnet = self.get_subnet(vip.subnet_id) for amphora in six.moves.filter( lambda amp: amp.status == constants.AMPHORA_ALLOCATED, load_balancer.amphorae): interface = self._get_plugged_interface( amphora.compute_id, subnet.network_id, amphora.lb_network_ip) if not interface: interface = self._plug_amphora_vip(amphora, subnet) self._add_vip_address_pair(interface.port_id, vip.ip_address) if self.sec_grp_enabled: self._add_vip_security_group_to_port(load_balancer.id, interface.port_id) vrrp_ip = None for fixed_ip in interface.fixed_ips: is_correct_subnet = fixed_ip.subnet_id == subnet.id is_management_ip = fixed_ip.ip_address == amphora.lb_network_ip if is_correct_subnet and not is_management_ip: vrrp_ip = fixed_ip.ip_address break plugged_amphorae.append(data_models.Amphora( id=amphora.id, compute_id=amphora.compute_id, vrrp_ip=vrrp_ip, ha_ip=vip.ip_address, vrrp_port_id=interface.port_id, ha_port_id=vip.port_id)) return plugged_amphorae
[docs] def allocate_vip(self, load_balancer): if load_balancer.vip.port_id: LOG.info('Port %s already exists. Nothing to be done.', load_balancer.vip.port_id) port = self.get_port(load_balancer.vip.port_id) return self._port_to_vip(port, load_balancer) fixed_ip = {} if load_balancer.vip.subnet_id: fixed_ip['subnet_id'] = load_balancer.vip.subnet_id if load_balancer.vip.ip_address: fixed_ip['ip_address'] = load_balancer.vip.ip_address # It can be assumed that network_id exists port = {'port': {'name': 'octavia-lb-' + load_balancer.id, 'network_id': load_balancer.vip.network_id, 'admin_state_up': False, 'device_id': 'lb-{0}'.format(load_balancer.id), 'device_owner': OCTAVIA_OWNER, 'project_id': load_balancer.project_id}} if fixed_ip: port['port']['fixed_ips'] = [fixed_ip] try: new_port = self.neutron_client.create_port(port) except Exception as e: message = _('Error creating neutron port on network ' '{network_id}.').format( network_id=load_balancer.vip.network_id) LOG.exception(message) raise base.AllocateVIPException( message, orig_msg=getattr(e, 'message', None), orig_code=getattr(e, 'status_code', None)) new_port = utils.convert_port_dict_to_model(new_port) return self._port_to_vip(new_port, load_balancer)
[docs] def unplug_vip(self, load_balancer, vip): try: subnet = self.get_subnet(vip.subnet_id) except base.SubnetNotFound: msg = ("Can't unplug vip because vip subnet {0} was not " "found").format(vip.subnet_id) LOG.exception(msg) raise base.PluggedVIPNotFound(msg) for amphora in six.moves.filter( lambda amp: amp.status == constants.AMPHORA_ALLOCATED, load_balancer.amphorae): interface = self._get_plugged_interface( amphora.compute_id, subnet.network_id, amphora.lb_network_ip) if not interface: # Thought about raising PluggedVIPNotFound exception but # then that wouldn't evaluate all amphorae, so just continue LOG.debug('Cannot get amphora %s interface, skipped', amphora.compute_id) continue try: self.unplug_network(amphora.compute_id, subnet.network_id) except Exception: pass try: aap_update = {'port': { 'allowed_address_pairs': [] }} self.neutron_client.update_port(interface.port_id, aap_update) except Exception: message = _('Error unplugging VIP. Could not clear ' 'allowed address pairs from port ' '{port_id}.').format(port_id=vip.port_id) LOG.exception(message) raise base.UnplugVIPException(message) # Delete the VRRP port if we created it try: port = self.get_port(amphora.vrrp_port_id) if port.name.startswith('octavia-lb-vrrp-'): self.neutron_client.delete_port(amphora.vrrp_port_id) except (neutron_client_exceptions.NotFound, neutron_client_exceptions.PortNotFoundClient): pass except Exception as e: LOG.error('Failed to delete port. Resources may still be in ' 'use for port: %(port)s due to error: %s(except)s', {'port': amphora.vrrp_port_id, 'except': e})
[docs] def plug_network(self, compute_id, network_id, ip_address=None): try: interface = self.nova_client.servers.interface_attach( server=compute_id, net_id=network_id, fixed_ip=ip_address, port_id=None) except nova_client_exceptions.NotFound as e: if 'Instance' in e.message: raise base.AmphoraNotFound(e.message) elif 'Network' in e.message: raise base.NetworkNotFound(e.message) else: raise base.PlugNetworkException(e.message) except Exception: message = _('Error plugging amphora (compute_id: {compute_id}) ' 'into network {network_id}.').format( compute_id=compute_id, network_id=network_id) LOG.exception(message) raise base.PlugNetworkException(message) return self._nova_interface_to_octavia_interface(compute_id, interface)
[docs] def unplug_network(self, compute_id, network_id, ip_address=None): interfaces = self.get_plugged_networks(compute_id) if not interfaces: msg = ('Amphora with compute id {compute_id} does not have any ' 'plugged networks').format(compute_id=compute_id) raise base.NetworkNotFound(msg) unpluggers = self._get_interfaces_to_unplug(interfaces, network_id, ip_address=ip_address) for index, unplugger in enumerate(unpluggers): try: self.nova_client.servers.interface_detach( server=compute_id, port_id=unplugger.port_id) except Exception: LOG.warning('Error unplugging port {port_id} from amphora ' 'with compute ID {compute_id}. ' 'Skipping.'.format(port_id=unplugger.port_id, compute_id=compute_id))
[docs] def update_vip(self, load_balancer): sec_grp = self._get_lb_security_group(load_balancer.id) self._update_security_group_rules(load_balancer, sec_grp.get('id'))
[docs] def failover_preparation(self, amphora): if self.dns_integration_enabled: self._failover_preparation(amphora)
def _failover_preparation(self, amphora): interfaces = self.get_plugged_networks(compute_id=amphora.compute_id) ports = [] for interface_ in interfaces: port = self.get_port(port_id=interface_.port_id) ips = port.fixed_ips lb_network = False for ip in ips: if ip.ip_address == amphora.lb_network_ip: lb_network = True if not lb_network: ports.append(port) for port in ports: try: self.neutron_client.update_port(port.id, {'port': {'dns_name': ''}}) except (neutron_client_exceptions.NotFound, neutron_client_exceptions.PortNotFoundClient): raise base.PortNotFound()
[docs] def plug_port(self, amphora, port): try: interface = self.nova_client.servers.interface_attach( server=amphora.compute_id, net_id=None, fixed_ip=None, port_id=port.id) plugged_interface = self._nova_interface_to_octavia_interface( amphora.compute_id, interface) except nova_client_exceptions.NotFound as e: if 'Instance' in e.message: raise base.AmphoraNotFound(e.message) elif 'Network' in e.message: raise base.NetworkNotFound(e.message) else: raise base.PlugNetworkException(e.message) except nova_client_exceptions.Conflict: LOG.info('Port %(portid)s is already plugged, ' 'skipping', {'portid': port.id}) plugged_interface = n_data_models.Interface( compute_id=amphora.compute_id, network_id=port.network_id, port_id=port.id, fixed_ips=port.fixed_ips) except Exception: message = _('Error plugging amphora (compute_id: ' '{compute_id}) into port ' '{port_id}.').format( compute_id=amphora.compute_id, port_id=port.id) LOG.exception(message) raise base.PlugNetworkException(message) return plugged_interface
[docs] def get_network_configs(self, loadbalancer): vip_subnet = self.get_subnet(loadbalancer.vip.subnet_id) vip_port = self.get_port(loadbalancer.vip.port_id) amp_configs = {} for amp in loadbalancer.amphorae: if amp.status != constants.DELETED: LOG.debug("Retrieving network details for amphora %s", amp.id) vrrp_port = self.get_port(amp.vrrp_port_id) vrrp_subnet = self.get_subnet( vrrp_port.get_subnet_id(amp.vrrp_ip)) vrrp_port.network = self.get_network(vrrp_port.network_id) ha_port = self.get_port(amp.ha_port_id) ha_subnet = self.get_subnet( ha_port.get_subnet_id(amp.ha_ip)) amp_configs[amp.id] = n_data_models.AmphoraNetworkConfig( amphora=amp, vip_subnet=vip_subnet, vip_port=vip_port, vrrp_subnet=vrrp_subnet, vrrp_port=vrrp_port, ha_subnet=ha_subnet, ha_port=ha_port ) return amp_configs
[docs] def wait_for_port_detach(self, amphora): """Waits for the amphora ports device_id to be unset. This method waits for the ports on an amphora device_id parameter to be '' or None which signifies that nova has finished detaching the port from the instance. :param amphora: Amphora to wait for ports to detach. :returns: None :raises TimeoutException: Port did not detach in interval. :raises PortNotFound: Port was not found by neutron. """ interfaces = self.get_plugged_networks(compute_id=amphora.compute_id) ports = [] port_detach_timeout = CONF.networking.port_detach_timeout for interface_ in interfaces: port = self.get_port(port_id=interface_.port_id) ips = port.fixed_ips lb_network = False for ip in ips: if ip.ip_address == amphora.lb_network_ip: lb_network = True if not lb_network: ports.append(port) for port in ports: try: neutron_port = self.neutron_client.show_port( port.id).get('port') device_id = neutron_port['device_id'] start = int(time.time()) while device_id: time.sleep(CONF.networking.retry_interval) neutron_port = self.neutron_client.show_port( port.id).get('port') device_id = neutron_port['device_id'] timed_out = int(time.time()) - start >= port_detach_timeout if device_id and timed_out: message = ('Port %s failed to detach (device_id %s) ' 'within the required time (%s s).' % (port.id, device_id, port_detach_timeout)) raise base.TimeoutException(message) except (neutron_client_exceptions.NotFound, neutron_client_exceptions.PortNotFoundClient): pass
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.