Source code for ironic.common.oci_registry

#   Copyright 2025 Red Hat, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.
#

# NOTE(TheJulia): This file is based upon, in part, some of the TripleO
# project container uploader.
# https://github.com/openstack-archive/tripleo-common/blame/stable/wallaby/tripleo_common/image/image_uploader.py

import base64
import json
import re
import requests
from requests import auth as requests_auth
import tenacity
from urllib import parse

from oslo_log import log as logging

from ironic.common import checksum_utils
from ironic.common import exception
from ironic.conf import CONF

LOG = logging.getLogger(__name__)


(
    CALL_MANIFEST,
    CALL_BLOB,
    CALL_TAGS,
) = (
    '%(image)s/manifests/%(tag)s',
    '%(image)s/blobs/%(digest)s',
    '%(image)s/tags/list',
)

(
    MEDIA_OCI_MANIFEST_V1,
    MEDIA_OCI_INDEX_V1,
) = (
    'application/vnd.oci.image.manifest.v1+json',
    'application/vnd.oci.image.index.v1+json',
)


[docs] class MakeSession(object): """Class method to uniformly create sessions. Sessions created by this class will retry on errors with an exponential backoff before raising an exception. Because our primary interaction is with the container registries the adapter will also retry on 401 and 404. This is being done because registries commonly return 401 when an image is not found, which is commonly a cache miss. See the adapter definitions for more on retry details. """ def __init__(self, verify=True): self.session = requests.Session() self.session.verify = verify
[docs] def create(self): return self.__enter__()
def __enter__(self): return self.session def __exit__(self, *args, **kwargs): self.session.close()
[docs] class RegistrySessionHelper(object): """Class with various registry session helpers This class contains a bunch of static methods to be used when making session requests against a container registry. The methods are primarily used to handle authentication/reauthentication for the requests against registries that require auth. """
[docs] @staticmethod def check_status(session, request, allow_reauth=True): """Check request status and trigger reauth This function can be used to check if we need to perform authentication for a container registry request because we've gotten a 401. """ text = getattr(request, 'text', 'unknown') reason = getattr(request, 'reason', 'unknown') status_code = getattr(request, 'status_code', None) headers = getattr(request, 'headers', {}) if status_code >= 300 and status_code != 401: LOG.info( 'OCI client got a Non-2xx: status %s, reason %s, text %s', status_code, reason, text) if status_code == 401: LOG.warning( 'OCI client failed: status %s, reason %s text %s', status_code, reason, text) www_auth = headers.get( 'www-authenticate', headers.get( 'Www-Authenticate' ) ) if www_auth: error = None # Handle docker.io shenanigans. docker.io will return 401 # for 403 and 404 but provide an error string. Other registries # like registry.redhat.io and quay.io do not do this. So if # we find an error string, check to see if we should reauth. do_reauth = allow_reauth if 'error=' in www_auth: error = re.search('error="(.*?)"', www_auth).group(1) LOG.warning( 'Error detected in auth headers: error %s', error) do_reauth = (error == 'invalid_token' and allow_reauth) if do_reauth: if hasattr(session, 'reauthenticate'): # This is a re-auth counter reauth = int(session.headers.get('_ReAuth', 0)) reauth += 1 session.headers['_ReAuth'] = str(reauth) session.reauthenticate(**session.auth_args) if status_code == 429: raise exception.ImageHostRateLimitFailure(image_ref=request.url) request.raise_for_status()
[docs] @staticmethod def check_redirect_trusted(request_response, request_session, stream=True, timeout=60): """Check if we've been redirected to a trusted source Because we may be using auth, we may not want to leak authentication keys to an untrusted source. If we get a redirect, we need to check that the redirect url is one of our sources that we trust. Otherwise we drop the Authorization header from the redirect request. We'll add the header back into the request session after performing the request to ensure that future usage of the session. :param: request_response: Response object of the request to check :param: request_session: Session to use when redirecting :param: stream: Should we stream the response of the redirect :param: timeout: Timeout for the redirect request """ # we're not a redirect, just return the original response if not (request_response.status_code >= 300 and request_response.status_code < 400): return request_response # parse the destination location redir_url = parse.urlparse(request_response.headers['Location']) # close the response since we're going to replace it request_response.close() auth_header = request_session.headers.pop('Authorization', None) # ok we got a redirect, let's check where we are going secure_cdn = CONF.oci.secure_cdn_registries # TODO(TheJulia): Consider breaking the session calls below into # a helper method, because as-is, this is basically impossible # to unit test the delienation in behavior. if len([h for h in secure_cdn if h in redir_url.netloc]) > 0: # we're going to a trusted location, add the header back and # return response request_session.headers.update({'Authorization': auth_header}) request_response = request_session.get(redir_url.geturl(), stream=stream, timeout=timeout) else: # we didn't trust the place we're going, request without auth but # add the auth back to the request session afterwards request_response = request_session.get(redir_url.geturl(), stream=stream, timeout=timeout) request_session.headers.update({'Authorization': auth_header}) request_response.encoding = 'utf-8' # recheck status here to make sure we didn't get a 401 from # our redirect host path. RegistrySessionHelper.check_status(session=request_session, request=request_response) return request_response
[docs] def get_token_from_config(fqdn): """Takes a FQDN for a container registry and consults auth config. This method evaluates named configuration parameter [oci]authentication_config and looks for pre-shared secrets in the supplied json file. It is written to defensively handle the file such that errors are not treated as fatal to the overall lookup process, but errors are logged. The expected file format is along the lines of: { "auths": { "domain.name": { "auth": "pre-shared-secret-value" } } } :param fqdn: A fully qualified domain name for interacting with the remote image registry. :returns: String value for the "auth" key which matches the supplied FQDN. """ if not CONF.oci.authentication_config: return auth = None try: with open(CONF.oci.authentication_config, 'r') as auth_file: auth_dict = json.load(auth_file) except OSError as e: LOG.error('Failed to load pre-shared authentication token ' 'data: %s', e) return except (json.JSONDecodeError, UnicodeDecodeError) as e: LOG.error('Unable to decode pre-shared authentication token ' 'data: %s', e) return try: # Limiting all key interactions here to capture any formatting # errors in one place. auth_dict = auth_dict['auths'] fqdn_dict = auth_dict.get(fqdn) auth = fqdn_dict.get('auth') except (AttributeError, KeyError): LOG.error('There was an error while looking up authentication ' 'for dns name %s. Possible misformatted file?') return return auth
[docs] @staticmethod def get_bearer_token(session, username=None, password=None, realm=None, service=None, scope=None): auth = None token_param = {} if service: token_param['service'] = service if scope: token_param['scope'] = scope if username: # NOTE(TheJulia): This won't be invoked under the current # client code which does not use a username. Tokens # have the username encoded within and the remote servers # know how to decode it. auth = requests.auth.HTTPBasicAuth(username, password) elif password: # This is a case where we have a pre-shared token. LOG.debug('Using user provided pre-shared authentication ' 'token to authenticate to the remote registry.') auth = requests.auth.HTTPBasicAuth('', password) else: realm_url = parse.urlparse(realm) local_token = RegistrySessionHelper.get_token_from_config( realm_url.netloc) if local_token: LOG.debug('Using a locally configured pre-shared key ' 'for authentication to the remote registry.') auth = requests.auth.HTTPBasicAuth('', local_token) auth_req = session.get(realm, params=token_param, auth=auth, timeout=CONF.webserver_connection_timeout) auth_req.raise_for_status() resp = auth_req.json() if 'token' not in resp: raise AttributeError('Invalid auth response, no token provide') return resp['token']
[docs] @staticmethod def parse_www_authenticate(header): auth_type = None auth_type_match = re.search('^([A-Za-z]*) ', header) if auth_type_match: auth_type = auth_type_match.group(1) if not auth_type: return (None, None, None) realm = None service = None if 'realm=' in header: realm = re.search('realm="(.*?)"', header).group(1) if 'service=' in header: service = re.search('service="(.*?)"', header).group(1) return (auth_type, realm, service)
@staticmethod @tenacity.retry( # Retry up to 5 times with longer time for rate limit reraise=True, retry=tenacity.retry_if_exception_type( exception.ImageHostRateLimitFailure ), wait=tenacity.wait_random_exponential(multiplier=1.5, max=60), stop=tenacity.stop_after_attempt(5) ) def _action(action, request_session, *args, **kwargs): """Perform a session action and retry if auth fails This function dynamically performs a specific type of call using the provided session (get, patch, post, etc). It will attempt a single re-authentication if the initial request fails with a 401. """ _action = getattr(request_session, action) try: req = _action(*args, **kwargs) if not kwargs.get('stream'): # The caller has requested a stream, likely download so # we really can't call check_status because it would force # full content transfer. RegistrySessionHelper.check_status(session=request_session, request=req) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: req = _action(*args, **kwargs) RegistrySessionHelper.check_status(session=request_session, request=req) else: raise return req
[docs] @staticmethod def get(request_session, *args, **kwargs): """Perform a get and retry if auth fails This function is designed to be used when we perform a get to an authenticated source. This function will attempt a single re-authentication request if the first one fails. """ return RegistrySessionHelper._action('get', request_session, *args, **kwargs)
[docs] class OciClient(object): # The cached client authorization which may be used by for an # artifact being accessed by ironic-python-agent so we can retrieve # the authorization data and convey it to IPA without needing to # directly handle credentials to IPA. _cached_auth = None def __init__(self, verify): """Initialize the OCI container registry client class. :param verify: If certificate verification should be leveraged for the underlying HTTP client. """ # FIXME(TheJulia): This should come from configuration self.session = MakeSession(verify=verify).create()
[docs] def authenticate(self, image_url, username=None, password=None): """Authenticate to the remote container registry. :param image_url: The URL to utilise for the remote container registry. :param username: The username paraemter. :param password: The password parameter. :raises: AttributeError when an unknown authentication attribute has been specified by the remote service. :raises: ImageServiceAuthenticationRequired when the remote Container registry requires authentication but we do not have a credentials. """ url = self._image_to_url(image_url) image, tag = self._image_tag_from_url(url) scope = 'repository:%s:pull' % image[1:] url = self._build_url(url, path='/') # If authenticate is called an additional time.... # clear the authorization in the client. if self.session: self.session.headers.pop('Authorization', None) r = self.session.get(url, timeout=CONF.webserver_connection_timeout) LOG.debug('%s status code %s', url, r.status_code) if r.status_code == 200: # "Auth" was successful, returning. return self.session if r.status_code != 401: # Auth was rejected. r.raise_for_status() if 'www-authenticate' not in r.headers: # Something is wrong and unexpected. raise AttributeError( 'Unknown authentication method for headers: %s' % r.headers) auth = None www_auth = r.headers['www-authenticate'] token_param = {} (auth_type, realm, service) = \ RegistrySessionHelper.parse_www_authenticate(www_auth) if auth_type and auth_type.lower() == 'bearer': LOG.debug('Using bearer token auth') token = RegistrySessionHelper.get_bearer_token( self.session, username=username, password=password, realm=realm, service=service, scope=scope) elif auth_type and auth_type.lower() == 'basic': if not username or not password: raise exception.ImageServiceAuthenticationRequired( image_ref=image_url) auth = requests_auth.HTTPBasicAuth(username, password) rauth = self.session.get( url, params=token_param, auth=auth, timeout=CONF.webserver_connection_timeout) rauth.raise_for_status() token = ( base64.b64encode( bytes(username + ':' + password, 'utf-8')).decode('ascii') ) else: raise AttributeError( 'Unknown www-authenticate value: %s', www_auth) auth_header = '%s %s' % (auth_type, token) self.session.headers['Authorization'] = auth_header # Set a cached Authorization token value so we can extract it # if needed, useful for enabling something else to be able to # make that actual call. self._cached_auth = auth_header setattr(self.session, 'reauthenticate', self.authenticate) setattr( self.session, 'auth_args', dict( image_url=image_url, username=username, password=password, session=self.session ) )
@staticmethod def _get_response_text(response, encoding='utf-8', force_encoding=False): """Return request response text We need to set the encoding for the response other wise it will attempt to detect the encoding which is very time consuming. See https://github.com/psf/requests/issues/4235 for additional context. :param: response: requests Respoinse object :param: encoding: encoding to set if not currently set :param: force_encoding: set response encoding always """ if force_encoding or not response.encoding: response.encoding = encoding return response.text @classmethod def _build_url(cls, url, path): """Build an HTTPS URL from the input urlparse data. :param url: The urlparse result object with the netloc object which is extracted and used by this method. :param path: The path in the form of a string which is then assembled into an HTTPS URL to be used for access. :returns: A fully formed url in the form of https://ur. """ netloc = url.netloc scheme = 'https' return '%s://%s/v2%s' % (scheme, netloc, path) def _get_manifest(self, image_url, digest=None): if not digest: # Caller has the digest in the url, that's fine, lets # use that. digest = image_url.path.split('@')[1] image_path = image_url.path.split(':')[0] manifest_url = self._build_url( image_url, CALL_MANIFEST % {'image': image_path, 'tag': digest}) # Explicitly ask for the OCI artifact index manifest_headers = {'Accept': ", ".join([MEDIA_OCI_MANIFEST_V1])} try: manifest_r = RegistrySessionHelper.get( self.session, manifest_url, headers=manifest_headers, timeout=CONF.webserver_connection_timeout ) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: # Authorization Required. raise exception.ImageServiceAuthenticationRequired( image_ref=manifest_url) if e.response.status_code in (403, 404): raise exception.ImageNotFound( image_id=image_url.geturl()) if e.response.status_code >= 500: raise exception.TemporaryFailure() raise manifest_str = self._get_response_text(manifest_r) checksum_utils.validate_text_checksum(manifest_str, digest) return json.loads(manifest_str) def _get_artifact_index(self, image_url): LOG.debug('Attempting to get the artifact index for: %s', image_url) parts = self._resolve_tag(image_url) index_url = self._build_url( image_url, CALL_MANIFEST % parts ) # Explicitly ask for the OCI artifact index index_headers = {'Accept': ", ".join([MEDIA_OCI_INDEX_V1])} try: index_r = RegistrySessionHelper.get( self.session, index_url, headers=index_headers, timeout=CONF.webserver_connection_timeout ) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: # Authorization Required. raise exception.ImageServiceAuthenticationRequired( image_ref=index_url) if e.response.status_code in (403, 404): raise exception.ImageNotFound( image_id=image_url.geturl()) if e.response.status_code >= 500: raise exception.TemporaryFailure() raise index_str = self._get_response_text(index_r) # Return a dictionary to the caller so it can house the # filtering/sorting application logic. return json.loads(index_str) def _resolve_tag(self, image_url): """Attempts to resolve tags from a container URL.""" LOG.debug('Attempting to resolve tag for: %s', image_url) image, tag = self._image_tag_from_url(image_url) parts = { 'image': image, 'tag': tag } tags_url = self._build_url( image_url, CALL_TAGS % parts ) tag_headers = {'Accept': ", ".join([MEDIA_OCI_INDEX_V1])} try: tags_r = RegistrySessionHelper.get( self.session, tags_url, headers=tag_headers, timeout=CONF.webserver_connection_timeout) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: # Authorization Required. raise exception.ImageServiceAuthenticationRequired( image_ref=tags_url) if e.response.status_code >= 500: raise exception.TemporaryFailure() raise tags = tags_r.json()['tags'] while 'next' in tags_r.links: next_url = parse.urljoin(tags_url, tags_r.links['next']['url']) tags_r = RegistrySessionHelper.get( self.session, next_url, headers=tag_headers, timeout=CONF.webserver_connection_timeout) tags.extend(tags_r.json()['tags']) if tag not in tags: raise exception.ImageNotFound( image_id=image_url.geturl()) return parts
[docs] def get_artifact_index(self, image): """Retrieve an index of artifacts in the Container Registry. :param image: The remote container registry URL in the form of oci://host/user/container:tag. :returns: A dictionary object representing the index of artifacts present in the container registry, in the form of manifest references along with any other metadata per entry which the remote registry returns such as annotations, and platform labeling which aids in artifact selection. """ image_url = self._image_to_url(image) return self._get_artifact_index(image_url)
[docs] def get_manifest(self, image, digest=None): """Retrieve a manifest from the remote API. This method is a wrapper for the _get_manifest helper, which normalizes the input URL, performs basic sanity checking, and then calls the underlying method to retrieve the manifest. The manifest is then returned to the caller in the form of a dictionary. :param image: The full URL to the desired manifest or the URL of the container and an accompanying digest parameter. :param digest: The Digest value for the requested manifest. :returns: A dictionary object representing the manifest as stored in the remote API. """ LOG.debug('Attempting to get manifest for: %s', image) if not digest and '@' in image: # Digest must be part of the URL, this is fine! url_split = image.split("@") image_url = self._image_to_url(url_split[0]) digest = url_split[1] elif digest and '@' in image: raise AttributeError('Invalid request - Appears to attempt ' 'to use a digest value and a digest in ' 'the provided URL.') else: image_url = self._image_to_url(image) return self._get_manifest(image_url, digest)
[docs] def get_blob_url(self, image, blob_digest): """Generates an HTTP representing an blob artifact. :param image: The OCI Container URL. :param blob_digest: The digest value representing the desired blob artifact. :returns: A HTTP URL string representing the blob URL which can be utilized by an HTTP client to retrieve the artifact. """ if not blob_digest and '@' in image: split_url = image.split('@') image_url = parse.urlparse(split_url[0]) blob_digest = split_url[1] elif blob_digest and '@' in image: split_url = image.split('@') image_url = parse.urlparse(split_url[0]) # The caller likely has a bug or bad pattern # which needs to be fixed else: image_url = parse.urlparse(image) # just in caes, split out the tag since it is not # used for a blob manifest lookup. image_path = image_url.path.split(':')[0] manifest_url = self._build_url( image_url, CALL_BLOB % {'image': image_path, 'digest': blob_digest}) return manifest_url
[docs] def get_cached_auth(self): """Retrieves the cached authentication header for reuse.""" # This enables the cached authentication data to be retrieved # to enable Ironic to provide that the data without shipping # credentials around directly. return self._cached_auth
[docs] def download_blob_from_manifest(self, manifest_url, image_file): """Retrieves the requested blob from the manifest URL... And saves the requested manifest's artifact as the requested image_file location, and then returns the verified checksum. :param manifest_url: The URL, in oci://host/user/container@digest formatted artifact manifest URL. This is *not* the digest value for the blob, which can only be discovered by retrieving the manifest. :param image_file: The image file object to write the blob to. :returns: The verified digest value matching the saved artifact. """ LOG.debug('Starting download blob download sequence for %s', manifest_url) manifest = self.get_manifest(manifest_url) layers = manifest.get('layers', []) layer_count = len(layers) if layer_count != 1: # This is not a blob manifest, it is the container, # or something else we don't understand. raise exception.ImageRefValidationFailed( 'Incorrect number of layers. Expected 1 layer, ' 'found %s layers.' % layer_count) blob_digest = layers[0].get('digest') blob_url = self.get_blob_url(manifest_url, blob_digest) LOG.debug('Identified download url for blob: %s', blob_url) # One which is an OCI URL with a manifest. try: resp = RegistrySessionHelper.get( self.session, blob_url, stream=True, timeout=CONF.webserver_connection_timeout ) resp = RegistrySessionHelper.check_redirect_trusted( resp, self.session, stream=True) if resp.status_code != 200: raise exception.ImageRefValidationFailed( image_href=blob_url, reason=("Got HTTP code %s instead of 200 in response " "to GET request.") % resp.status_code) # Reminder: image_file, is a file object handler. split_digest = blob_digest.split(':') # Invoke the transfer helper so the checksum can be calculated # in transfer. download_helper = checksum_utils.TransferHelper( resp, split_digest[0], split_digest[1]) # NOTE(TheJuila): If we *ever* try to have retry logic here, # remember to image_file.seek(0) to reset position. for chunk in download_helper: # write the desired file out image_file.write(chunk) LOG.debug('Download of %(manifest)s has completed. Transferred ' '%(bytes)s of %(total)s total bytes.', {'manifest': manifest_url, 'bytes': download_helper.bytes_transferred, 'total': download_helper.content_length}) if download_helper.checksum_matches: return blob_digest else: raise exception.ImageChecksumError() except requests.exceptions.HTTPError as e: LOG.debug('Encountered error while attempting to download %s', blob_url) # Stream changes the behavior, so odds of hitting # this area area a bit low unless an actual exception # is raised. if e.response.status_code == 401: # Authorization Required. raise exception.ImageServiceAuthenticationRequired( image_ref=blob_url) if e.response.status_code in (403, 404): raise exception.ImageNotFound(image_id=blob_url) if e.response.status_code >= 500: raise exception.TemporaryFailure() raise except (OSError, requests.ConnectionError, requests.RequestException, IOError) as e: raise exception.ImageDownloadFailed(image_href=blob_url, reason=str(e))
@classmethod def _image_tag_from_url(cls, image_url): """Identify image and tag from image_url. :param image_url: Input image url. :returns: Tuple of values, image URL which has been reconstructed and the requested tag. Defaults to 'latest' when a tag has not been identified as part of the supplied URL. """ if '@' in image_url.path: parts = image_url.path.split('@') tag = parts[-1] image = ':'.join(parts[:-1]) elif ':' in image_url.path: parts = image_url.path.split(':') tag = parts[-1] image = ':'.join(parts[:-1]) else: tag = 'latest' image = image_url.path return image, tag @classmethod def _image_to_url(cls, image): """Helper to create an OCI URL.""" if '://' not in image: # Slight bit of future proofing in case we ever support # identifying bare URLs. image = 'oci://' + image url = parse.urlparse(image) return url