import multiprocessing as multiproc
import ssl
import sys

import gunicorn.app.base
from oslo_config import cfg
from oslo_reports import guru_meditation_report as gmr

from octavia.amphorae.backends.agent.api_server import server
from octavia.amphorae.backends.health_daemon import health_daemon
from octavia.common import service
from octavia.common import utils
from octavia import version

# Queue handed to health_daemon.run_sender when main() starts the
# 'HM_sender' process. Created at import time.
HM_SENDER_CMD_QUEUE = multiproc.Queue()

class AmphoraAgent(gunicorn.app.base.BaseApplication):
    """Gunicorn custom application wrapping the amphora agent WSGI app.

    Follows gunicorn's documented "custom application" pattern: the
    ``load_config`` and ``load`` methods are overridden and the server is
    started with ``run()``, which this class does not define itself.
    """

    def __init__(self, app, options=None):
        """Store the WSGI app and gunicorn options, then initialise the base.

        :param app: The WSGI application object returned by ``load``.
        :param options: Optional mapping of gunicorn setting names to values;
                        ``None`` (or any falsy value) is treated as an empty
                        mapping.
        """
        # Both attributes must be set before the base initialiser runs:
        # gunicorn's BaseApplication.__init__ is documented to invoke
        # load_config(), which reads self.options.
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        """Apply the stored options to the gunicorn configuration.

        Only options whose key is present in ``self.cfg.settings`` and whose
        value is not ``None`` are applied; everything else is silently
        skipped.
        """
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        """Return the WSGI application that was passed to the constructor."""
        return self.application
# start api server
def main():
    """Start the health sender process and run the amphora agent API server.

    Prepares the service from ``sys.argv``, enables Guru Meditation reports,
    starts ``health_daemon.run_sender`` in a daemon process fed by
    ``HM_SENDER_CMD_QUEUE``, then runs ``AmphoraAgent`` with options built
    from the ``haproxy_amphora`` and ``amphora_agent`` config groups.
    The final ``run()`` call is expected to block while the server is up.

    :raises AttributeError: if ``agent_tls_protocol`` does not map to an
                            ``ssl.PROTOCOL_*`` constant.
    """
    # comment out to improve logging
    service.prepare_service(sys.argv)

    gmr.TextGuruMeditation.setup_autorun(version)

    # Bind the global oslo.config object locally so this function does not
    # depend on a module-level CONF name being defined.
    conf = cfg.CONF

    health_sender_proc = multiproc.Process(
        name='HM_sender',
        target=health_daemon.run_sender,
        args=(HM_SENDER_CMD_QUEUE,),
        daemon=True)
    health_sender_proc.start()

    # Initiate server class
    server_instance = server.Server()

    bind_ip_port = utils.ip_port_str(conf.haproxy_amphora.bind_host,
                                     conf.haproxy_amphora.bind_port)
    # Turn the configured protocol name into the suffix of the matching
    # ssl.PROTOCOL_* constant by replacing '.' with '_'.
    proto = conf.amphora_agent.agent_tls_protocol.replace('.', '_')
    options = {
        'bind': bind_ip_port,
        'workers': 1,
        'timeout': conf.amphora_agent.agent_request_read_timeout,
        'certfile': conf.amphora_agent.agent_server_cert,
        'ca_certs': conf.amphora_agent.agent_server_ca,
        # Clients must present a certificate that validates against ca_certs.
        'cert_reqs': ssl.CERT_REQUIRED,
        'ssl_version': getattr(ssl, f"PROTOCOL_{proto}"),
        'preload_app': True,
        'accesslog': '/var/log/amphora-agent.log',
        'errorlog': '/var/log/amphora-agent.log',
        'loglevel': 'debug',
        'syslog': True,
        'syslog_facility': 'local{}'.format(
            conf.amphora_agent.administrative_log_facility),
        'syslog_addr': 'unix://run/rsyslog/octavia/log#dgram',
    }
    # NOTE(review): the WSGI app argument is taken from server_instance.app,
    # the only use of the Server instance created above -- confirm the
    # attribute name against server.Server.
    AmphoraAgent(server_instance.app, options).run()