# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
import stat
import subprocess
import typing as tp
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
from octavia.amphorae.backends.utils import ip_advertisement
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
FRONTEND_BACKEND_PATTERN = re.compile(r'\n(frontend|backend)\s+(\S+)\n')
LISTENER_MODE_PATTERN = re.compile(r'^\s+mode\s+(.*)$', re.MULTILINE)
TLS_CERT_PATTERN = re.compile(r'^\s+bind\s+\S+\s+ssl crt-list\s+(\S*)',
re.MULTILINE)
STATS_SOCKET_PATTERN = re.compile(r'stats socket\s+(\S+)')
[docs]
class ParsingError(Exception):
pass
[docs]
def init_path(lb_id):
return os.path.join(consts.SYSTEMD_DIR, f'haproxy-{lb_id}.service')
[docs]
def keepalived_lvs_dir():
return os.path.join(CONF.haproxy_amphora.base_path, 'lvs')
[docs]
def keepalived_lvs_init_path(listener_id):
return os.path.join(consts.SYSTEMD_DIR,
consts.KEEPALIVEDLVS_SYSTEMD %
str(listener_id))
[docs]
def keepalived_backend_check_script_dir():
return os.path.join(CONF.haproxy_amphora.base_path, 'lvs/check/')
[docs]
def keepalived_backend_check_script_path():
return os.path.join(keepalived_backend_check_script_dir(),
'udp_check.sh')
[docs]
def keepalived_lvs_pids_path(listener_id):
pids_path = {}
for file_ext in ['pid', 'vrrp.pid', 'check.pid']:
pids_path[file_ext] = (
os.path.join(CONF.haproxy_amphora.base_path,
f"lvs/octavia-keepalivedlvs-{str(listener_id)}."
f"{file_ext}"))
return pids_path['pid'], pids_path['vrrp.pid'], pids_path['check.pid']
[docs]
def keepalived_lvs_cfg_path(listener_id):
return os.path.join(CONF.haproxy_amphora.base_path,
f"lvs/octavia-keepalivedlvs-{str(listener_id)}.conf")
[docs]
def haproxy_dir(lb_id):
return os.path.join(CONF.haproxy_amphora.base_path, lb_id)
[docs]
def pid_path(lb_id):
return os.path.join(haproxy_dir(lb_id), lb_id + '.pid')
[docs]
def config_path(lb_id):
return os.path.join(haproxy_dir(lb_id), 'haproxy.cfg')
[docs]
def state_file_path(lb_id):
return os.path.join(haproxy_dir(lb_id), 'servers-state')
[docs]
def get_haproxy_pid(lb_id):
with open(pid_path(lb_id), encoding='utf-8') as f:
return f.readline().rstrip()
[docs]
def get_keepalivedlvs_pid(listener_id):
pid_file = keepalived_lvs_pids_path(listener_id)[0]
with open(pid_file, encoding='utf-8') as f:
return f.readline().rstrip()
[docs]
def haproxy_sock_path(lb_id):
return os.path.join(CONF.haproxy_amphora.base_path, lb_id + '.sock')
[docs]
def haproxy_check_script_path():
return os.path.join(keepalived_check_scripts_dir(),
'haproxy_check_script.sh')
[docs]
def keepalived_dir():
return os.path.join(CONF.haproxy_amphora.base_path, 'vrrp')
[docs]
def keepalived_init_path():
return os.path.join(consts.SYSTEMD_DIR, consts.KEEPALIVED_SYSTEMD)
[docs]
def keepalived_pid_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/octavia-keepalived.pid')
[docs]
def keepalived_cfg_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/octavia-keepalived.conf')
[docs]
def keepalived_log_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/octavia-keepalived.log')
[docs]
def keepalived_check_scripts_dir():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/check_scripts')
[docs]
def keepalived_check_script_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/check_script.sh')
[docs]
def get_listeners():
"""Get Listeners
:returns: An array with the ids of all listeners, e.g. ['123', '456', ...]
or [] if no listeners exist
"""
listeners = []
for lb_id in get_loadbalancers():
listeners_on_lb = parse_haproxy_file(lb_id)[1]
listeners.extend(list(listeners_on_lb.keys()))
return listeners
[docs]
def get_loadbalancers():
"""Get Load balancers
:returns: An array with the uuids of all load balancers,
e.g. ['123', '456', ...] or [] if no loadbalancers exist
"""
if os.path.exists(CONF.haproxy_amphora.base_path):
return [f for f in os.listdir(CONF.haproxy_amphora.base_path)
if os.path.exists(config_path(f))]
return []
[docs]
def is_lb_running(lb_id):
return os.path.exists(pid_path(lb_id)) and os.path.exists(
os.path.join('/proc', get_haproxy_pid(lb_id)))
[docs]
def get_lvs_listeners():
result = []
if os.path.exists(keepalived_lvs_dir()):
for f in os.listdir(keepalived_lvs_dir()):
if f.endswith('.conf'):
prefix = f.split('.')[0]
if re.search("octavia-keepalivedlvs-", prefix):
result.append(f.split(
'octavia-keepalivedlvs-')[1].split('.')[0])
return result
[docs]
def is_lvs_listener_running(listener_id):
pid_file = keepalived_lvs_pids_path(listener_id)[0]
return os.path.exists(pid_file) and os.path.exists(
os.path.join('/proc', get_keepalivedlvs_pid(listener_id)))
[docs]
def install_netns_systemd_service():
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
# mode 00644
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
# TODO(bcafarel): implement this for other init systems
# netns handling depends on a separate unit file
netns_path = os.path.join(consts.SYSTEMD_DIR,
consts.AMP_NETNS_SVC_PREFIX + '.service')
jinja_env = jinja2.Environment(
autoescape=True, loader=jinja2.FileSystemLoader(os.path.dirname(
os.path.realpath(__file__)
) + consts.AGENT_API_TEMPLATES))
if not os.path.exists(netns_path):
with os.fdopen(os.open(netns_path, flags, mode), 'w') as text_file:
text = jinja_env.get_template(
consts.AMP_NETNS_SVC_PREFIX + '.systemd.j2').render(
amphora_nsname=consts.AMPHORA_NAMESPACE)
text_file.write(text)
[docs]
def run_systemctl_command(command, service, raise_error=True):
cmd = f"systemctl {command} {service}"
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT,
encoding='utf-8')
except subprocess.CalledProcessError as e:
LOG.debug("Failed to %(cmd)s %(srvc)s service: "
"%(err)s %(out)s", {'cmd': command, 'srvc': service,
'err': e, 'out': e.output})
if raise_error:
raise
[docs]
def get_backend_for_lb_object(object_id):
"""Returns the backend for a listener.
If the listener is a TCP based listener return 'HAPROXY'.
If the listener is a UDP or SCTP based listener return 'LVS'
If the listener is not identifiable, return None.
:param listener_id: The ID of the listener to identify.
:returns: HAPROXY_BACKEND, LVS_BACKEND or None
"""
if os.path.exists(config_path(object_id)):
return consts.HAPROXY_BACKEND
if os.path.exists(keepalived_lvs_cfg_path(object_id)):
return consts.LVS_BACKEND
return None
[docs]
def parse_haproxy_file(lb_id):
with open(config_path(lb_id), encoding='utf-8') as file:
cfg = file.read()
listeners = {}
m = FRONTEND_BACKEND_PATTERN.split(cfg)
last_token = None
last_id = None
for section in m:
if last_token is None:
# We aren't in a section yet, see if this line starts one
if section == 'frontend':
last_token = section
elif last_token == 'frontend':
# We're in a frontend section, save the id for later
last_token = last_token + "_id"
last_id = section
elif last_token == 'frontend_id':
# We're in a frontend section and already have the id
# Look for the mode
mode_matcher = LISTENER_MODE_PATTERN.search(section)
if not mode_matcher:
raise ParsingError()
listeners[last_id] = {
'mode': mode_matcher.group(1).upper(),
}
# Now see if this is a TLS frontend
tls_matcher = TLS_CERT_PATTERN.search(section)
if tls_matcher:
# TODO(rm_work): Can't we have terminated tcp?
listeners[last_id]['mode'] = 'TERMINATED_HTTPS'
listeners[last_id]['ssl_crt'] = tls_matcher.group(1)
# Clear out the token and id and start over
last_token = last_id = None
m = STATS_SOCKET_PATTERN.search(cfg)
if not m:
raise ParsingError()
stats_socket = m.group(1)
return stats_socket, listeners
[docs]
def vrrp_check_script_update(lb_id, action):
os.makedirs(keepalived_dir(), exist_ok=True)
os.makedirs(keepalived_check_scripts_dir(), exist_ok=True)
lb_ids = get_loadbalancers()
lvs_ids = get_lvs_listeners()
# If no LBs are found, so make sure keepalived thinks haproxy is down.
if not lb_ids:
if not lvs_ids:
with open(haproxy_check_script_path(),
'w', encoding='utf-8') as text_file:
text_file.write('exit 1')
else:
try:
LOG.debug("Attempting to remove old haproxy check script...")
os.remove(haproxy_check_script_path())
LOG.debug("Finished removing old haproxy check script.")
except FileNotFoundError:
LOG.debug("No haproxy check script to remove.")
return
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(haproxy_sock_path(lbid))
cmd = f"haproxy-vrrp-check {' '.join(args)}; exit $?"
with open(haproxy_check_script_path(), 'w', encoding='utf-8') as text_file:
text_file.write(cmd)
[docs]
def get_haproxy_vip_addresses(lb_id):
"""Get the VIP addresses for a load balancer.
:param lb_id: The load balancer ID to get VIP addresses from.
:returns: List of VIP addresses (IPv4 and IPv6)
"""
vips = []
with open(config_path(lb_id), encoding='utf-8') as file:
for line in file:
current_line = line.strip()
if current_line.startswith('bind'):
for section in current_line.split(' '):
# We will always have a port assigned per the template.
if ':' in section:
if ',' in section:
addr_port = section.rstrip(',')
vips.append(addr_port.rpartition(':')[0])
else:
vips.append(section.rpartition(':')[0])
break
return vips
[docs]
def get_lvs_vip_addresses(listener_id: str) -> list[str]:
"""Get the VIP addresses for a LVS load balancer.
:param listener_id: The listener ID to get VIP addresses from.
:returns: List of VIP addresses (IPv4 and IPv6)
"""
vips = []
# Extract the VIP addresses from keepalived configuration
# Format is
# virtual_server_group ipv<n>-group {
# vip_address1 port1
# vip_address2 port2
# }
# it can be repeated in case of dual-stack LBs
with open(keepalived_lvs_cfg_path(listener_id), encoding='utf-8') as file:
vsg_section = False
for line in file:
current_line = line.strip()
if vsg_section:
if current_line.startswith('}'):
vsg_section = False
else:
vip_address = current_line.split(' ')[0]
vips.append(vip_address)
elif line.startswith('virtual_server_group '):
vsg_section = True
return vips
[docs]
def send_vip_advertisements(lb_id: tp.Optional[str] = None,
listener_id: tp.Optional[str] = None):
"""Sends address advertisements for each load balancer VIP.
This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the VIP addresses on a load balancer.
:param lb_id: The load balancer ID to send advertisements for.
:returns: None
"""
try:
if lb_id:
vips = get_haproxy_vip_addresses(lb_id)
else:
vips = get_lvs_vip_addresses(listener_id)
for vip in vips:
interface = network_utils.get_interface_name(
vip, net_ns=consts.AMPHORA_NAMESPACE)
ip_advertisement.send_ip_advertisement(
interface, vip, net_ns=consts.AMPHORA_NAMESPACE)
except Exception as e:
LOG.debug('Send VIP advertisement failed due to :%s. '
'This amphora may not be the MASTER. Ignoring.', str(e))
[docs]
def send_member_advertisements(fixed_ips: tp.Iterable[tp.Dict[str, str]]):
"""Sends advertisements for each fixed_ip of a list
This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the addresses of the subnets of the members.
:param fixed_ips: a list of dicts that contain 'ip_address' elements
:returns: None
"""
try:
for fixed_ip in fixed_ips:
ip_address = fixed_ip[consts.IP_ADDRESS]
interface = network_utils.get_interface_name(
ip_address, net_ns=consts.AMPHORA_NAMESPACE)
ip_advertisement.send_ip_advertisement(
interface, ip_address, net_ns=consts.AMPHORA_NAMESPACE)
except Exception as e:
LOG.debug('Send member advertisement failed due to: %s', str(e))