Source code for octavia.amphorae.backends.utils.keepalivedlvs_query

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import ipaddress
import os
import re
import subprocess

from octavia_lib.common import constants as lib_consts
from oslo_log import log as logging

from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants

# Module-level logger.
LOG = logging.getLogger(__name__)
# Kernel LVS table file; get_listener_realserver_mapping() reads it through
# read_kernel_file(), i.e. with "cat" inside the target network namespace.
KERNEL_LVS_PATH = '/proc/net/ip_vs'
# NOTE(review): not referenced by the functions visible in this module;
# presumably the kernel LVS statistics file - confirm before relying on it.
KERNEL_LVS_STATS_PATH = '/proc/net/ip_vs_stats'
# Captures the remainder of a header line after "RemoteAddress:Port"; the
# capture is later split on whitespace to obtain the real-server column names.
LVS_KEY_REGEX = re.compile(r"RemoteAddress:Port\s+(.*$)")
# Real-server row keyed by "<8 word chars>:<4 word chars>" (treated by the
# parser as hexadecimal IPv4 address and port), followed by the column values.
V4_RS_VALUE_REGEX = re.compile(r"(\w{8}:\w{4})\s+(.*$)")
# Splits 8 word characters into four 2-character groups.
# NOTE(review): not referenced by the functions visible in this module.
V4_HEX_IP_REGEX = re.compile(r"(\w{2})(\w{2})(\w{2})(\w{2})")
# Real-server row keyed by a bracketed address followed by ":<4 word chars>"
# (used as the IPv6 fallback when the IPv4 pattern does not match).
V6_RS_VALUE_REGEX = re.compile(r"(\[[\[\w{4}:]+\b\]:\w{4})\s+(.*$)")

# The patterns below are applied to the keepalived LVS configuration text.
# Captures the "<word>-<word>" name that follows "net_namespace".
NS_REGEX = re.compile(r"net_namespace\s(\w+-\w+)")
# Captures (address, port) from the line immediately following a
# "virtual_server_group ... {" line.
VS_ADDRESS_REGEX = re.compile(r"virtual_server_group .* \{\n"
                              r"\s+([a-f\d\.:]+)\s(\d{1,5})\n")
# Captures (address, port) from a "real_server <address> <port>" statement.
RS_ADDRESS_REGEX = re.compile(r"real_server\s([a-f\d\.:]+)\s(\d{1,5})")
# Captures (resource type, UUID) from "# Configuration for <type> <uuid>".
CONFIG_COMMENT_REGEX = re.compile(
    r"#\sConfiguration\sfor\s(\w+)\s(\w{8}-\w{4}-\w{4}-\w{4}-\w{12})")
# Captures (resource type, UUID) from "# <type> <uuid> is disabled".
DISABLED_CONFIG_COMMENT_REGEX = re.compile(
    r"#\s(\w+)\s(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}) is disabled")

# Any of these checker keywords in the configuration is treated by
# get_lvs_listener_pool_status() as "health monitor enabled".
CHECKER_REGEX = re.compile(r"(MISC_CHECK|HTTP_GET|TCP_CHECK)")


def read_kernel_file(ns_name, file_path):
    """Return the content of a file as seen from inside a network namespace.

    Runs ``ip netns exec <ns_name> cat <file_path>`` and returns whatever the
    command printed (stderr is merged into stdout).

    :param ns_name: Name of the network namespace to enter.
    :param file_path: Path of the file to read inside that namespace.
    :returns: The command output as text (bytes are decoded as UTF-8).
    :raises subprocess.CalledProcessError: If the command exits with a
        non-zero status; the failure is logged before being re-raised.
    """
    # Build argv directly rather than formatting a string and splitting it
    # on whitespace, so a name or path containing spaces stays one argument.
    cmd = ['ip', 'netns', 'exec', str(ns_name), 'cat', str(file_path)]
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        LOG.error("Failed to get kernel lvs status in ns %(ns_name)s "
                  "%(kernel_lvs_path)s: %(err)s %(out)s",
                  {'ns_name': ns_name, 'kernel_lvs_path': file_path,
                   'err': e, 'out': e.output})
        raise
    # py3 treat the output as bytes type.
    if isinstance(output, bytes):
        output = output.decode('utf-8')
    return output


def get_listener_realserver_mapping(ns_name, listener_ip_ports,
                                    health_monitor_enabled):
    """Collect the kernel LVS state of the real servers behind listeners.

    The kernel LVS table (``KERNEL_LVS_PATH``) is read once from the given
    namespace. Rows following a UDP/SCTP virtual-server line whose address
    matches one of ``listener_ip_ports`` are parsed as real servers.

    Returned structure::

        {'rs_ip:rs_port': {
            'status': 'UP',
            'Forward': forward_type,
            'Weight': 5,
            'ActiveConn': 0,
            'InActConn': 0
        }}

    The keys other than ``'status'`` come from the table's header line and
    their values are kept as the strings found in the table. IPv6 members
    are keyed as ``'[addr]:port'``.

    :param ns_name: Network namespace in which the kernel table is read.
    :param listener_ip_ports: Iterable of ``'ip:port'`` / ``'[ipv6]:port'``
        strings identifying the virtual servers of interest.
    :param health_monitor_enabled: When true, found members are reported
        with ``constants.UP``, otherwise with ``constants.NO_CHECK``.
    :returns: The mapping described above; empty when no listener address
        is given or no matching real server is found.
    """
    # Without any listener address there is nothing to look up. Returning
    # early also avoids reading the kernel table for no reason.
    if not listener_ip_ports:
        return {}

    idex_list = []
    for listener_ip_port in listener_ip_ports:
        listener_ip, listener_port = listener_ip_port.rsplit(':', 1)
        ip_obj = ipaddress.ip_address(listener_ip.strip('[]'))
        if ip_obj.version == 4:
            # IPv4 virtual servers are matched as 8 upper-case hex digits.
            ip_to_hex_format = "%.8X" % int(ip_obj)
        else:
            # IPv6 virtual servers are matched as the bracketed, fully
            # expanded address; the brackets are escaped for the regex.
            ip_to_hex_format = r'\[' + ip_obj.exploded + r'\]'
        port_hex_format = "%.4X" % int(listener_port)
        idex_list.append(ip_to_hex_format + ':' + port_hex_format)
    idex = "({})".format("|".join(idex_list))
    vs_line_regex = re.compile(r'^(UDP|SCTP)\s+%s\s+\w+' % idex)

    # The table is the same for every listener address: read it only once.
    output = read_kernel_file(ns_name, KERNEL_LVS_PATH).split('\n')

    if health_monitor_enabled:
        member_status = constants.UP
    else:
        member_status = constants.NO_CHECK

    actual_member_result = {}
    find_target_block = False
    result_keys = []
    for line in output:
        if 'RemoteAddress:Port' in line:
            # Header line: remember the real-server column names.
            result_keys = re.split(r'\s+',
                                   LVS_KEY_REGEX.findall(line)[0].strip())
        elif (line.startswith(constants.PROTOCOL_UDP) or
              line.startswith(lib_consts.PROTOCOL_SCTP)):
            # Virtual-server line: the rows that follow belong to it, so
            # track whether it is one of the requested listeners.
            find_target_block = vs_line_regex.match(line) is not None
        elif find_target_block and line:
            rs_is_ipv4 = True
            all_values = V4_RS_VALUE_REGEX.findall(line)
            # If can not get all_values with ipv4 regex, then this line must
            # be a ipv6 real server record.
            if not all_values:
                all_values = V6_RS_VALUE_REGEX.findall(line)
                rs_is_ipv4 = False

            ip_port, raw_values = all_values[0]
            result_values = re.split(r"\s+", raw_values.strip())
            member_ip, member_port = ip_port.rsplit(':', 1)
            port_string = str(int(member_port, 16))
            if rs_is_ipv4:
                ip_string = ipaddress.ip_address(
                    int(member_ip, 16)).compressed
                member_ip_port_string = ip_string + ':' + port_string
            else:
                ip_string = ipaddress.ip_address(
                    member_ip.strip('[]')).compressed
                member_ip_port_string = '[' + ip_string + ']:' + port_string

            if result_keys:
                # Create the entry on first sight of this member, then
                # record every column (the values include the weight).
                member_entry = actual_member_result.setdefault(
                    member_ip_port_string, {'status': member_status})
                member_entry.update(zip(result_keys, result_values))

    return actual_member_result


def get_lvs_listener_resource_ipports_nsname(listener_id):
    """Parse a listener's keepalived LVS config into a resource mapping.

    Shape of the mapping when the listener has virtual-server addresses::

        {'Listener': {'id': listener-id,
                      'ipports': [ipport1, ipport2]},
         'Pool': {'id': pool-id},
         'Members': [{'id': member-id-1, 'ipport': ipport},
                     {'id': member-id-2, 'ipport': ipport}],
         'HealthMonitor': {'id': healthmonitor-id}}

    Members flagged as disabled in the config are appended to ``'Members'``
    with ``'ipport'`` set to ``None``.

    :param listener_id: ID used to locate the keepalived LVS config file.
    :returns: ``(mapping, ns_name)``. ``mapping`` is ``None`` when the config
        marks the listener as disabled and ``{}`` when no virtual-server
        address is found in the config.
    """
    cfg_path = util.keepalived_lvs_cfg_path(listener_id)
    with open(cfg_path, encoding='utf-8') as cfg_file:
        cfg = cfg_file.read()

    def _bracket_if_v6(raw_ip):
        # IPv6 addresses are wrapped in brackets so "addr:port" stays
        # unambiguous; both families are rendered in compressed form.
        parsed = ipaddress.ip_address(raw_ip)
        if parsed.version == 6:
            return "[" + parsed.compressed + "]"
        return parsed.compressed

    listener_ip_ports = [
        _bracket_if_v6(vs_ip) + ":" + vs_port
        for vs_ip, vs_port in VS_ADDRESS_REGEX.findall(cfg)
    ]
    ns_name = NS_REGEX.findall(cfg)[0]

    disabled_resources = DISABLED_CONFIG_COMMENT_REGEX.findall(cfg)
    if any(kind == 'Listener' for kind, _ in disabled_resources):
        return None, ns_name

    mapping = {}
    if not listener_ip_ports:
        # No virtual-server address in the config means the listener's
        # default pool has no enabled member yet; only the namespace name
        # is known at this point.
        return mapping, ns_name

    real_servers = [
        RS_ADDRESS_REGEX.findall(cfg_line)[0]
        for cfg_line in cfg.split('\n')
        if 'real_server' in cfg_line
    ]

    for kind, resource_id in CONFIG_COMMENT_REGEX.findall(cfg):
        entry = {'id': resource_id}
        if kind == 'Member':
            # Members accumulate in a list under the plural key.
            mapping.setdefault('Members', []).append(entry)
        elif kind not in mapping:
            # Any other resource type keeps its first occurrence only.
            mapping[kind] = entry
        elif kind == 'Members':
            mapping[kind].append(entry)

    for kind, member_id in disabled_resources:
        if kind == 'Member':
            mapping.setdefault('Members', []).append(
                {'id': member_id, 'ipport': None})

    # real_server statements are paired with the member entries by position.
    for position, (rs_ip, rs_port) in enumerate(real_servers):
        parsed_ip = ipaddress.ip_address(rs_ip)
        if parsed_ip.version == 6:
            rs_ip = '[' + parsed_ip.compressed + ']'
        mapping['Members'][position]['ipport'] = rs_ip + ':' + rs_port

    mapping['Listener']['ipports'] = listener_ip_ports

    return mapping, ns_name


def get_lvs_listener_pool_status(listener_id):
    """Report the status of the pool and members behind an LVS listener.

    Member states are derived by comparing the members declared in the
    keepalived config with the real servers currently present in the kernel
    LVS table:

    * ``MAINT`` - the member has no address in the mapping (disabled);
    * ``RESTARTING`` / ``DOWN`` - the member is absent from the kernel table
      (``RESTARTING`` when the config file is newer than the checker pid
      file, i.e. keepalived has not been restarted yet);
    * ``DRAIN`` - the member is present with weight 0;
    * otherwise the status reported by
      ``get_listener_realserver_mapping`` (``UP`` or ``NO_CHECK``).

    :param listener_id: ID of the listener to inspect.
    :returns: ``{}`` when the listener is disabled or has no pool, otherwise
        ``{'lvs': {'uuid': pool-id, 'status': ..., 'members': {id: status}}}``.
    """
    (resource_ipport_mapping,
     ns_name) = get_lvs_listener_resource_ipports_nsname(listener_id)
    # The mapping is None for a disabled listener and {} when the config has
    # no virtual-server address; neither has a pool to report on.
    if not resource_ipport_mapping or 'Pool' not in resource_ipport_mapping:
        return {}
    pool_id = resource_ipport_mapping['Pool']['id']
    if 'Members' not in resource_ipport_mapping:
        return {'lvs': {
            'uuid': pool_id,
            'status': constants.UP,
            'members': {}
        }}
    members = resource_ipport_mapping['Members']

    config_path = util.keepalived_lvs_cfg_path(listener_id)
    pids_pathes = util.keepalived_lvs_pids_path(listener_id)

    config_stat = os.stat(config_path)
    check_pid_stat = os.stat(pids_pathes[2])

    # Indicates that keepalived configuration has been updated but the service
    # has yet to be restarted.
    # NOTE: It only works if we are doing a RESTART on configuration change,
    # Iaa34db6cb1dfed98e96a585c5d105e263c7efa65 forces a RESTART instead of a
    # RELOAD, we need to be careful if we want to switch back to RELOAD after
    # updating to a recent keepalived release.
    restarting = config_stat.st_mtime > check_pid_stat.st_mtime
    missing_status = constants.RESTARTING if restarting else constants.DOWN

    with open(config_path, encoding='utf-8') as f:
        cfg = f.read()
    # Any checker keyword in the config means a health monitor is configured.
    hm_enabled = CHECKER_REGEX.search(cfg) is not None

    realserver_result = get_listener_realserver_mapping(
        ns_name, resource_ipport_mapping['Listener']['ipports'],
        hm_enabled)
    pool_status = constants.UP
    member_results = {}
    if realserver_result:
        # Walk the members directly so that members sharing the same
        # 'ipport' (e.g. several disabled members, all None) each get their
        # own entry instead of only the last one being reported.
        for member in members:
            member_ip_port = member['ipport']
            if member_ip_port is None:
                status = constants.MAINT
            elif member_ip_port not in realserver_result:
                status = missing_status
            elif int(realserver_result[member_ip_port]['Weight']) == 0:
                status = constants.DRAIN
            else:
                status = realserver_result[member_ip_port]['status']

            if member['id']:
                member_results[member['id']] = status
    else:
        # No real server at all in the kernel table: with a health monitor
        # this means every enabled member failed its check.
        if hm_enabled:
            pool_status = constants.DOWN

        for member in members:
            if member['ipport'] is None:
                member_results[member['id']] = constants.MAINT
            elif hm_enabled:
                member_results[member['id']] = missing_status
            else:
                member_results[member['id']] = constants.NO_CHECK

    return {
        'lvs': {
            'uuid': pool_id,
            'status': pool_status,
            'members': member_results
        }
    }


def get_ipvsadm_info(ns_name, is_stats_cmd=False):
    """Run ``ipvsadm -Ln`` in a namespace and parse its table output.

    Result shape::

        {'listener_ip:port': {'Listener': [(field, value), ...],
                              'Members': [[(field, value), ...], ...]}}

    Each key is the first whitespace-separated token containing ``':'`` on
    a virtual-server row. Field names are taken from the header lines of the
    command output.

    :param ns_name: Network namespace in which ``ipvsadm`` is executed.
    :param is_stats_cmd: When true, ``--stats --exact`` is appended to the
        command line.
    :returns: The mapping described above.
    """
    command = ['ip', 'netns', 'exec', ns_name, 'ipvsadm', '-Ln']
    if is_stats_cmd:
        # use --exact to ensure output is integer only
        command.extend(['--stats', '--exact'])
    raw_output = subprocess.check_output(command, stderr=subprocess.STDOUT)
    if isinstance(raw_output, bytes):
        raw_output = raw_output.decode('utf-8')

    def _tokens(text):
        # Collapse every whitespace run to one space, then split on it.
        return re.sub(r'\s+', ' ', text.strip()).split(' ')

    fields = []
    current_vs = None
    value_mapping = {}
    for row in raw_output.split('\n'):
        if 'Flags' in row:
            # "ipvsadm -Ln": first header line, ending with "Flags".
            fields = _tokens(row)
        elif 'Flags' in fields and (
                fields.index('Flags') == len(fields) - 1):
            # "ipvsadm -Ln": the line right after that header carries the
            # real-server column names.
            fields.extend(_tokens(row))
        elif 'Prot' in row:
            # "ipvsadm -Ln --stats": first header line.
            fields = _tokens(row)
        elif 'RemoteAddress' in row:
            # "--stats": the counter columns named on the first header line
            # apply to real-server rows too, so repeat them after this one.
            counter_fields = fields[fields.index('LocalAddress:Port') + 1:]
            fields.extend(_tokens(row))
            fields.extend(counter_fields)
        elif (constants.PROTOCOL_UDP in row or
              lib_consts.PROTOCOL_SCTP in row):
            # A row mentioning one of the UDP/SCTP protocol constants is a
            # virtual-server row; it is keyed by its first token with ':'.
            vs_values = _tokens(row)
            vs_key = next(
                (token for token in vs_values if ':' in token), None)
            if vs_key is not None:
                value_mapping[vs_key] = {'Listener': vs_values,
                                         'Members': []}
                current_vs = vs_key
        elif '->' in row and current_vs:
            # A "->" row is a real server of the latest virtual server.
            rs_values = _tokens(row)
            rs_values.remove('->')
            value_mapping[current_vs]['Members'].append(rs_values)

    # "->" separates the virtual-server fields from the real-server fields.
    arrow_at = fields.index('->')
    vs_fields = fields[:arrow_at]
    if 'Flags' in vs_fields:
        vs_fields.remove('Flags')
    rs_fields = fields[arrow_at + 1:]

    for entry in value_mapping.values():
        entry['Listener'] = list(zip(vs_fields, entry['Listener']))
        entry['Members'] = [
            list(zip(rs_fields, member_values))
            for member_values in entry['Members']
        ]

    return value_mapping


def get_lvs_listeners_stats():
    """Build the statistics entry of every running LVS listener.

    Every running, non-disabled listener is first acknowledged with zeroed
    counters and ``constants.OPEN``. Listeners whose addresses appear in both
    ``ipvsadm`` listings then get their counters filled in:

    * ``scur`` - sum of ``ActiveConn`` over the members (``ipvsadm -Ln``);
    * ``stot`` / ``bout`` / ``bin`` - ``Conns`` / ``OutBytes`` / ``InBytes``
      of the virtual servers (``ipvsadm -Ln --stats``);
    * ``ereq`` - always 0.

    :returns: ``{listener_id: {'stats': {'bout', 'bin', 'scur', 'stot',
        'ereq'}, 'status': constants.OPEN}}``.
    """
    lvs_listener_ids = util.get_lvs_listeners()
    need_check_listener_ids = [
        listener_id for listener_id in lvs_listener_ids
        if util.is_lvs_listener_running(listener_id)]
    ipport_mapping = {}
    listener_stats_res = {}
    for check_listener_id in need_check_listener_ids:
        # resource_ipport_mapping = {'Listener': {'id': listener-id,
        #                                         'ipports': [ipport, ...]},
        #                            'Pool': {'id': pool-id},
        #                            'Members': [{'id': member-id-1,
        #                                         'ipport': ipport},
        #                                        {'id': member-id-2,
        #                                         'ipport': ipport}],
        #                            'HealthMonitor': {'id': healthmonitor-id}}
        resource_ipport_mapping, _ns_name = (
            get_lvs_listener_resource_ipports_nsname(check_listener_id))

        # Listener is disabled, we don't need to send an update
        if resource_ipport_mapping is None:
            continue

        # Since we found the keepalived running, acknowledge the listener
        # in the heartbeat. If this listener has a pool and members,
        # the stats will be updated later in the code flow.
        listener_stats_res[check_listener_id] = {
            'stats': {
                'bout': 0,
                'bin': 0,
                'scur': 0,
                'stot': 0,
                'ereq': 0},
            'status': constants.OPEN}

        # If we can not read the lvs configuration from file, that means
        # the pool of this listener may own zero enabled member, but the
        # keepalived process is running. So we need to skip it.
        if not resource_ipport_mapping:
            continue
        ipport_mapping[check_listener_id] = resource_ipport_mapping

    # So here, if we can not get any ipport_mapping,
    # we do nothing, just return
    if not ipport_mapping:
        return listener_stats_res

    # contains bout, bin, scur, stot, ereq, status
    # bout(OutBytes), bin(InBytes), stot(Conns) from cmd ipvsadm -Ln --stats
    # scur(ActiveConn) from cmd ipvsadm -Ln
    # status, can see configuration in any cmd, treat it as OPEN
    # ereq is still 0, as UDP case does not support it.
    scur_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE)
    stats_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE,
                                 is_stats_cmd=True)
    for listener_id, ipport in ipport_mapping.items():
        listener_ipports = ipport['Listener']['ipports']
        # This would be in Error, wait for the next loop to sync for the
        # listener at this moment. Also this is for skip the case no enabled
        # member in UDP listener, so we don't check it for failover.
        scur_found = any(
            listener_ipport in scur_res
            for listener_ipport in listener_ipports)
        stats_found = any(
            listener_ipport in stats_res
            for listener_ipport in listener_ipports)
        if not scur_found or not stats_found:
            continue

        # 'bytes_in' rather than 'bin' to avoid shadowing the builtin; the
        # key emitted in the result is still 'bin'.
        scur, bout, bytes_in, stot, ereq = 0, 0, 0, 0, 0
        # As all results contain this listener, so its status should be OPEN
        status = constants.OPEN
        for listener_ipport in listener_ipports:
            if listener_ipport not in scur_res:
                continue
            # Get scur
            for member_fields in scur_res[listener_ipport]['Members']:
                for field_name, field_value in member_fields:
                    if field_name == 'ActiveConn':
                        scur += int(field_value)

            # The checks above only guarantee that *some* address of this
            # listener is in each listing, so this particular address may
            # be missing from the --stats output: skip it rather than fail.
            vs_stats = stats_res.get(listener_ipport)
            if vs_stats is None:
                continue
            # Get bout, bin, stot
            for field_name, field_value in vs_stats['Listener']:
                if field_name == 'Conns':
                    stot += int(field_value)
                elif field_name == 'OutBytes':
                    bout += int(field_value)
                elif field_name == 'InBytes':
                    bytes_in += int(field_value)

        listener_stats_res[listener_id] = {
            'stats': {
                'bout': bout,
                'bin': bytes_in,
                'scur': scur,
                'stot': stot,
                'ereq': ereq},
            'status': status}

    return listener_stats_res