#! /usr/bin/env python

#    Copyright 2014 Hewlett-Packard Development Company, L.P.
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import errno
import os
import queue
import stat
import time

from oslo_config import cfg
from oslo_log import log as logging
import simplejson

from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.health_daemon import health_sender
from octavia.amphorae.backends.utils import haproxy_query
from octavia.amphorae.backends.utils import keepalivedlvs_query

# Global oslo.config handle; the functions in this module read
# haproxy_amphora, amphora_agent and health_manager options from it.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Sequence number stamped on each heartbeat message; incremented by
# build_stats_message() every time a message is built.
SEQ = 0
# MSG_VER is an incrementing integer heartbeat message format version
# this allows for backward compatibility when the amphora-agent is older
# than the controller version and the message format has backwards
# incompatible changes.
# ver 1 - Adds UDP listener status when no pool or members are present
# ver 2 - Switch to all listeners in a single combined haproxy config
# ver 3 - Switch stats reporting to deltas

MSG_VER = 3

# Metric keys that are reported as deltas between heartbeats rather than
# as absolute counter values.
DELTA_METRICS = ('bin', 'bout', 'ereq', 'stot')

# Filesystem persistent counters for statistics deltas
# COUNTERS caches the last absolute values per listener; COUNTERS_FILE is
# the open file object they are persisted to. Both are populated lazily.
COUNTERS = None
COUNTERS_FILE = None

def get_counters_file():
    """Return the open file object used to persist statistics counters.

    The file is opened once, on first use, and cached in the module-level
    ``COUNTERS_FILE``. It lives at ``stats_counters.json`` under
    ``CONF.haproxy_amphora.base_path`` and is created if it does not exist.

    :returns: A text-mode file object opened ``'r+'``, or ``None`` when the
              file could not be opened (the failure is logged and ignored so
              that statistics persistence stays best-effort).
    """
    global COUNTERS_FILE
    if COUNTERS_FILE is None:
        stats_file_path = os.path.join(
            CONF.haproxy_amphora.base_path, "stats_counters.json")
        # Open for read+write and create if necessary
        flags = os.O_RDWR | os.O_CREAT
        # Owner read/write and group read, i.e. 0o640.
        # NOTE(review): this comment previously read "mode 00644", which
        # does not match the flags below (no S_IROTH) -- confirm which of
        # the two was intended.
        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP
        try:
            # os.open() is used so the creation mode can be specified;
            # the descriptor is then wrapped in a regular file object.
            COUNTERS_FILE = os.fdopen(
                os.open(stats_file_path, flags, mode), 'r+')
        except OSError:
            LOG.info("Failed to open `%s`, ignoring...", stats_file_path)
    return COUNTERS_FILE
def get_counters():
    """Return the cached statistics counters, loading them on first use.

    On the first call the counters are read as JSON from the file object
    returned by ``get_counters_file()`` and cached in the module-level
    ``COUNTERS``. An empty dict is used when the content is not valid JSON,
    when it decodes to a falsy value, or when reading raises
    ``AttributeError`` (e.g. no file object was available).

    :returns: The counters dict shared by all callers.
    """
    global COUNTERS
    if COUNTERS is not None:
        return COUNTERS
    try:
        loaded = simplejson.load(get_counters_file())
    except (simplejson.JSONDecodeError, AttributeError):
        loaded = {}
    # A falsy decoded document (e.g. null) is normalised to an empty dict.
    COUNTERS = loaded or {}
    return COUNTERS
def persist_counters():
    """Attempt to persist the latest statistics values.

    Serializes the module-level ``COUNTERS`` dict to JSON and overwrites
    the counters file with it. Does nothing when the counters have never
    been loaded. Persistence is best-effort: ``OSError`` and
    ``AttributeError`` (raised when no counters file object is available)
    are logged as a warning and otherwise ignored.
    """
    if COUNTERS is None:
        return
    try:
        stats = simplejson.dumps(COUNTERS)
        counters_file = get_counters_file()
        # truncate() does not move the stream position, so without an
        # explicit rewind each write would land at the offset left by the
        # previous read/write, padding the file with NUL bytes in front of
        # the JSON and making it unparseable on the next start.
        counters_file.seek(0)
        counters_file.truncate(0)
        counters_file.write(stats)
        counters_file.flush()
    except (OSError, AttributeError):
        LOG.warning("Couldn't persist statistics counter file!")
def list_sock_stat_files(hadir=None):
    """Map each load balancer ID to the path of its stats socket file.

    :param hadir: Directory holding the socket files. When ``None``,
                  ``CONF.haproxy_amphora.base_path`` is used.
    :returns: Dict of ``{lb_id: "<hadir>/<lb_id>.sock"}`` with one entry
              per ID returned by ``util.get_loadbalancers()``.
    """
    base_dir = CONF.haproxy_amphora.base_path if hadir is None else hadir
    return {
        lb_id: os.path.join(base_dir, lb_id + ".sock")
        for lb_id in util.get_loadbalancers()
    }
def run_sender(cmd_queue):
    """Run the heartbeat loop until a 'shutdown' command is received.

    Each iteration builds a stats message and sends it through a
    ``health_sender.UDPStatusSender``. When a keepalived config file is
    present, the heartbeat is only sent if the PID recorded in the
    keepalived PID file belongs to a live process. Any failure while
    checking or sending is logged and the heartbeat is skipped for that
    iteration; the loop itself keeps running.

    :param cmd_queue: Queue polled (non-blocking) once per iteration.
                      ``'reload'`` reloads the configuration files and
                      ``'shutdown'`` ends the loop; other values are
                      ignored.
    """
    LOG.info('Health Manager Sender starting.')
    sender = health_sender.UDPStatusSender()

    keepalived_cfg_path = util.keepalived_cfg_path()
    keepalived_pid_path = util.keepalived_pid_path()

    while True:

        try:
            # If the keepalived config file is present check
            # that it is running, otherwise don't send the health
            # heartbeat
            if os.path.isfile(keepalived_cfg_path):
                # Is there a pid file for keepalived?
                with open(keepalived_pid_path, 'r',
                          encoding='utf-8') as pid_file:
                    pid = int(pid_file.readline())
                # Signal 0 performs error checking only: it raises ESRCH
                # if no such process exists without actually signalling.
                os.kill(pid, 0)

            message = build_stats_message()
            sender.dosend(message)

        except (IOError, OSError) as e:
            if e.errno == errno.ENOENT:
                # Missing PID file, skip health heartbeat.
                LOG.error('Missing keepalived PID file %s, skipping health '
                          'heartbeat.', keepalived_pid_path)
            elif e.errno == errno.ESRCH:
                # Keepalived is not running, skip health heartbeat.
                LOG.error('Keepalived is configured but not running, '
                          'skipping health heartbeat.')
            else:
                LOG.exception('Failed to check keepalived and haproxy status '
                              'due to exception %s, skipping health '
                              'heartbeat.', str(e))
        except Exception as e:
            # Deliberately broad: a single bad iteration must not kill the
            # sender loop.
            LOG.exception('Failed to check keepalived and haproxy status due '
                          'to exception %s, skipping health heartbeat.',
                          str(e))

        try:
            cmd = cmd_queue.get_nowait()
            if cmd == 'reload':
                LOG.info('Reloading configuration')
                CONF.reload_config_files()
            elif cmd == 'shutdown':
                LOG.info('Health Manager Sender shutting down.')
                break
        except queue.Empty:
            pass
        time.sleep(CONF.health_manager.heartbeat_interval)
def get_stats(stat_sock_file):
    """Query HAProxy through its stats socket.

    :param stat_sock_file: Path of the stats socket file to query.
    :returns: Tuple ``(stats, pool_status)`` holding the results of
              ``show_stat()`` and ``get_pool_status()`` respectively.
    """
    query = haproxy_query.HAProxyQuery(stat_sock_file)
    # Tuple elements are evaluated left to right, so the statistics are
    # fetched before the pool status.
    return query.show_stat(), query.get_pool_status()
def calculate_stats_deltas(listener_id, row):
    """Compute per-metric deltas for a listener since the previous call.

    For every key in ``DELTA_METRICS`` the current value from ``row`` is
    compared with the value stored for ``listener_id`` in the shared
    counters, and the stored value is replaced with the current one.

    :param listener_id: Key under which the listener's counters are kept.
    :param row: Mapping that provides a value convertible with ``int()``
                for every key in ``DELTA_METRICS``.
    :returns: Dict mapping each metric key to its delta. When the current
              value is lower than the stored one, the current value itself
              is reported as the delta.
    """
    previous = get_counters().setdefault(listener_id, {})

    deltas = {}
    for metric in DELTA_METRICS:
        current = int(row[metric])
        last_seen = previous.get(metric, 0)
        # Remember the new absolute value for the next comparison.
        previous[metric] = current
        # A counter that went backwards means HAProxy restarted or reset
        # its counters; everything counted since then is the delta.
        deltas[metric] = current if current < last_seen else current - last_seen

    return deltas
def build_stats_message():
    """Assemble the heartbeat message from the current listener statistics.

    HAProxy listeners and pools are collected from the stats socket of
    every running load balancer, followed by the LVS (UDP) listeners.
    The module-level sequence number is incremented and the statistics
    counters are persisted as part of building the message.

    Example version 3 message without UDP (note that values are deltas,
    not absolutes)::

        {"id": "<amphora_id>",
         "seq": 67,
         "listeners": {
           "<listener_id>": {
             "status": "OPEN",
             "stats": {
               "tx": 0,
               "rx": 0,
               "conns": 0,
               "totconns": 0,
               "ereq": 0
             }
           }
         },
         "pools": {
           "<pool_id>:<listener_id>": {
             "status": "UP",
             "members": {
               "<member_id>": "no check"
             }
           }
         },
         "ver": 3
        }

    :returns: The message dict.
    """
    global SEQ
    msg = {
        'id': CONF.amphora_agent.amphora_id,
        'seq': SEQ,
        'listeners': {},
        'pools': {},
        'ver': MSG_VER,
    }
    SEQ += 1
    listeners = msg['listeners']
    pools = msg['pools']

    # TODO(rm_work) There should only be one of these in the new config system
    for lb_id, stat_sock_file in list_sock_stat_files().items():
        if not util.is_lb_running(lb_id):
            continue
        stats, pool_status = get_stats(stat_sock_file)
        for row in stats:
            # Only FRONTEND rows describe listeners.
            if row['svname'] != 'FRONTEND':
                continue
            listener_id = row['pxname']
            deltas = calculate_stats_deltas(listener_id, row)
            listeners[listener_id] = {
                'status': row['status'],
                'stats': {
                    'tx': deltas['bout'],
                    'rx': deltas['bin'],
                    'conns': int(row['scur']),
                    'totconns': deltas['stot'],
                    'ereq': deltas['ereq'],
                },
            }
        for pool_id, pool in pool_status.items():
            pools[pool_id] = {
                "status": pool['status'],
                "members": pool['members'],
            }

    # UDP listener part
    lvs_stats = None
    if util.get_lvs_listeners():
        lvs_stats = keepalivedlvs_query.get_lvs_listeners_stats()
    if lvs_stats:
        for listener_id, listener_stats in lvs_stats.items():
            deltas = calculate_stats_deltas(
                listener_id, listener_stats['stats'])
            lvs_pool = keepalivedlvs_query.get_lvs_listener_pool_status(
                listener_id)
            listener_entry = {
                'status': listener_stats['status'],
                'stats': {
                    'tx': deltas['bout'],
                    'rx': deltas['bin'],
                    'conns': listener_stats['stats']['scur'],
                    'totconns': deltas['stot'],
                    'ereq': deltas['ereq'],
                },
            }
            if lvs_pool:
                pools[lvs_pool['lvs']['uuid']] = {
                    "status": lvs_pool['lvs']['status'],
                    "members": lvs_pool['lvs']['members'],
                }
            listeners[listener_id] = listener_entry

    persist_counters()
    return msg