octavia.certificates.manager.barbican

Source code for octavia.certificates.manager.barbican

# Copyright (c) 2014 Rackspace US, Inc
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Cert manager implementation for Barbican
"""
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver

from octavia.certificates.common import barbican as barbican_common
from octavia.certificates.manager import cert_mgr

# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)


class BarbicanCertManager(cert_mgr.CertManager):
    """Certificate Manager that wraps the Barbican client API."""

    def __init__(self):
        super(BarbicanCertManager, self).__init__()
        # Load the auth driver named by the ``[certificates] barbican_auth``
        # option from the 'octavia.barbican_auth' stevedore namespace. Every
        # method below asks this driver for a Barbican client.
        self.auth = stevedore_driver.DriverManager(
            namespace='octavia.barbican_auth',
            name=cfg.CONF.certificates.barbican_auth,
            invoke_on_load=True,
        ).driver

    @staticmethod
    def _rollback_secrets(secrets):
        """Best-effort delete of the secrets created by a failed store.

        Entries that are ``None`` or that have no ``secret_ref`` are skipped.
        A failure to delete one secret is logged as a warning and does not
        prevent the remaining secrets from being processed.

        :param secrets: iterable of secret objects (or ``None`` placeholders)
        """
        for secret in secrets:
            # NOTE(review): a secret without a secret_ref presumably was
            # never persisted in Barbican, so there is nothing to delete.
            if not (secret and secret.secret_ref):
                continue
            # Remember the reference up front so it can still be logged
            # after the delete call.
            old_ref = secret.secret_ref
            try:
                secret.delete()
                LOG.info('Deleted secret %s (%s) during rollback.',
                         secret.name, old_ref)
            except Exception:
                LOG.warning('Failed to delete %s (%s) during '
                            'rollback. This might not be a problem.',
                            secret.name, old_ref)

    def store_cert(self, project_id, certificate, private_key,
                   intermediates=None, private_key_passphrase=None,
                   expiration=None, name='Octavia TLS Cert'):
        """Stores a certificate in the certificate manager.

        :param project_id: project identifier handed to the auth driver to
                           obtain the Barbican client
        :param certificate: PEM encoded TLS certificate
        :param private_key: private key for the supplied certificate
        :param intermediates: ordered and concatenated intermediate certs
        :param private_key_passphrase: optional passphrase for the supplied key
        :param expiration: the expiration time of the cert in ISO 8601 format
        :param name: a friendly name for the cert

        :returns: the container_ref of the stored cert
        :raises Exception: if certificate storage fails
        """
        connection = self.auth.get_barbican_client(project_id)

        LOG.info("Storing certificate container '%s' in Barbican.", name)

        # Pre-declare every secret so the rollback path can tell which ones
        # were actually created before the failure.
        certificate_secret = None
        private_key_secret = None
        intermediates_secret = None
        pkp_secret = None

        try:
            certificate_secret = connection.secrets.create(
                payload=certificate,
                expiration=expiration,
                name="Certificate"
            )
            private_key_secret = connection.secrets.create(
                payload=private_key,
                expiration=expiration,
                name="Private Key"
            )
            certificate_container = connection.containers.create_certificate(
                name=name,
                certificate=certificate_secret,
                private_key=private_key_secret
            )
            if intermediates:
                intermediates_secret = connection.secrets.create(
                    payload=intermediates,
                    expiration=expiration,
                    name="Intermediates"
                )
                certificate_container.intermediates = intermediates_secret
            if private_key_passphrase:
                pkp_secret = connection.secrets.create(
                    payload=private_key_passphrase,
                    expiration=expiration,
                    name="Private Key Passphrase"
                )
                certificate_container.private_key_passphrase = pkp_secret

            certificate_container.store()
            return certificate_container.container_ref
        except Exception as e:
            # Capture the original exception *before* running the rollback:
            # the rollback raises/catches exceptions of its own and performs
            # I/O, either of which may disturb the active exception context.
            # The saved exception is re-raised when the block exits.
            with excutils.save_and_reraise_exception():
                self._rollback_secrets([
                    certificate_secret,
                    private_key_secret,
                    intermediates_secret,
                    pkp_secret,
                ])
                LOG.error('Error storing certificate data: %s', e)

    def get_cert(self, project_id, cert_ref, resource_ref=None,
                 check_only=False, service_name='Octavia'):
        """Retrieves the specified cert and registers as a consumer.

        :param project_id: project identifier handed to the auth driver to
                           obtain the Barbican client
        :param cert_ref: the reference of the cert container to retrieve
        :param resource_ref: Full HATEOAS reference to the consuming resource
        :param check_only: Read Certificate data without registering
        :param service_name: Friendly name for the consuming service

        :return: octavia.certificates.common.Cert representation of the
                 certificate data
        :raises Exception: if certificate retrieval fails
        """
        connection = self.auth.get_barbican_client(project_id)

        LOG.info('Loading certificate container %s from Barbican.', cert_ref)
        try:
            if check_only:
                # Plain read: do not register as a consumer.
                cert_container = connection.containers.get(
                    container_ref=cert_ref
                )
            else:
                cert_container = connection.containers.register_consumer(
                    container_ref=cert_ref,
                    name=service_name,
                    url=resource_ref
                )
            return barbican_common.BarbicanCert(cert_container)
        except Exception as e:
            with excutils.save_and_reraise_exception():
                LOG.error('Error getting %s: %s', cert_ref, e)

    def delete_cert(self, project_id, cert_ref, resource_ref=None,
                    service_name='Octavia'):
        """Deregister as a consumer for the specified cert.

        Only the consumer registration is removed; this method does not
        delete the container itself.

        :param project_id: project identifier handed to the auth driver to
                           obtain the Barbican client
        :param cert_ref: the reference of the cert container to deregister
                         from
        :param resource_ref: Full HATEOAS reference to the consuming resource
        :param service_name: Friendly name for the consuming service

        :raises Exception: if deregistration fails
        """
        connection = self.auth.get_barbican_client(project_id)

        LOG.info('Deregistering as a consumer of %s in Barbican.', cert_ref)
        try:
            connection.containers.remove_consumer(
                container_ref=cert_ref,
                name=service_name,
                url=resource_ref
            )
        except Exception as e:
            with excutils.save_and_reraise_exception():
                LOG.error('Error deregistering as a consumer of %s: %s',
                          cert_ref, e)
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.