ceilometer.publisher.http

Source code for ceilometer.publisher.http

#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json

from oslo_log import log
from oslo_utils import strutils
import requests
from requests import adapters
from six.moves.urllib import parse as urlparse

from ceilometer import publisher

LOG = log.getLogger(__name__)


[docs]class HttpPublisher(publisher.ConfigPublisherBase): """Publish metering data to a http endpoint This publisher pushes metering data to a specified http endpoint. The endpoint should be configured in ceilometer pipeline configuration file. If the `timeout` and/or `max_retries` are not specified, the default `timeout` and `max_retries` will be set to 5 and 2 respectively. Additional parameters are: - ssl certificate verification can be disabled by setting `verify_ssl` to False - batching can be configured by `batch` - Basic authentication can be configured using the URL authentication scheme: http://username:password@example.com - For certificate authentication, `clientcert` and `clientkey` are the paths to the certificate and key files respectively. `clientkey` is only required if the clientcert file doesn't already contain the key. All of the parameters mentioned above get removed during processing, with the remaining portion of the URL being used as the actual endpoint. e.g. https://username:password@example.com/path?verify_ssl=False&q=foo will result in a call to https://example.com/path?q=foo To use this publisher for samples, add the following section to the /etc/ceilometer/pipeline.yaml file or simply add it to an existing pipeline:: - name: meter_file meters: - "*" publishers: - http://host:80/path?timeout=1&max_retries=2&batch=False In the event_pipeline.yaml file, you can use the publisher in one of the sinks like the following: - name: event_sink transformers: publishers: - http://host:80/path?timeout=1&max_retries=2 """ HEADERS = {'Content-type': 'application/json'} def __init__(self, conf, parsed_url): super(HttpPublisher, self).__init__(conf, parsed_url) if not parsed_url.hostname: raise ValueError('The hostname of an endpoint for ' 'HttpPublisher is required') # non-numeric port from the url string will cause a ValueError # exception when the port is read. Do a read to make sure the port # is valid, if not, ValueError will be thrown. parsed_url.port # Handling other configuration options in the query string params = urlparse.parse_qs(parsed_url.query) self.timeout = self._get_param(params, 'timeout', 5, int) self.max_retries = self._get_param(params, 'max_retries', 2, int) self.poster = ( self._batch_post if strutils.bool_from_string(self._get_param( params, 'batch', True)) else self._individual_post) verify_ssl = self._get_param(params, 'verify_ssl', True) try: self.verify_ssl = strutils.bool_from_string(verify_ssl, strict=True) except ValueError: self.verify_ssl = (verify_ssl or True) username = parsed_url.username password = parsed_url.password if username: self.client_auth = (username, password) netloc = parsed_url.netloc.replace(username+':'+password+'@', '') else: self.client_auth = None netloc = parsed_url.netloc clientcert = self._get_param(params, 'clientcert', None) clientkey = self._get_param(params, 'clientkey', None) if clientcert: if clientkey: self.client_cert = (clientcert, clientkey) else: self.client_cert = clientcert else: self.client_cert = None self.raw_only = strutils.bool_from_string( self._get_param(params, 'raw_only', False)) kwargs = {'max_retries': self.max_retries, 'pool_connections': conf.max_parallel_requests, 'pool_maxsize': conf.max_parallel_requests} self.session = requests.Session() if parsed_url.scheme in ["http", "https"]: scheme = parsed_url.scheme else: ssl = self._get_param(params, 'ssl', False) try: ssl = strutils.bool_from_string(ssl, strict=True) except ValueError: ssl = (ssl or False) scheme = "https" if ssl else "http" # authentication & config params have been removed, so use URL with # updated query string self.target = urlparse.urlunsplit([ scheme, netloc, parsed_url.path, urlparse.urlencode(params), parsed_url.fragment]) self.session.mount(self.target, adapters.HTTPAdapter(**kwargs)) LOG.debug('HttpPublisher for endpoint %s is initialized!' % self.target) @staticmethod def _get_param(params, name, default_value, cast=None): try: return cast(params.pop(name)[-1]) if cast else params.pop(name)[-1] except (ValueError, TypeError, KeyError): LOG.debug('Default value %(value)s is used for %(name)s' % {'value': default_value, 'name': name}) return default_value def _individual_post(self, data): for d in data: self._do_post(json.dumps(data)) def _batch_post(self, data): if not data: LOG.debug('Data set is empty!') return self._do_post(json.dumps(data)) def _do_post(self, data): LOG.trace('Message: %s', data) try: res = self.session.post(self.target, data=data, headers=self.HEADERS, timeout=self.timeout, auth=self.client_auth, cert=self.client_cert, verify=self.verify_ssl) res.raise_for_status() LOG.debug('Message posting to %s: status code %d.', self.target, res.status_code) except requests.exceptions.HTTPError: LOG.exception('Status Code: %(code)s. ' 'Failed to dispatch message: %(data)s' % {'code': res.status_code, 'data': data})
[docs] def publish_samples(self, samples): """Send a metering message for publishing :param samples: Samples from pipeline after transformation """ self.poster([sample.as_dict() for sample in samples])
[docs] def publish_events(self, events): """Send an event message for publishing :param events: events from pipeline after transformation """ if self.raw_only: data = [evt.as_dict()['raw']['payload'] for evt in events if evt.as_dict().get('raw', {}).get('payload')] else: data = [event.serialize() for event in events] self.poster(data)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.