Source code for octavia.amphorae.backends.agent.api_server.amphora_info

# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
#    under the License.

import os
import re
import socket
import subprocess

from oslo_log import log as logging
import pyroute2
import webob

from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
from octavia.common import exceptions

LOG = logging.getLogger(__name__)


[docs] class AmphoraInfo: def __init__(self, osutils): self._osutils = osutils
[docs] def compile_amphora_info(self, extend_lvs_driver=None): extend_body = {} if extend_lvs_driver: extend_body = self._get_extend_body_from_lvs_driver( extend_lvs_driver) body = {'hostname': socket.gethostname(), 'haproxy_version': self._get_version_of_installed_package('haproxy'), 'api_version': api_server.VERSION} if extend_body: body.update(extend_body) return webob.Response(json=body)
[docs] def compile_amphora_details(self, extend_lvs_driver=None): haproxy_loadbalancer_list = sorted(util.get_loadbalancers()) haproxy_listener_list = sorted(util.get_listeners()) extend_body = {} lvs_listener_list = [] if extend_lvs_driver: lvs_listener_list = util.get_lvs_listeners() extend_data = self._get_extend_body_from_lvs_driver( extend_lvs_driver) lvs_count = self._count_lvs_listener_processes( extend_lvs_driver, lvs_listener_list) extend_body['lvs_listener_process_count'] = lvs_count extend_body.update(extend_data) meminfo = self._get_meminfo() cpu = self._cpu() st = os.statvfs('/') listeners = ( sorted(set(haproxy_listener_list + lvs_listener_list)) if lvs_listener_list else haproxy_listener_list) body = {'hostname': socket.gethostname(), 'haproxy_version': self._get_version_of_installed_package('haproxy'), 'api_version': api_server.VERSION, 'networks': self._get_networks(), 'active': True, 'haproxy_count': self._count_haproxy_processes(haproxy_loadbalancer_list), 'cpu_count': os.cpu_count(), 'cpu': { 'total': cpu['total'], 'user': cpu['user'], 'system': cpu['system'], 'soft_irq': cpu['softirq'], }, 'memory': { 'total': meminfo['MemTotal'], 'free': meminfo['MemFree'], 'buffers': meminfo['Buffers'], 'cached': meminfo['Cached'], 'swap_used': meminfo['SwapCached'], 'shared': meminfo['Shmem'], 'slab': meminfo['Slab'], }, 'disk': { 'used': (st.f_blocks - st.f_bfree) * st.f_frsize, 'available': st.f_bavail * st.f_frsize}, 'load': self._load(), 'active_tuned_profiles': self._get_active_tuned_profiles(), 'topology': consts.TOPOLOGY_SINGLE, 'topology_status': consts.TOPOLOGY_STATUS_OK, 'listeners': listeners, 'packages': {}} if extend_body: body.update(extend_body) return webob.Response(json=body)
def _get_version_of_installed_package(self, name): cmd = self._osutils.cmd_get_version_of_installed_package(name) version = subprocess.check_output(cmd.split(), encoding='utf-8') return version def _count_haproxy_processes(self, lb_list): num = 0 for lb_id in lb_list: if util.is_lb_running(lb_id): # optional check if it's still running num += 1 return num def _count_lvs_listener_processes(self, lvs_driver, listener_list): num = 0 for listener_id in listener_list: if util.is_lvs_listener_running(listener_id): # optional check if it's still running num += 1 return num def _get_extend_body_from_lvs_driver(self, extend_lvs_driver): extend_info = extend_lvs_driver.get_subscribed_amp_compile_info() extend_data = {} for extend in extend_info: package_version = self._get_version_of_installed_package(extend) extend_data[f'{extend}_version'] = package_version return extend_data def _get_meminfo(self): re_parser = re.compile(r'^(?P<key>\S*):\s*(?P<value>\d*)\s*kB') result = {} with open('/proc/meminfo', encoding='utf-8') as meminfo: for line in meminfo: match = re_parser.match(line) if not match: continue # skip lines that don't parse key, value = match.groups(['key', 'value']) result[key] = int(value) return result def _cpu(self): with open('/proc/stat', encoding='utf-8') as f: cpu = f.readline() vals = cpu.split(' ') return { 'user': vals[2], 'nice': vals[3], 'system': vals[4], 'idle': vals[5], 'iowait': vals[6], 'irq': vals[7], 'softirq': vals[8], 'total': sum(int(i) for i in vals[2:]) } def _load(self): with open('/proc/loadavg', encoding='utf-8') as f: load = f.readline() vals = load.split(' ') return vals[:3] def _get_networks(self): networks = {} with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as netns: for interface in netns.get_links(): interface_name = None for item in interface['attrs']: if (item[0] == consts.IFLA_IFNAME and not item[1].startswith('eth')): break if item[0] == consts.IFLA_IFNAME: interface_name = item[1] if item[0] == 'IFLA_STATS64': networks[interface_name] = { 'network_tx': item[1]['tx_bytes'], 'network_rx': item[1]['rx_bytes']} return networks
[docs] def get_interface(self, ip_addr): try: interface = network_utils.get_interface_name( ip_addr, net_ns=consts.AMPHORA_NAMESPACE) except exceptions.InvalidIPAddress: return webob.Response(json={'message': "Invalid IP address"}, status=400) except exceptions.NotFound: return webob.Response( json={'message': "Error interface not found for IP address"}, status=404) return webob.Response(json={'message': 'OK', 'interface': interface}, status=200)
def _get_active_tuned_profiles(self) -> str: """Returns the active TuneD profile(s)""" try: with open("/etc/tuned/active_profile", encoding="utf-8") as f: return f.read(1024).strip() except OSError as ex: LOG.debug("Reading active TuneD profiles failed: %r", ex) return ""