ceilometer.tests.unit.publisher.test_prometheus

Source code for ceilometer.tests.unit.publisher.test_prometheus

#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/prometheus.py
"""

import datetime
import mock
from oslotest import base
import requests
from six.moves.urllib import parse as urlparse
import uuid

from ceilometer.publisher import prometheus
from ceilometer import sample
from ceilometer import service


class TestPrometheusPublisher(base.BaseTestCase):
    """Tests for ``PrometheusPublisher.publish_samples``.

    Each test publishes the shared ``sample_data`` with
    ``requests.Session.post`` patched out, then checks the single recorded
    POST call (target URL, payload and keyword arguments).
    """

    # One resource id shared by every sample so the expected payload can be
    # rendered from a single value.
    resource_id = str(uuid.uuid4())

    # One cumulative, one delta and two gauge samples; the last one has a
    # dotted name, which the expected payload spells with an underscore.
    sample_data = [
        sample.Sample(
            name='alpha',
            type=sample.TYPE_CUMULATIVE,
            unit='',
            volume=1,
            user_id='test',
            project_id='test',
            resource_id=resource_id,
            timestamp=datetime.datetime.utcnow().isoformat(),
            resource_metadata={'name': 'TestPublish'},
        ),
        sample.Sample(
            name='beta',
            type=sample.TYPE_DELTA,
            unit='',
            volume=3,
            user_id='test',
            project_id='test',
            resource_id=resource_id,
            timestamp=datetime.datetime.utcnow().isoformat(),
            resource_metadata={'name': 'TestPublish'},
        ),
        sample.Sample(
            name='gamma',
            type=sample.TYPE_GAUGE,
            unit='',
            volume=5,
            user_id='test',
            project_id='test',
            resource_id=resource_id,
            timestamp=datetime.datetime.now().isoformat(),
            resource_metadata={'name': 'TestPublish'},
        ),
        sample.Sample(
            name='delta.epsilon',
            type=sample.TYPE_GAUGE,
            unit='',
            volume=7,
            user_id='test',
            project_id='test',
            resource_id=resource_id,
            timestamp=datetime.datetime.now().isoformat(),
            resource_metadata={'name': 'TestPublish'},
        ),
    ]

    def setUp(self):
        """Prepare a service configuration for the publisher under test."""
        super(TestPrometheusPublisher, self).setUp()
        self.CONF = service.prepare_service([], [])

    def _expected_payload(self):
        """Return the request body expected for ``sample_data``.

        The payload is newline-delimited: one metric per line, with a
        ``# TYPE`` line ahead of the cumulative and gauge metrics. No
        ``# TYPE`` line is expected for the delta sample ``beta``.
        """
        template = (
            '# TYPE alpha counter\n'
            'alpha{resource_id="%s"} 1\n'
            'beta{resource_id="%s"} 3\n'
            '# TYPE gamma gauge\n'
            'gamma{resource_id="%s"} 5\n'
            '# TYPE delta_epsilon gauge\n'
            'delta_epsilon{resource_id="%s"} 7\n'
        )
        return template % ((self.resource_id,) * 4)

    def _assert_single_post(self, publisher_url, expected_post_url):
        """Publish ``sample_data`` and verify the one resulting POST.

        :param publisher_url: ``prometheus://`` URL used to build the
            publisher.
        :param expected_post_url: URL the publisher is expected to POST to.
        """
        parsed_url = urlparse.urlparse(publisher_url)
        publisher = prometheus.PrometheusPublisher(self.CONF, parsed_url)

        res = requests.Response()
        res.status_code = 200
        # Patch at the Session class level so no real HTTP request is made.
        with mock.patch.object(requests.Session, 'post',
                               return_value=res) as m_req:
            publisher.publish_samples(self.sample_data)

        expected = [
            mock.call(expected_post_url,
                      auth=None,
                      cert=None,
                      data=self._expected_payload(),
                      headers={'Content-type': 'plain/text'},
                      timeout=5,
                      verify=True)
        ]
        self.assertEqual(expected, m_req.mock_calls)

    def test_post_samples(self):
        """Samples are POSTed over plain HTTP by default."""
        self._assert_single_post(
            'prometheus://localhost:90/metrics/job/os',
            'http://localhost:90/metrics/job/os')

    def test_post_samples_ssl(self):
        """Samples are POSTed over HTTPS when the URL carries ``ssl=1``."""
        self._assert_single_post(
            'prometheus://localhost:90/metrics/job/os?ssl=1',
            'https://localhost:90/metrics/job/os')
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.