Source code for octavia.certificates.manager.castellan_mgr
# Copyright (c) 2017 GoDaddy
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cert manager implementation for Castellan
"""
from castellan.common.objects import opaque_data
from castellan import key_manager
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12 as c_pkcs12
from oslo_config import cfg
from oslo_log import log as logging
from octavia.certificates.common import pkcs12
from octavia.certificates.manager import cert_mgr
from octavia.common import exceptions
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
[docs]
class CastellanCertManager(cert_mgr.CertManager):
    """Certificate Manager for the Castellan library.

    Certificates are persisted as a single unencrypted PKCS12 bundle wrapped
    in a Castellan ``OpaqueData`` object and handed to the key manager
    backend selected by the service configuration.
    """

    def __init__(self):
        super().__init__()
        # The concrete key manager backend is chosen by Castellan from CONF.
        self.manager = key_manager.API(CONF)

    def store_cert(self, context, certificate, private_key, intermediates=None,
                   private_key_passphrase=None, expiration=None,
                   name="PKCS12 Certificate Bundle"):
        """Store a certificate bundle in the key manager.

        The key, certificate and intermediates are passed unchanged to
        cryptography's ``pkcs12.serialize_key_and_certificates`` and must
        therefore be objects that function accepts.

        :param context: Request context forwarded to the key manager.
        :param certificate: The certificate to store.
        :param private_key: The private key matching ``certificate``.
        :param intermediates: Optional intermediate certificates to bundle.
        :param private_key_passphrase: Must be empty; passphrase protected
                                       bundles are rejected.
        :param expiration: Accepted for interface compatibility; not used.
        :param name: Name given to the stored opaque data object.
        :returns: The value returned by the key manager's ``store`` call
                  (per the Castellan documentation, the identifier of the
                  stored object).
        :raises CertificateStorageException: if a passphrase is supplied.
        """
        if private_key_passphrase:
            raise exceptions.CertificateStorageException(
                "Passphrases protected PKCS12 certificates are not supported.")

        bundle = c_pkcs12.serialize_key_and_certificates(
            name=None,
            key=private_key,
            cert=certificate,
            cas=intermediates,
            encryption_algorithm=serialization.NoEncryption()
        )
        p12_data = opaque_data.OpaqueData(bundle, name=name)
        # Hand the reference back to the caller; without it the bundle that
        # was just stored could not be looked up again via get_cert().
        return self.manager.store(context, p12_data)

    def get_cert(self, context, cert_ref, resource_ref=None, check_only=False,
                 service_name=None):
        """Fetch a stored bundle and wrap it as a PKCS12 certificate.

        :param context: Request context forwarded to the key manager.
        :param cert_ref: Reference of the stored bundle.
        :param resource_ref: Accepted for interface compatibility; not used.
        :param check_only: Accepted for interface compatibility; not used.
        :param service_name: Accepted for interface compatibility; not used.
        :returns: A ``pkcs12.PKCS12Cert`` built from the encoded bundle.
        """
        certbag = self.manager.get(context, cert_ref)
        certbag_data = certbag.get_encoded()
        return pkcs12.PKCS12Cert(certbag_data)

    def delete_cert(self, context, cert_ref, resource_ref, service_name=None):
        """Do nothing; no cleanup is required for Castellan."""
        # Delete is not a great name for this -- we don't delete anything
        # in reality, we just do cleanup here. For castellan, none is required

    def set_acls(self, context, cert_ref):
        """Do nothing; ACLs are not managed for Castellan secrets."""
        # We don't manage ACL based access for things retrieved via Castellan
        # because we assume we have elevated access to the secret store.

    def unset_acls(self, context, cert_ref):
        """Do nothing; ACLs are not managed for Castellan secrets."""
        # We don't manage ACL based access for things retrieved via Castellan
        # because we assume we have elevated access to the secret store.

    def get_secret(self, context, secret_ref):
        """Return the encoded payload of a stored secret.

        :param context: Request context forwarded to the key manager.
        :param secret_ref: Reference of the secret to fetch.
        :returns: The result of ``get_encoded()`` on the fetched object.
        :raises CertificateRetrievalException: if fetching or encoding the
                                               secret fails for any reason.
        """
        try:
            certbag = self.manager.get(context, secret_ref)
            certbag_data = certbag.get_encoded()
        except Exception as e:
            LOG.error("Failed to access secret for %s due to: %s.",
                      secret_ref, e)
            # Chain explicitly so the backend failure is kept as the cause.
            raise exceptions.CertificateRetrievalException(
                ref=secret_ref) from e
        else:
            return certbag_data