Source code for octavia.amphorae.backends.agent.api_server.server

# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
import stat

import flask
from jsonschema import validate
from oslo_config import cfg
from oslo_log import log as logging
import webob
from werkzeug import exceptions

from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import amphora_info
from octavia.amphorae.backends.agent.api_server import certificate_update
from octavia.amphorae.backends.agent.api_server import keepalived
from octavia.amphorae.backends.agent.api_server import keepalivedlvs
from octavia.amphorae.backends.agent.api_server import loadbalancer
from octavia.amphorae.backends.agent.api_server import osutils
from octavia.amphorae.backends.agent.api_server import plug
from octavia.amphorae.backends.agent.api_server import rules_schema
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import nftable_utils
from octavia.common import constants as consts


BUFFER = 1024
CONF = cfg.CONF
PATH_PREFIX = '/' + api_server.VERSION
LOG = logging.getLogger(__name__)


# make the error pages all json
def make_json_error(ex):
    """Build a JSON error response describing an exception.

    :param ex: the exception being reported.
    :returns: a ``webob.Response`` whose JSON body holds the exception text
              under ``error`` and the status under ``http_code``; the
              response status code is set to the same value.
    """
    if isinstance(ex, exceptions.HTTPException):
        status = ex.code
    else:
        # Anything that is not a werkzeug HTTP exception is reported as 500.
        status = 500
    payload = {'error': str(ex), 'http_code': status}
    json_response = webob.Response(json=payload)
    json_response.status_code = status
    return json_response
def register_app_error_handler(app):
    """Register ``make_json_error`` for every default werkzeug exception.

    :param app: the application object; only its ``register_error_handler``
                method is used here.
    """
    known_errors = exceptions.default_exceptions
    for http_code in known_errors:
        app.register_error_handler(http_code, make_json_error)
class Server:
    """Flask application exposing the amphora agent REST endpoints.

    The constructor builds the backend helpers (keepalived, load balancer,
    LVS listener, plug, amphora info), registers the JSON error handlers and
    binds every URL rule to the matching method of this class. Most view
    methods simply delegate to one of those helpers.
    """

    def __init__(self):
        self.app = flask.Flask(__name__)
        self._osutils = osutils.BaseOS.get_os_util()
        self._keepalived = keepalived.Keepalived()
        self._loadbalancer = loadbalancer.Loadbalancer()
        self._lvs_listener = keepalivedlvs.KeepalivedLvs()
        self._plug = plug.Plug(self._osutils)
        self._amphora_info = amphora_info.AmphoraInfo(self._osutils)

        register_app_error_handler(self.app)

        self._plug.plug_lo()

        self.app.add_url_rule(rule='/', view_func=self.version_discovery,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX +
                              '/loadbalancer/<amphora_id>/<lb_id>/haproxy',
                              view_func=self.upload_haproxy_config,
                              methods=['PUT'])
        # TODO(gthiemonge) rename 'udp_listener' endpoint to 'lvs_listener'
        # when api_version is bumped
        self.app.add_url_rule(rule=PATH_PREFIX +
                              '/listeners/<amphora_id>/<listener_id>'
                              '/udp_listener',
                              view_func=self.upload_lvs_listener_config,
                              methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX +
                              '/loadbalancer/<lb_id>/haproxy',
                              view_func=self.get_haproxy_config,
                              methods=['GET'])
        # TODO(gthiemonge) rename 'udp_listener' endpoint to 'lvs_listener'
        # when api_version is bumped
        self.app.add_url_rule(rule=PATH_PREFIX +
                              '/listeners/<listener_id>/udp_listener',
                              view_func=self.get_lvs_listener_config,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX +
                              '/loadbalancer/<object_id>/<action>',
                              view_func=self.start_stop_lb_object,
                              methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/listeners/<object_id>',
                              view_func=self.delete_lb_object,
                              methods=['DELETE'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/config',
                              view_func=self.upload_config,
                              methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/details',
                              view_func=self.get_details,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/info',
                              view_func=self.get_info,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/listeners',
                              view_func=self.get_all_listeners_status,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/loadbalancer/<lb_id>'
                              '/certificates/<filename>',
                              view_func=self.upload_certificate,
                              methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/loadbalancer/<lb_id>'
                              '/certificates/<filename>',
                              view_func=self.get_certificate_md5,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/loadbalancer/<lb_id>'
                              '/certificates/<filename>',
                              view_func=self.delete_certificate,
                              methods=['DELETE'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/plug/vip/<vip>',
                              view_func=self.plug_vip,
                              methods=['POST'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/plug/network',
                              view_func=self.plug_network,
                              methods=['POST'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/certificate',
                              view_func=self.upload_cert, methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/vrrp/upload',
                              view_func=self.upload_vrrp_config,
                              methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/vrrp/<action>',
                              view_func=self.manage_service_vrrp,
                              methods=['PUT'])
        self.app.add_url_rule(rule=PATH_PREFIX + '/interface/<ip_addr>',
                              view_func=self.get_interface,
                              methods=['GET'])
        self.app.add_url_rule(rule=PATH_PREFIX +
                              '/interface/<ip_addr>/rules',
                              view_func=self.set_interface_rules,
                              methods=['PUT'])

    @staticmethod
    def _has_required_keys(payload, required_keys):
        """Return True when payload is a dict holding every required key."""
        return isinstance(payload, dict) and all(
            key in payload for key in required_keys)

    def upload_haproxy_config(self, amphora_id, lb_id):
        """Delegate the haproxy config upload to the load balancer helper."""
        return self._loadbalancer.upload_haproxy_config(amphora_id, lb_id)

    def upload_lvs_listener_config(self, amphora_id, listener_id):
        """Delegate the LVS listener config upload to the LVS helper.

        ``amphora_id`` is part of the route but is not forwarded.
        """
        return self._lvs_listener.upload_lvs_listener_config(listener_id)

    def get_haproxy_config(self, lb_id):
        """Return the load balancer helper's haproxy config response."""
        return self._loadbalancer.get_haproxy_config(lb_id)

    def get_lvs_listener_config(self, listener_id):
        """Return the LVS helper's listener config response."""
        return self._lvs_listener.get_lvs_listener_config(listener_id)

    def start_stop_lb_object(self, object_id, action):
        """Apply ``action`` to an LVS listener or a load balancer.

        The backend reported by ``util.get_backend_for_lb_object`` decides
        which helper handles the request.
        """
        backend = util.get_backend_for_lb_object(object_id)
        if backend == consts.LVS_BACKEND:
            return self._lvs_listener.manage_lvs_listener(
                listener_id=object_id, action=action)
        return self._loadbalancer.start_stop_lb(lb_id=object_id,
                                                action=action)

    def delete_lb_object(self, object_id):
        """Delete an LVS listener or a load balancer, based on its backend."""
        backend = util.get_backend_for_lb_object(object_id)
        if backend == consts.LVS_BACKEND:
            return self._lvs_listener.delete_lvs_listener(object_id)
        return self._loadbalancer.delete_lb(object_id)

    def get_details(self):
        """Return the amphora details compiled by the amphora info helper."""
        return self._amphora_info.compile_amphora_details(
            extend_lvs_driver=self._lvs_listener)

    def get_info(self):
        """Return the amphora info compiled by the amphora info helper."""
        return self._amphora_info.compile_amphora_info(
            extend_lvs_driver=self._lvs_listener)

    def get_all_listeners_status(self):
        """Return listener status, passing LVS listeners to the LB helper."""
        lvs_listeners = self._lvs_listener.get_all_lvs_listeners_status()
        return self._loadbalancer.get_all_listeners_status(
            other_listeners=lvs_listeners)

    def upload_certificate(self, lb_id, filename):
        """Delegate the certificate upload to the load balancer helper."""
        return self._loadbalancer.upload_certificate(lb_id, filename)

    def get_certificate_md5(self, lb_id, filename):
        """Delegate to the load balancer helper's ``get_certificate_md5``."""
        return self._loadbalancer.get_certificate_md5(lb_id, filename)

    def delete_certificate(self, lb_id, filename):
        """Delegate the certificate deletion to the load balancer helper."""
        return self._loadbalancer.delete_certificate(lb_id, filename)

    def plug_vip(self, vip):
        """Plug a VIP using the subnet information in the request JSON.

        :param vip: the VIP taken from the URL.
        :raises werkzeug.exceptions.BadRequest: if the body cannot be read
            as JSON, is not an object, or lacks ``subnet_cidr``, ``gateway``
            or ``mac_address``.
        """
        # Catch any issues with the subnet info json
        try:
            net_info = flask.request.get_json()
        except Exception as e:
            raise exceptions.BadRequest(
                description='Invalid subnet information') from e
        # Explicit checks instead of ``assert``: assertions are removed when
        # Python runs with -O, which would let bad payloads through.
        if not self._has_required_keys(
                net_info, ('subnet_cidr', 'gateway', 'mac_address')):
            raise exceptions.BadRequest(
                description='Invalid subnet information')
        return self._plug.plug_vip(vip,
                                   net_info['subnet_cidr'],
                                   net_info['gateway'],
                                   net_info['mac_address'],
                                   net_info.get('mtu'),
                                   net_info.get('vrrp_ip'),
                                   net_info.get('host_routes', ()),
                                   net_info.get('additional_vips', ()),
                                   net_info.get('is_sriov', False))

    def plug_network(self):
        """Plug a network port described by the request JSON.

        :raises werkzeug.exceptions.BadRequest: if the body cannot be read
            as JSON, is not an object, or lacks ``mac_address``.
        """
        try:
            port_info = flask.request.get_json()
        except Exception as e:
            raise exceptions.BadRequest(
                description='Invalid port information') from e
        # Explicit checks instead of ``assert`` so validation survives -O.
        if not self._has_required_keys(port_info, ('mac_address',)):
            raise exceptions.BadRequest(
                description='Invalid port information')
        return self._plug.plug_network(port_info['mac_address'],
                                       port_info.get('fixed_ips'),
                                       port_info.get('mtu'),
                                       port_info.get('vip_net_info'))

    def upload_cert(self):
        """Delegate to ``certificate_update.upload_server_cert``."""
        return certificate_update.upload_server_cert()

    def upload_vrrp_config(self):
        """Delegate the keepalived config upload to the keepalived helper."""
        return self._keepalived.upload_keepalived_config()

    def manage_service_vrrp(self, action):
        """Apply ``action`` to the keepalived service via its helper."""
        return self._keepalived.manager_keepalived_service(action)

    def get_interface(self, ip_addr):
        """Return the amphora info helper's response for ``ip_addr``."""
        return self._amphora_info.get_interface(ip_addr)

    def upload_config(self):
        """Overwrite the agent config file with the request body.

        The body is streamed in ``BUFFER``-sized chunks into the first file
        returned by ``cfg.find_config_files``, after which
        ``CONF.mutate_config_files()`` is called.

        :returns: a 202 JSON response on success, or a 500 JSON response
                  carrying the error text if anything raises.
        """
        try:
            stream = flask.request.stream
            file_path = cfg.find_config_files(project=CONF.project,
                                              prog=CONF.prog)[0]
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
            # mode 00600
            mode = stat.S_IRUSR | stat.S_IWUSR
            with os.fdopen(os.open(file_path, flags, mode), 'wb') as cfg_file:
                b = stream.read(BUFFER)
                while b:
                    cfg_file.write(b)
                    b = stream.read(BUFFER)

            CONF.mutate_config_files()
        except Exception as e:
            LOG.error("Unable to update amphora-agent configuration: %s",
                      str(e))
            return webob.Response(json={
                'message': "Unable to update amphora-agent configuration.",
                'details': str(e)}, status=500)

        return webob.Response(json={'message': 'OK'}, status=202)

    def version_discovery(self):
        """Return a JSON response carrying ``api_server.VERSION``."""
        return webob.Response(json={'api_version': api_server.VERSION})

    def set_interface_rules(self, ip_addr):
        """Validate and apply nftables rules for the interface of an IP.

        If the interface lookup does not return status 200, that response
        is returned unchanged.

        :raises werkzeug.exceptions.BadRequest: if the request JSON cannot
            be read or fails ``rules_schema.SUPPORTED_RULES_SCHEMA``.
        """
        interface_webob = self._amphora_info.get_interface(ip_addr)

        if interface_webob.status_code != 200:
            return interface_webob
        interface = interface_webob.json['interface']

        try:
            rules_info = flask.request.get_json()
            validate(rules_info, rules_schema.SUPPORTED_RULES_SCHEMA)
        except Exception as e:
            raise exceptions.BadRequest(
                description='Invalid rules information') from e

        nftable_utils.write_nftable_vip_rules_file(interface, rules_info)

        nftable_utils.load_nftables_file()

        return webob.Response(json={'message': 'OK'}, status=200)