Source code for octavia.amphorae.backends.agent.api_server.loadbalancer

# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import hashlib
import io
import os
import re
import shutil
import stat
import subprocess
import time

import flask
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
import webob
from werkzeug import exceptions

from octavia.amphorae.backends.agent.api_server import haproxy_compatibility
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import haproxy_query
from octavia.common import constants as consts
from octavia.common import utils as octavia_utils

LOG = logging.getLogger(__name__)
BUFFER = 100
HAPROXY_RELOAD_RETRIES = 3
HAPROXY_QUERY_RETRIES = 5

CONF = cfg.CONF

SYSTEMD_CONF = 'systemd.conf.j2'

JINJA_ENV = jinja2.Environment(
    autoescape=True,
    loader=jinja2.FileSystemLoader(os.path.dirname(
        os.path.realpath(__file__)
    ) + consts.AGENT_API_TEMPLATES))
SYSTEMD_TEMPLATE = JINJA_ENV.get_template(SYSTEMD_CONF)


# Wrap a stream so we can compute the md5 while reading
[docs] class Wrapped: def __init__(self, stream_): self.stream = stream_ self.hash = hashlib.md5(usedforsecurity=False) # nosec
[docs] def read(self, line): block = self.stream.read(line) if block: self.hash.update(block) return block
[docs] def get_md5(self): return self.hash.hexdigest()
def __getattr__(self, attr): return getattr(self.stream, attr)
[docs] class Loadbalancer:
[docs] def get_haproxy_config(self, lb_id): """Gets the haproxy config :param listener_id: the id of the listener """ self._check_lb_exists(lb_id) with open(util.config_path(lb_id), encoding='utf-8') as file: cfg = file.read() resp = webob.Response(cfg, content_type='text/plain') resp.headers['ETag'] = ( hashlib.md5(octavia_utils.b(cfg), usedforsecurity=False).hexdigest()) # nosec return resp
[docs] def upload_haproxy_config(self, amphora_id, lb_id): """Upload the haproxy config :param amphora_id: The id of the amphora to update :param lb_id: The id of the loadbalancer """ stream = Wrapped(flask.request.stream) # We have to hash here because HAProxy has a string length limitation # in the configuration file "peer <peername>" lines peer_name = octavia_utils.base64_sha1_string(amphora_id).rstrip('=') if not os.path.exists(util.haproxy_dir(lb_id)): os.makedirs(util.haproxy_dir(lb_id)) name = os.path.join(util.haproxy_dir(lb_id), 'haproxy.cfg.new') flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC # mode 00600 mode = stat.S_IRUSR | stat.S_IWUSR b = stream.read(BUFFER) s_io = io.StringIO() while b: # Write haproxy configuration to StringIO s_io.write(b.decode('utf8')) b = stream.read(BUFFER) # Since haproxy user_group is now auto-detected by the amphora agent, # remove it from haproxy configuration in case it was provided # by an older Octavia controller. This is needed in order to prevent # a duplicate entry for 'group' in haproxy configuration, which will # result an error when haproxy starts. new_config = re.sub(r"\s+group\s.+", "", s_io.getvalue()) # Handle any haproxy version compatibility issues new_config = haproxy_compatibility.process_cfg_for_version_compat( new_config) with os.fdopen(os.open(name, flags, mode), 'w') as file: file.write(new_config) # use haproxy to check the config cmd = (f"haproxy -c -L {peer_name} -f {name} -f " f"{consts.HAPROXY_USER_GROUP_CFG}") try: subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT, encoding='utf-8') except subprocess.CalledProcessError as e: LOG.error("Failed to verify haproxy file: %s %s", e, e.output) # Save the last config that failed validation for debugging os.rename(name, ''.join([name, '-failed'])) return webob.Response( json={'message': "Invalid request", 'details': e.output}, status=400) # file ok - move it os.rename(name, util.config_path(lb_id)) init_path = util.init_path(lb_id) template = SYSTEMD_TEMPLATE # Render and install the network namespace systemd service util.install_netns_systemd_service() util.run_systemctl_command( consts.ENABLE, consts.AMP_NETNS_SVC_PREFIX + '.service', False) # mode 00644 mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH hap_major, hap_minor = haproxy_compatibility.get_haproxy_versions() if not os.path.exists(init_path): with os.fdopen(os.open(init_path, flags, mode), 'w') as text_file: text = template.render( peer_name=peer_name, haproxy_pid=util.pid_path(lb_id), haproxy_cmd=util.CONF.haproxy_amphora.haproxy_cmd, haproxy_cfg=util.config_path(lb_id), haproxy_state_file=util.state_file_path(lb_id), haproxy_socket=util.haproxy_sock_path(lb_id), haproxy_user_group_cfg=consts.HAPROXY_USER_GROUP_CFG, amphora_netns=consts.AMP_NETNS_SVC_PREFIX, amphora_nsname=consts.AMPHORA_NAMESPACE, haproxy_major_version=hap_major, haproxy_minor_version=hap_minor ) text_file.write(text) # Make sure the new service is enabled on boot try: util.run_systemctl_command( consts.ENABLE, consts.LOADBALANCER_SYSTEMD % lb_id) except subprocess.CalledProcessError as e: return webob.Response(json={ 'message': "Error enabling octavia-keepalived service", 'details': e.output}, status=500) res = webob.Response(json={'message': 'OK'}, status=202) res.headers['ETag'] = stream.get_md5() return res
def _check_haproxy_uptime(self, lb_id): stat_sock_file = util.haproxy_sock_path(lb_id) lb_query = haproxy_query.HAProxyQuery(stat_sock_file) retries = HAPROXY_QUERY_RETRIES for idx in range(retries): try: info = lb_query.show_info() uptime_sec = info['Uptime_sec'] except Exception as e: LOG.warning('Failed to get haproxy info: %s, retrying.', e) time.sleep(1) continue uptime = int(uptime_sec) return uptime LOG.error('Failed to get haproxy uptime after %d tries.', retries) return None
[docs] def start_stop_lb(self, lb_id, action): action = action.lower() if action not in [consts.AMP_ACTION_START, consts.AMP_ACTION_STOP, consts.AMP_ACTION_RELOAD]: return webob.Response(json={ 'message': 'Invalid Request', 'details': f"Unknown action: {action}"}, status=400) self._check_lb_exists(lb_id) is_vrrp = (CONF.controller_worker.loadbalancer_topology == consts.TOPOLOGY_ACTIVE_STANDBY) if is_vrrp: util.vrrp_check_script_update(lb_id, action) # HAProxy does not start the process when given a reload # so start it if haproxy is not already running if action == consts.AMP_ACTION_RELOAD: if consts.OFFLINE == self._check_haproxy_status(lb_id): action = consts.AMP_ACTION_START else: # We first have to save the state when we reload haproxy_state_file = util.state_file_path(lb_id) stat_sock_file = util.haproxy_sock_path(lb_id) lb_query = haproxy_query.HAProxyQuery(stat_sock_file) if not lb_query.save_state(haproxy_state_file): # We accept to reload haproxy even if the state_file is # not generated, but we probably want to know about that # failure! LOG.warning('Failed to save haproxy-%s state!', lb_id) retries = (HAPROXY_RELOAD_RETRIES if action == consts.AMP_ACTION_RELOAD else 1) saved_exc = None for idx in range(retries): try: util.run_systemctl_command( action, consts.LOADBALANCER_SYSTEMD % lb_id) except subprocess.CalledProcessError as e: # Mitigation for # https://bugs.launchpad.net/octavia/+bug/2054666 if ('is not active, cannot reload.' in e.output and action == consts.AMP_ACTION_RELOAD): saved_exc = e # Wait a few seconds and check that haproxy was restarted uptime = self._check_haproxy_uptime(lb_id) # If haproxy is not reachable or was restarted more than 15 # sec ago, let's retry (or maybe restart?) if not uptime or uptime > 15: continue # haproxy probably crashed and was restarted, log it and # continue LOG.warning("An error occured with haproxy while it " "was reloaded, check the haproxy logs for " "more details.") break if 'Job is already running' not in e.output: return webob.Response(json={ 'message': f"Error {action}ing haproxy", 'details': e.output }, status=500) break else: # no break, we reach the retry limit for reloads return webob.Response(json={ 'message': f"Error {action}ing haproxy", 'details': saved_exc.output}, status=500) # If we are not in active/standby we need to send an IP # advertisement (GARP or NA). Keepalived handles this for # active/standby load balancers. if not is_vrrp and action in [consts.AMP_ACTION_START, consts.AMP_ACTION_RELOAD]: util.send_vip_advertisements(lb_id) if action in [consts.AMP_ACTION_STOP, consts.AMP_ACTION_RELOAD]: return webob.Response(json={ 'message': 'OK', 'details': f'Listener {lb_id} {action}ed'}, status=202) details = ( f'Configuration file is valid\nhaproxy daemon for {lb_id} started' ) return webob.Response(json={'message': 'OK', 'details': details}, status=202)
[docs] def delete_lb(self, lb_id): try: self._check_lb_exists(lb_id) except exceptions.HTTPException: return webob.Response(json={'message': 'OK'}) # check if that haproxy is still running and if stop it if os.path.exists(util.pid_path(lb_id)) and os.path.exists( os.path.join('/proc', util.get_haproxy_pid(lb_id))): try: util.run_systemctl_command( consts.STOP, consts.LOADBALANCER_SYSTEMD % lb_id) except subprocess.CalledProcessError as e: LOG.error("Failed to stop haproxy-%s service: %s %s", lb_id, e, e.output) return webob.Response(json={ 'message': "Error stopping haproxy", 'details': e.output}, status=500) # parse config and delete stats socket try: stats_socket = util.parse_haproxy_file(lb_id)[0] os.remove(stats_socket) except Exception: pass # Since this script should be deleted at LB delete time # we can check for this path to see if VRRP is enabled # on this amphora and not write the file if VRRP is not in use if os.path.exists(util.keepalived_check_script_path()): util.vrrp_check_script_update( lb_id, action=consts.AMP_ACTION_STOP) # delete the ssl files try: shutil.rmtree(self._cert_dir(lb_id)) except Exception: pass # disable the service init_path = util.init_path(lb_id) util.run_systemctl_command( consts.DISABLE, consts.LOADBALANCER_SYSTEMD % lb_id, False) # delete the directory + init script for that listener shutil.rmtree(util.haproxy_dir(lb_id)) if os.path.exists(init_path): os.remove(init_path) return webob.Response(json={'message': 'OK'})
[docs] def get_all_listeners_status(self, other_listeners=None): """Gets the status of all listeners This method will not consult the stats socket so a listener might show as ACTIVE but still be in ERROR Currently type==SSL is also not detected """ listeners = [] for lb in util.get_loadbalancers(): stats_socket, listeners_on_lb = util.parse_haproxy_file(lb) for listener_id, listener in listeners_on_lb.items(): listeners.append({ 'status': consts.ACTIVE, 'uuid': listener_id, 'type': listener['mode'], }) if other_listeners: listeners = listeners + other_listeners return webob.Response(json=listeners, content_type='application/json')
[docs] def upload_certificate(self, lb_id, filename): self._check_ssl_filename_format(filename) # create directory if not already there if not os.path.exists(self._cert_dir(lb_id)): os.makedirs(self._cert_dir(lb_id)) stream = Wrapped(flask.request.stream) file = self._cert_file_path(lb_id, filename) flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC # mode 00600 mode = stat.S_IRUSR | stat.S_IWUSR with os.fdopen(os.open(file, flags, mode), 'wb') as crt_file: b = stream.read(BUFFER) while b: crt_file.write(b) b = stream.read(BUFFER) resp = webob.Response(json={'message': 'OK'}) resp.headers['ETag'] = stream.get_md5() return resp
[docs] def get_certificate_md5(self, lb_id, filename): self._check_ssl_filename_format(filename) cert_path = self._cert_file_path(lb_id, filename) path_exists = os.path.exists(cert_path) if not path_exists: return webob.Response(json={ 'message': 'Certificate Not Found', 'details': f"No certificate with filename: {filename}"}, status=404) with open(cert_path, encoding='utf-8') as crt_file: cert = crt_file.read() md5sum = hashlib.md5(octavia_utils.b(cert), usedforsecurity=False).hexdigest() # nosec resp = webob.Response(json={'md5sum': md5sum}) resp.headers['ETag'] = md5sum return resp
[docs] def delete_certificate(self, lb_id, filename): self._check_ssl_filename_format(filename) if os.path.exists(self._cert_file_path(lb_id, filename)): os.remove(self._cert_file_path(lb_id, filename)) return webob.Response(json={'message': 'OK'})
def _get_listeners_on_lb(self, lb_id): if os.path.exists(util.pid_path(lb_id)): if os.path.exists( os.path.join('/proc', util.get_haproxy_pid(lb_id))): # Check if the listener is disabled with open(util.config_path(lb_id), encoding='utf-8') as file: cfg = file.read() m = re.findall('^frontend (.*)$', cfg, re.MULTILINE) return m or [] else: # pid file but no process... return [] else: return [] def _check_lb_exists(self, lb_id): # check if we know about that lb if lb_id not in util.get_loadbalancers(): raise exceptions.HTTPException( response=webob.Response(json={ 'message': 'Loadbalancer Not Found', 'details': f"No loadbalancer with UUID: {lb_id}"}, status=404)) def _check_ssl_filename_format(self, filename): # check if the format is (xxx.)*xxx.pem if not re.search(r'(\w.)+pem', filename): raise exceptions.HTTPException( response=webob.Response(json={ 'message': 'Filename has wrong format'}, status=400)) def _cert_dir(self, lb_id): return os.path.join(util.CONF.haproxy_amphora.base_cert_dir, lb_id) def _cert_file_path(self, lb_id, filename): return os.path.join(self._cert_dir(lb_id), filename) def _check_haproxy_status(self, lb_id): if os.path.exists(util.pid_path(lb_id)): if os.path.exists( os.path.join('/proc', util.get_haproxy_pid(lb_id))): return consts.ACTIVE return consts.OFFLINE