Source code for octavia.cmd.driver_agent

# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from functools import partial
import multiprocessing
import os
import signal
import sys
import time

from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import setproctitle
from stevedore import enabled as stevedore_enabled

from octavia.api.drivers.driver_agent import driver_listener
from octavia.common import service
from octavia import version

CONF = cfg.CONF
LOG = logging.getLogger(__name__)
PROVIDER_AGENT_PROCESSES = []


def _mutate_config(*args, **kwargs):
    CONF.mutate_config_files()


def _handle_mutate_config(status_proc_pid, stats_proc_pid, *args, **kwargs):
    LOG.info("Driver agent received HUP signal, mutating config.")
    _mutate_config()
    os.kill(status_proc_pid, signal.SIGHUP)
    os.kill(stats_proc_pid, signal.SIGHUP)


def _check_if_provider_agent_enabled(extension):
    if extension.name in CONF.driver_agent.enabled_provider_agents:
        return True
    return False


def _process_wrapper(exit_event, proc_name, function, agent_name=None):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGHUP, _mutate_config)
    if agent_name:
        process_title = 'octavia-driver-agent - {} -- {}'.format(
            proc_name, agent_name)
    else:
        process_title = f'octavia-driver-agent - {proc_name}'
    setproctitle.setproctitle(process_title)
    while not exit_event.is_set():
        try:
            function(exit_event)
        except Exception as e:
            if agent_name:
                LOG.exception('Provider agent "%s" raised exception: %s. '
                              'Restarting the "%s" provider agent.',
                              agent_name, str(e), agent_name)
            else:
                LOG.exception('%s raised exception: %s. '
                              'Restarting %s.',
                              proc_name, str(e), proc_name)
            time.sleep(1)
            continue
        break


def _start_provider_agents(exit_event):
    extensions = stevedore_enabled.EnabledExtensionManager(
        namespace='octavia.driver_agent.provider_agents',
        check_func=_check_if_provider_agent_enabled)
    for ext in extensions:
        ext_process = multiprocessing.Process(
            name=ext.name, target=_process_wrapper,
            args=(exit_event, 'provider_agent', ext.plugin),
            kwargs={'agent_name': ext.name})
        PROVIDER_AGENT_PROCESSES.append(ext_process)
        ext_process.start()
        LOG.info('Started enabled provider agent: "%s" with PID: %d.',
                 ext.name, ext_process.pid)


[docs] def main(): service.prepare_service(sys.argv) LOG.debug('Full set of CONF:') CONF.log_opt_values(LOG, logging.DEBUG) gmr.TextGuruMeditation.setup_autorun(version) processes = [] exit_event = multiprocessing.Event() status_listener_proc = multiprocessing.Process( name='status_listener', target=_process_wrapper, args=(exit_event, 'status_listener', driver_listener.status_listener)) processes.append(status_listener_proc) LOG.info("Driver agent status listener process starts:") status_listener_proc.start() stats_listener_proc = multiprocessing.Process( name='stats_listener', target=_process_wrapper, args=(exit_event, 'stats_listener', driver_listener.stats_listener)) processes.append(stats_listener_proc) LOG.info("Driver agent statistics listener process starts:") stats_listener_proc.start() get_listener_proc = multiprocessing.Process( name='get_listener', target=_process_wrapper, args=(exit_event, 'get_listener', driver_listener.get_listener)) processes.append(get_listener_proc) LOG.info("Driver agent get listener process starts:") get_listener_proc.start() _start_provider_agents(exit_event) def process_cleanup(*args, **kwargs): LOG.info("Driver agent exiting due to signal.") exit_event.set() status_listener_proc.join() stats_listener_proc.join() get_listener_proc.join() for proc in PROVIDER_AGENT_PROCESSES: LOG.info('Waiting up to %s seconds for provider agent "%s" to ' 'shutdown.', CONF.driver_agent.provider_agent_shutdown_timeout, proc.name) try: proc.join(CONF.driver_agent.provider_agent_shutdown_timeout) if proc.exitcode is None: # TODO(johnsom) Change to proc.kill() once # python 3.7 or newer only os.kill(proc.pid, signal.SIGKILL) LOG.warning( 'Forcefully killed "%s" provider agent because it ' 'failed to shutdown in %s seconds.', proc.name, CONF.driver_agent.provider_agent_shutdown_timeout) except Exception as e: LOG.warning('Unknown error "%s" while shutting down "%s", ' 'ignoring and continuing shutdown process.', str(e), proc.name) else: LOG.info('Provider agent "%s" has successfully shutdown.', proc.name) signal.signal(signal.SIGTERM, process_cleanup) signal.signal(signal.SIGHUP, partial( _handle_mutate_config, status_listener_proc.pid, stats_listener_proc.pid, get_listener_proc.pid)) try: for process in processes: process.join() except KeyboardInterrupt: process_cleanup()