Source code for octavia.certificates.manager.barbican

# Copyright (c) 2014 Rackspace US, Inc
# Copyright (c) 2017 GoDaddy
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Cert manager implementation for Barbican using a single PKCS12 secret
"""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12 as c_pkcs12
from cryptography import x509
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
from stevedore import driver as stevedore_driver

from octavia.certificates.common import pkcs12
from octavia.certificates.manager import barbican_legacy
from octavia.certificates.manager import cert_mgr
from octavia.common import exceptions
from octavia.common.tls_utils import cert_parser

LOG = logging.getLogger(__name__)


[docs] class BarbicanCertManager(cert_mgr.CertManager): """Certificate Manager that wraps the Barbican client API.""" def __init__(self): super().__init__() self.auth = stevedore_driver.DriverManager( namespace='octavia.barbican_auth', name=cfg.CONF.certificates.barbican_auth, invoke_on_load=True, ).driver
[docs] def store_cert(self, context, certificate, private_key, intermediates=None, private_key_passphrase=None, expiration=None, name="PKCS12 Certificate Bundle"): """Stores a certificate in the certificate manager. :param context: Oslo context of the request :param certificate: PEM encoded TLS certificate :param private_key: private key for the supplied certificate :param intermediates: ordered and concatenated intermediate certs :param private_key_passphrase: optional passphrase for the supplied key :param expiration: the expiration time of the cert in ISO 8601 format :param name: a friendly name for the cert :returns: the container_ref of the stored cert :raises Exception: if certificate storage fails """ connection = self.auth.get_barbican_client(context.project_id) LOG.info("Storing certificate secret '%s' in Barbican.", name) if private_key_passphrase: raise exceptions.CertificateStorageException( "Passphrase protected PKCS12 certificates are not supported.") x509_cert = x509.load_pem_x509_certificate(certificate) x509_pk = serialization.load_pem_private_key(private_key, None) cas = None if intermediates: cert_ints = list(cert_parser.get_intermediates_pems(intermediates)) cas = [ x509.load_pem_x509_certificate(ci) for ci in cert_ints] try: certificate_secret = connection.secrets.create( payload=c_pkcs12.serialize_key_and_certificates( name=encodeutils.safe_encode(name), key=x509_pk, cert=x509_cert, cas=cas, encryption_algorithm=serialization.NoEncryption() ), expiration=expiration, name=name ) certificate_secret.store() return certificate_secret.secret_ref except Exception as e: with excutils.save_and_reraise_exception(): LOG.error('Error storing certificate data: %s', str(e)) return None
[docs] def get_cert(self, context, cert_ref, resource_ref=None, check_only=False, service_name=None): """Retrieves the specified cert and registers as a consumer. :param context: Oslo context of the request :param cert_ref: the UUID of the cert to retrieve :param resource_ref: Full HATEOAS reference to the consuming resource :param check_only: Read Certificate data without registering :param service_name: Friendly name for the consuming service :return: octavia.certificates.common.Cert representation of the certificate data :raises Exception: if certificate retrieval fails """ connection = self.auth.get_barbican_client(context.project_id) LOG.info('Loading certificate secret %s from Barbican.', cert_ref) try: cert_secret = connection.secrets.get(secret_ref=cert_ref) return pkcs12.PKCS12Cert(cert_secret.payload) except exceptions.UnreadablePKCS12: raise except Exception as e: LOG.warning('Failed to load PKCS12Cert for secret %s with %s', cert_ref, str(e)) LOG.warning('Falling back to the barbican_legacy implementation.') # If our get fails, try with the legacy driver. # TODO(rm_work): Remove this code when the deprecation cycle for # the legacy driver is complete. legacy_mgr = barbican_legacy.BarbicanCertManager() legacy_cert = legacy_mgr.get_cert( context, cert_ref, resource_ref=resource_ref, check_only=check_only, service_name=service_name ) return legacy_cert
[docs] def delete_cert(self, context, cert_ref, resource_ref, service_name=None): """Deregister as a consumer for the specified cert. :param context: Oslo context of the request :param cert_ref: the UUID of the cert to retrieve :param resource_ref: Full HATEOAS reference to the consuming resource :param service_name: Friendly name for the consuming service :raises Exception: if deregistration fails """ # TODO(rm_work): We won't take any action on a delete in this driver, # but for now try the legacy driver's delete and ignore failure. try: legacy_mgr = barbican_legacy.BarbicanCertManager(auth=self.auth) legacy_mgr.delete_cert( context, cert_ref, resource_ref, service_name=service_name) except Exception: # If the delete failed, it was probably because it isn't legacy # (this will be fixed once Secrets have Consumer registration). pass
[docs] def set_acls(self, context, cert_ref): LOG.debug('Setting project ACL for certificate secret...') self.auth.ensure_secret_access(context, cert_ref) # TODO(velizarx): Remove this code when the deprecation cycle for # the legacy driver is complete. legacy_mgr = barbican_legacy.BarbicanCertManager(auth=self.auth) legacy_mgr.set_acls(context, cert_ref)
[docs] def unset_acls(self, context, cert_ref): LOG.debug('Unsetting project ACL for certificate secret...') self.auth.revoke_secret_access(context, cert_ref) # TODO(velizarx): Remove this code when the deprecation cycle for # the legacy driver is complete. legacy_mgr = barbican_legacy.BarbicanCertManager(auth=self.auth) legacy_mgr.unset_acls(context, cert_ref)
[docs] def get_secret(self, context, secret_ref): """Retrieves a secret payload by reference. :param context: Oslo context of the request :param secret_ref: The secret reference ID :return: The secret payload :raises CertificateStorageException: if retrieval fails """ connection = self.auth.get_barbican_client(context.project_id) LOG.info('Loading secret %s from Barbican.', secret_ref) try: secret = connection.secrets.get(secret_ref=secret_ref) return secret.payload except Exception as e: LOG.error("Failed to access secret for %s due to: %s.", secret_ref, str(e)) raise exceptions.CertificateRetrievalException(ref=secret_ref)