# Copyright 2011-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
import shutil
import stat
import subprocess
import flask
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
import webob
from werkzeug import exceptions
from octavia.amphorae.backends.agent.api_server import loadbalancer
from octavia.amphorae.backends.agent.api_server import lvs_listener_base
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
# Number of bytes requested per read when an uploaded listener config is
# streamed from the request body to disk.
BUFFER = 100
# File name of the backend health-check script that is copied from the
# agent's ``utils`` directory.
CHECK_SCRIPT_NAME = 'udp_check.sh'
CONF = cfg.CONF
# File name of the keepalived check script rendered from a Jinja2 template.
KEEPALIVED_CHECK_SCRIPT_NAME = 'lvs_udp_check.sh'
LOG = logging.getLogger(__name__)

# Templates are looked up relative to the directory holding this module.
_TEMPLATE_DIR = (os.path.dirname(os.path.realpath(__file__)) +
                 consts.AGENT_API_TEMPLATES)
j2_env = jinja2.Environment(
    autoescape=True,
    loader=jinja2.FileSystemLoader(_TEMPLATE_DIR))
SYSTEMD_TEMPLATE = j2_env.get_template(consts.KEEPALIVED_JINJA2_SYSTEMD)
check_script_file_template = j2_env.get_template(
    consts.KEEPALIVED_CHECK_SCRIPT)
[docs]
class KeepalivedLvs(lvs_listener_base.LvsListenerApiServerBase):
    """Agent API handlers for keepalived-managed LVS (UDP) listeners.

    The methods read and write per-listener keepalived configuration
    files, drive the matching systemd units through ``util`` helpers and
    answer with ``webob.Response`` objects.
    """

    _SUBSCRIBED_AMP_COMPILE = ['keepalived', 'ipvsadm']

    def upload_lvs_listener_config(self, listener_id):
        """Store an uploaded keepalived LVS config and enable its service.

        The body of the current flask request is written to the listener's
        keepalived config file. The backend health-check script, the
        systemd unit for the listener and, in Active-Standby topology, the
        keepalived check script are installed when they are missing.

        :param listener_id: the id of the listener
        :returns: a 200 response whose ``ETag`` header is the MD5 reported
                  by the wrapped request stream, or a 500 response when
                  enabling the listener's systemd unit fails
        :raises werkzeug.exceptions.Conflict: if the backend health-check
                script cannot be found next to the agent code
        """
        stream = loadbalancer.Wrapped(flask.request.stream)

        # exist_ok avoids failing when a concurrent request creates the
        # directory between a check and the creation.
        os.makedirs(util.keepalived_lvs_dir(), exist_ok=True)

        if not os.path.exists(util.keepalived_backend_check_script_dir()):
            current_file_dir = os.path.dirname(os.path.abspath(__file__))
            script_dir = os.path.join(
                os.path.abspath(os.path.join(current_file_dir, '../..')),
                'utils')
            script_source = os.path.join(script_dir, CHECK_SCRIPT_NAME)
            # Explicit check instead of ``assert``: assertions are removed
            # when Python runs with -O, which would skip the validation.
            if not os.path.exists(script_source):
                raise exceptions.Conflict(
                    description='%(file_name)s not Found for '
                                'UDP Listener %(listener_id)s' %
                                {'file_name': CHECK_SCRIPT_NAME,
                                 'listener_id': listener_id})
            os.makedirs(util.keepalived_backend_check_script_dir())
            shutil.copy2(script_source,
                         util.keepalived_backend_check_script_path())
            # NOTE: stat.S_IEXEC on its own sets the mode to 0100.
            os.chmod(util.keepalived_backend_check_script_path(),
                     stat.S_IEXEC)

        # Based on current topology setting, only the amphora instances in
        # Active-Standby topology will create the keepalived check script
        # directory. For Single topology, neither the directory nor the
        # check script for status change is created.
        need_check = (CONF.controller_worker.loadbalancer_topology ==
                      consts.TOPOLOGY_ACTIVE_STANDBY)

        conf_file = util.keepalived_lvs_cfg_path(listener_id)
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        # mode 00644
        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

        # Copy the request body to the config file in BUFFER-sized chunks.
        with os.fdopen(os.open(conf_file, flags, mode), 'wb') as f:
            chunk = stream.read(BUFFER)
            while chunk:
                f.write(chunk)
                chunk = stream.read(BUFFER)

        init_path = util.keepalived_lvs_init_path(listener_id)

        # Render and install the network namespace systemd service
        util.install_netns_systemd_service()
        util.run_systemctl_command(
            consts.ENABLE, consts.AMP_NETNS_SVC_PREFIX, False)

        # Render and install the keepalivedlvs init script (mode 00644);
        # an already existing unit file is left untouched.
        keepalived_pid, vrrp_pid, check_pid = util.keepalived_lvs_pids_path(
            listener_id)
        if not os.path.exists(init_path):
            with os.fdopen(os.open(init_path, flags, mode), 'w') as text_file:
                text = SYSTEMD_TEMPLATE.render(
                    keepalived_pid=keepalived_pid,
                    vrrp_pid=vrrp_pid,
                    check_pid=check_pid,
                    keepalived_cmd=consts.KEEPALIVED_CMD,
                    keepalived_cfg=util.keepalived_lvs_cfg_path(listener_id),
                    amphora_nsname=consts.AMPHORA_NAMESPACE,
                    amphora_netns=consts.AMP_NETNS_SVC_PREFIX,
                    administrative_log_facility=(
                        CONF.amphora_agent.administrative_log_facility),
                )
                text_file.write(text)

        # Make sure the keepalivedlvs service is enabled on boot
        try:
            util.run_systemctl_command(
                consts.ENABLE,
                consts.KEEPALIVEDLVS_SYSTEMD % listener_id)
        except subprocess.CalledProcessError as e:
            return webob.Response(json={
                'message': ("Error enabling "
                            "octavia-keepalivedlvs service"),
                'details': e.output}, status=500)

        if need_check:
            # inject the check script for keepalived process
            script_path = os.path.join(util.keepalived_check_scripts_dir(),
                                       KEEPALIVED_CHECK_SCRIPT_NAME)
            if not os.path.exists(script_path):
                os.makedirs(util.keepalived_check_scripts_dir(),
                            exist_ok=True)
                with os.fdopen(os.open(script_path, flags, stat.S_IEXEC),
                               'w') as script_file:
                    text = check_script_file_template.render(
                        consts=consts,
                        keepalived_lvs_pid_dir=util.keepalived_lvs_dir()
                    )
                    script_file.write(text)
            util.vrrp_check_script_update(None, consts.AMP_ACTION_START)

        res = webob.Response(json={'message': 'OK'}, status=200)
        res.headers['ETag'] = stream.get_md5()
        return res

    def _check_lvs_listener_exists(self, listener_id):
        """Raise a 404 HTTPException unless the listener config exists.

        :param listener_id: the id of the listener
        :raises werkzeug.exceptions.HTTPException: wrapping a 404 response
                when no keepalived config file exists for the listener
        """
        if os.path.exists(util.keepalived_lvs_cfg_path(listener_id)):
            return
        not_found = webob.Response(
            json={
                'message': 'UDP Listener Not Found',
                'details': f"No UDP listener with UUID: {listener_id}"},
            status=404)
        raise exceptions.HTTPException(response=not_found)

    def get_lvs_listener_config(self, listener_id):
        """Gets the keepalivedlvs config

        :param listener_id: the id of the listener
        :returns: a ``text/plain`` response holding the config file content
        :raises werkzeug.exceptions.HTTPException: wrapping a 404 response
                when no keepalived config file exists for the listener
        """
        self._check_lvs_listener_exists(listener_id)
        with open(util.keepalived_lvs_cfg_path(listener_id),
                  encoding='utf-8') as config_file:
            config_text = config_file.read()
        return webob.Response(config_text, content_type='text/plain')

    def manage_lvs_listener(self, listener_id, action):
        """Start, stop or reload the keepalivedlvs service of a listener.

        :param listener_id: the id of the listener
        :param action: one of the start, stop or reload amphora actions;
                       matched case-insensitively
        :returns: a 400 response for an unknown action, a 500 response
                  when the systemctl command fails, otherwise a 202
                  response
        :raises werkzeug.exceptions.HTTPException: wrapping a 404 response
                when no keepalived config file exists for the listener
        """
        action = action.lower()
        if action not in [consts.AMP_ACTION_START,
                          consts.AMP_ACTION_STOP,
                          consts.AMP_ACTION_RELOAD]:
            return webob.Response(json={
                'message': 'Invalid Request',
                'details': f"Unknown action: {action}"}, status=400)

        # When octavia requests a reload of keepalived, force a restart since
        # a keepalived reload doesn't restore members in their initial state.
        #
        # TODO(gthiemonge) remove this when keepalived>=2.0.14 is widely used
        if action == consts.AMP_ACTION_RELOAD:
            action = consts.AMP_ACTION_RESTART

        self._check_lvs_listener_exists(listener_id)

        # NOTE: this branch cannot be taken while reload is remapped to
        # restart above; it is kept for when that remap is removed.
        if action == consts.AMP_ACTION_RELOAD:
            if consts.OFFLINE == self._check_lvs_listener_status(listener_id):
                action = consts.AMP_ACTION_START

        try:
            util.run_systemctl_command(
                action, consts.KEEPALIVEDLVS_SYSTEMD % listener_id)
        except subprocess.CalledProcessError as e:
            return webob.Response(json={
                'message': (f"Failed to {action} keepalivedlvs listener "
                            f"{listener_id}"),
                'details': e.output}, status=500)

        is_vrrp = (CONF.controller_worker.loadbalancer_topology ==
                   consts.TOPOLOGY_ACTIVE_STANDBY)
        # TODO(gthiemonge) remove RESTART from the list (same as previous todo
        # in this function)
        if not is_vrrp and action in [consts.AMP_ACTION_START,
                                      consts.AMP_ACTION_RESTART,
                                      consts.AMP_ACTION_RELOAD]:
            util.send_vip_advertisements(listener_id=listener_id)

        return webob.Response(
            json={'message': 'OK',
                  'details': (f'keepalivedlvs listener {listener_id} '
                              f'{action}ed')},
            status=202)

    def _check_lvs_listener_status(self, listener_id):
        """Report the status of one listener's keepalived process.

        :param listener_id: the id of the listener
        :returns: ``consts.OFFLINE`` when the keepalived PID file is
                  missing, ``consts.ERROR`` when the PID file exists but
                  there is no matching ``/proc`` entry, otherwise
                  ``consts.ACTIVE`` if the config contains
                  ``virtual_server`` and ``consts.OFFLINE`` if it does not
        """
        pid_file = util.keepalived_lvs_pids_path(listener_id)[0]
        if not os.path.exists(pid_file):
            return consts.OFFLINE

        proc_entry = os.path.join(
            '/proc', util.get_keepalivedlvs_pid(listener_id))
        if not os.path.exists(proc_entry):
            return consts.ERROR

        # Check if the listener is disabled
        with open(util.keepalived_lvs_cfg_path(listener_id),
                  encoding='utf-8') as config_file:
            config_text = config_file.read()
        if re.search('virtual_server', config_text):
            return consts.ACTIVE
        return consts.OFFLINE

    def get_all_lvs_listeners_status(self):
        """Gets the status of all UDP listeners

        Gets the status of all UDP listeners on the amphora.

        :returns: a list with one dict per listener, holding the keys
                  ``status``, ``uuid`` and ``type`` (always ``'UDP'``)
        """
        return [
            {
                'status': self._check_lvs_listener_status(lvs_listener),
                'uuid': lvs_listener,
                'type': 'UDP',
            }
            for lvs_listener in util.get_lvs_listeners()
        ]

    def delete_lvs_listener(self, listener_id):
        """Stop, disable and remove the keepalivedlvs service of a listener.

        Deleting a listener that has no config file is treated as success.

        :param listener_id: the id of the listener
        :returns: an ``OK`` response on success, or a 500 response when
                  stopping or disabling the systemd unit fails
        """
        try:
            self._check_lvs_listener_exists(listener_id)
        except exceptions.HTTPException:
            return webob.Response(json={'message': 'OK'})

        # check if that keepalived is still running and if so, stop it
        keepalived_pid, vrrp_pid, check_pid = util.keepalived_lvs_pids_path(
            listener_id)
        if os.path.exists(keepalived_pid) and os.path.exists(
                os.path.join('/proc',
                             util.get_keepalivedlvs_pid(listener_id))):
            try:
                util.run_systemctl_command(
                    consts.STOP,
                    consts.KEEPALIVEDLVS_SYSTEMD % listener_id)
            except subprocess.CalledProcessError as e:
                LOG.error("Failed to stop keepalivedlvs service: %s", e)
                return webob.Response(json={
                    'message': "Error stopping keepalivedlvs",
                    'details': e.output}, status=500)

        # The lvs check script relies on the keepalived pid file to decide
        # whether keepalived is alive. The process has been stopped by the
        # previous step, so make sure none of the pid files is left behind.
        # Each file is removed on its own: any subset of them may already
        # be gone, and a missing file must not abort the deletion.
        for pid_file in (keepalived_pid, vrrp_pid, check_pid):
            try:
                os.remove(pid_file)
            except FileNotFoundError:
                pass

        # disable the service
        init_path = util.keepalived_lvs_init_path(listener_id)
        try:
            util.run_systemctl_command(
                consts.DISABLE,
                consts.KEEPALIVEDLVS_SYSTEMD % listener_id)
        except subprocess.CalledProcessError as e:
            LOG.error("Failed to disable "
                      "octavia-keepalivedlvs-%(list)s service: "
                      "%(err)s", {'list': listener_id, 'err': str(e)})
            return webob.Response(json={
                'message': (
                    f"Error disabling octavia-keepalivedlvs-{listener_id} "
                    "service"),
                'details': e.output}, status=500)

        # delete the init script and the config file for that listener
        if os.path.exists(init_path):
            os.remove(init_path)
        if os.path.exists(util.keepalived_lvs_cfg_path(listener_id)):
            os.remove(util.keepalived_lvs_cfg_path(listener_id))

        return webob.Response(json={'message': 'OK'})