# Copyright 2010 OpenStack Foundation
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
from http import client as http_client
from operator import itemgetter
import os
import shutil
from urllib import parse as urlparse
from oslo_log import log
from oslo_utils import strutils
from oslo_utils import uuidutils
import requests
from ironic.common import exception
from ironic.common.glance_service.image_service import GlanceImageService
from ironic.common.i18n import _
from ironic.common import oci_registry
from ironic.common import utils
from ironic.conf import CONF
IMAGE_CHUNK_SIZE = 1024 * 1024 # 1mb
# NOTE(JayF): This is the check-of-last-resort; we also have an allowlist
# enabled by default. These represent paths that under no circumstances should
# we access for file:// URLs
BLOCKED_FILE_URL_PATHS = {'/dev', '/sys', '/proc', '/boot', '/etc', '/run'}
LOG = log.getLogger(__name__)
[docs]
class BaseImageService(object, metaclass=abc.ABCMeta):
"""Provides retrieval of disk images."""
[docs]
@abc.abstractmethod
def validate_href(self, image_href):
"""Validate image reference.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed.
:returns: Information needed to further operate with an image.
"""
[docs]
@abc.abstractmethod
def download(self, image_href, image_file):
"""Downloads image to specified location.
:param image_href: Image reference.
:param image_file: File object to write data to.
:raises: exception.ImageRefValidationFailed.
:raises: exception.ImageDownloadFailed.
"""
[docs]
@abc.abstractmethod
def show(self, image_href):
"""Get dictionary of image properties.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed.
:returns: dictionary of image properties. It has three of them: 'size',
'updated_at' and 'properties'. 'updated_at' attribute is a naive
UTC datetime object.
"""
@property
def is_auth_set_needed(self):
"""Property to notify the caller if it needs to set authentication."""
return False
@property
def transfer_verified_checksum(self):
"""The transferred artifact checksum."""
return None
[docs]
class HttpImageService(BaseImageService):
"""Provides retrieval of disk images using HTTP."""
[docs]
@staticmethod
def gen_auth_from_conf_user_pass(image_href):
"""This function is used to pass the credentials to the chosen
credential verifier and in case the verification is successful
generate the compatible authentication object that will be used
with the request(s). This function handles the authentication object
generation for authentication strategies that are username+password
based. Credentials are collected from the oslo.config framework.
:param image_href: href of the image that is being acted upon
:return: Authentication object used directly by the request library
:rtype: requests.auth.HTTPBasicAuth
"""
image_server_user = None
image_server_password = None
if CONF.deploy.image_server_auth_strategy == 'http_basic':
HttpImageService.verify_basic_auth_cred_format(
CONF.deploy.image_server_user,
CONF.deploy.image_server_password,
image_href)
image_server_user = CONF.deploy.image_server_user
image_server_password = CONF.deploy.image_server_password
else:
return None
return requests.auth.HTTPBasicAuth(image_server_user,
image_server_password)
[docs]
def validate_href(self, image_href, secret=False):
"""Validate HTTP image reference.
:param image_href: Image reference.
:param secret: Specify if image_href being validated should not be
shown in exception message.
:raises: exception.ImageRefValidationFailed if HEAD request failed or
returned response code not equal to 200.
:raises: exception.ImageRefIsARedirect if the supplied URL is a
redirect to a different URL. The caller may be able to handle
this.
:returns: Response to HEAD request.
"""
output_url = 'secreturl' if secret else image_href
try:
verify = strutils.bool_from_string(CONF.webserver_verify_ca,
strict=True)
except ValueError:
verify = CONF.webserver_verify_ca
try:
auth = HttpImageService.gen_auth_from_conf_user_pass(image_href)
# NOTE(TheJulia): Head requests do not work on things that are not
# files, but they can be responded with redirects or a 200 OK....
# We don't want to permit endless redirects either, thus not
# request an override to the requests default to try and resolve
# redirects as otherwise we might end up with something like
# HTTPForbidden or a list of files. Both should be okay to at
# least know things are okay in a limited fashion.
response = requests.head(image_href, verify=verify,
timeout=CONF.webserver_connection_timeout,
auth=auth)
if response.status_code == http_client.MOVED_PERMANENTLY:
# NOTE(TheJulia): In the event we receive a redirect, we need
# to notify the caller. Before this we would just fail,
# but a url which is missing a trailing slash results in a
# redirect to a target path, and the caller *may* actually
# care about that.
redirect = requests.Session().get_redirect_target(response)
# Extra guard because this is pointless if there is no
# location in the field. Requests also properly formats
# our string for us, or gives us None.
if redirect:
raise exception.ImageRefIsARedirect(
image_ref=image_href,
redirect_url=redirect)
if (response.status_code == http_client.FORBIDDEN
and str(image_href).endswith('/')):
LOG.warning('Attempted to validate a URL %s, however we '
'received an HTTP Forbidden response and the '
'url ends with trailing slash (/), suggesting '
'non-image deploy may be in progress with '
'a webserver which is not permitting an index '
'to be generated. We will treat this as valid, '
'but return the response.', image_href)
return response
# NOTE(TheJulia): Any file list reply will proceed past here just
# fine as they are conveyed as an HTTP 200 OK response with a
# server rendered HTML document payload.
if response.status_code != http_client.OK:
raise exception.ImageRefValidationFailed(
image_href=output_url,
reason=_("Got HTTP code %s instead of 200 in response "
"to HEAD request.") % response.status_code)
except (OSError, requests.ConnectionError,
requests.RequestException) as e:
raise exception.ImageRefValidationFailed(image_href=output_url,
reason=str(e))
return response
[docs]
def download(self, image_href, image_file):
"""Downloads image to specified location.
:param image_href: Image reference.
:param image_file: File object to write data to.
:raises: exception.ImageRefValidationFailed if GET request returned
response code not equal to 200.
:raises: exception.ImageDownloadFailed if:
* IOError happened during file write;
* GET request failed.
"""
try:
verify = strutils.bool_from_string(CONF.webserver_verify_ca,
strict=True)
except ValueError:
verify = CONF.webserver_verify_ca
try:
auth = HttpImageService.gen_auth_from_conf_user_pass(image_href)
response = requests.get(image_href, stream=True, verify=verify,
timeout=CONF.webserver_connection_timeout,
auth=auth)
if response.status_code != http_client.OK:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_("Got HTTP code %s instead of 200 in response "
"to GET request.") % response.status_code)
with response.raw as input_img:
shutil.copyfileobj(input_img, image_file, IMAGE_CHUNK_SIZE)
except (OSError, requests.ConnectionError, requests.RequestException,
IOError) as e:
raise exception.ImageDownloadFailed(image_href=image_href,
reason=str(e))
[docs]
def show(self, image_href):
"""Get dictionary of image properties.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if:
* HEAD request failed;
* HEAD request returned response code not equal to 200;
* Content-Length header not found in response to HEAD request.
:returns: dictionary of image properties. It has three of them: 'size',
'updated_at' and 'properties'. 'updated_at' attribute is a naive
UTC datetime object.
"""
response = self.validate_href(image_href)
image_size = response.headers.get('Content-Length')
if image_size is None:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_("Cannot determine image size as there is no "
"Content-Length header specified in response "
"to HEAD request."))
# Parse last-modified header to return naive datetime object
str_date = response.headers.get('Last-Modified')
date = None
if str_date:
http_date_format_strings = [
'%a, %d %b %Y %H:%M:%S GMT', # RFC 822
'%A, %d-%b-%y %H:%M:%S GMT', # RFC 850
'%a %b %d %H:%M:%S %Y' # ANSI C
]
for fmt in http_date_format_strings:
try:
date = datetime.datetime.strptime(str_date, fmt)
break
except ValueError:
continue
no_cache = 'no-store' in response.headers.get('Cache-Control', '')
return {
'size': int(image_size),
'updated_at': date,
'properties': {},
'no_cache': no_cache,
}
[docs]
@staticmethod
def get(image_href):
"""Downloads content and returns the response text.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if GET request returned
response code not equal to 200.
:raises: exception.ImageDownloadFailed if:
* IOError happened during file write;
* GET request failed.
"""
try:
verify = strutils.bool_from_string(CONF.webserver_verify_ca,
strict=True)
except ValueError:
verify = CONF.webserver_verify_ca
try:
auth = HttpImageService.gen_auth_from_conf_user_pass(image_href)
response = requests.get(image_href, stream=False, verify=verify,
timeout=CONF.webserver_connection_timeout,
auth=auth)
if response.status_code != http_client.OK:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_("Got HTTP code %s instead of 200 in response "
"to GET request.") % response.status_code)
return response.text
except (OSError, requests.ConnectionError, requests.RequestException,
IOError) as e:
raise exception.ImageDownloadFailed(image_href=image_href,
reason=str(e))
[docs]
class OciImageService(BaseImageService):
"""Image Service class for accessing an OCI Container Registry."""
# Holding place on the instantiated class for the image processing
# request to house authentication data, because we have to support
# varying authentication to backend services.
_user_auth_data = None
# Field to house the verified checksum of the last downloaded content
# by the running class.
_verified_checksum = None
_client = None
def __init__(self):
verify = strutils.bool_from_string(CONF.webserver_verify_ca,
strict=True)
# Creates a client which we can use for actions.
# Note, this is not yet authenticated!
self._client = oci_registry.OciClient(verify=verify)
def _validate_url_is_specific(self, image_href):
"""Identifies if the supplied image_href is a manifest pointer.
Identifies if the image_href value is specific, and performs basic
data validation on the digest value to ensure it is as expected.
As a note, this does *not* consider a URL with a tag value as
specific enough, because that is a starting point in the data
structure view which can have multiple artifacts nested within
that view.
:param image_href: The user supplied image_href value to evaluate
if the URL is specific to to a specific manifest,
or is otherwise generalized and needs to be
identified.
:raises: OciImageNotSpecifc if the supplied image_href lacks a
required manifest digest value, or if the digest value
is not understood.
:raises: ImageRefValidationFailed if the supplied image_href
appears to be malformed and lacking a digest value,
or if the supplied data and values are the incorrect
length and thus invalid.
"""
href = urlparse.urlparse(image_href)
# Identify if we have an @ character denoting manifest
# reference in the path.
split_path = str(href.path).split('@')
if len(split_path) < 2:
# Lacks a manifest digest pointer being referenced.
raise exception.OciImageNotSpecific(image_ref=image_href)
# Extract the digest for evaluation.
hash_array = split_path[1].split(':')
if len(hash_array) < 2:
# We cannot parse something we don't understand. Specifically the
# supplied data appaears to be invalid.
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason='Lacking required digest value')
algo = hash_array[0].lower()
value = hash_array[1].lower()
# Sanity check the checksum hash lengths to match types we expect.
# NOTE(TheJulia): Generally everything is sha256 with container
# registries, however there are open patches to also embrace sha512
# in the upstream registry code base.
if 'sha256' == algo:
if 64 != len(value):
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason='Manifest digest length incorrect and does not '
'match the expected lenngth of the algorithm.')
elif 'sha512' == algo:
# While sha256 seems to be the convention, the go libraries and
# even the transport reference don't seem to explicitly set an
# expectation of what type. This is likely some future proofing
# more than anything else.
if 128 != len(value):
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason='Manifest digest length incorrect and does not '
'match the expected lenngth of the algorithm.')
else:
LOG.error('Failed to parse %(image_href)s, unknown digest '
'algorithm %(algo)s.',
{'image_href': image_href,
'algo': algo})
raise exception.OciImageNotSpecific(image_ref=image_href)
[docs]
def validate_href(self, image_href, secret=None):
"""Validate OCI image reference.
This method is an alias of the ``show`` method on this class, which
exists only for API compatibility reasons. Ultimately, the show
method performs all of the same validation required.
:param image_href: Image reference.
:param secret: Unused setting.
:raises: exception.ImageRefValidationFailed
:raises: exception.OciImageNotSpecific
:returns: Identical output to the ``show`` method on this class
as this method is an alias of the ``show``.
"""
return self.show(image_href)
[docs]
def download(self, image_href, image_file):
"""Downloads image to specified location.
:param image_href: Image reference.
:param image_file: File object to write data to.
:raises: exception.ImageRefValidationFailed.
:raises: exception.ImageDownloadFailed.
:raises: exception.OciImageNotSpecific.
"""
# Call not permitted until we have a specific image_source.
self._validate_url_is_specific(image_href)
csum = self._client.download_blob_from_manifest(image_href,
image_file)
self._verified_checksum = csum
[docs]
def show(self, image_href):
"""Get dictionary of image properties.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed.
:raises: exception.OciImageNotSpecific.
:returns: dictionary of image properties. It has three of them: 'size',
'checksum', and 'digest'
"""
self._validate_url_is_specific(image_href)
manifest = self._client.get_manifest(image_href)
layers = manifest.get('layers', [{}])
size = layers[0].get('size', 0)
digest = layers[0].get('digest')
checksum = None
if digest and ':' in digest:
# This should always be the case, but just being
# defensive given array interaction.
checksum = digest.split(':')[1]
# Return values to the caller so size handling can be
# navigated with the image cache, checksum saved to make
# everyone happy, and the original digest value to help
# generate a blob url path to enable download.
return {'size': size,
'checksum': checksum,
'digest': digest}
@property
def is_auth_set_needed(self):
"""Property to notify the caller if it needs to set authentication."""
return True
@property
def transfer_verified_checksum(self):
"""Property to notify the caller if it needs to set authentication."""
return self._verified_checksum
[docs]
def set_image_auth(self, image_url, auth_data):
"""Sets the supplied auth_data dictionary on the class for use later.
Provides a mechanism to inform the image service of specific
credentials without wiring this in as a first class citizen in
all image service interfaces.
:param auth_data: The authentication data dictionary holding username,
password, or other authentication data which may
be used by this client class.
:returns: None
:raises: AssertionError should this method be called twice
in the same workflow.
"""
if self._user_auth_data:
raise AssertionError("BUG: _user_auth_data should only be set"
"once in a overall workflow.")
if not auth_data and not CONF.oci.authentication_config:
# We have no data, and no settings, we should just quietly
# return, there is nothing to do.
return
if auth_data:
# Set a username and password. Bearer auth expects
# no valid user name in the code path of the oci client.
# The important as the passwords with bearer auth are
# full tokens.
self._user_auth_data = auth_data
username = auth_data.get('username')
password = auth_data.get('password')
else:
# Set username and password to None so the OCI client loads
# auth data from configuration.
username = None
password = None
self._client.authenticate(image_url, username, password)
[docs]
def identify_specific_image(self, image_href, image_download_source=None,
cpu_arch=None):
"""Identify a specific OCI Registry Artifact.
This method supports the caller, but is located in the image service
code to provide it access to the Container Registry client code which
holds the lower level methods.
The purpose of this method is to take the user requested image_href
and identify the best matching artifact attached to a container
registry's entry. This is because the container registry can
contain many artifacts which can be distributed and allocated
by different types. To achieve this goal, this method utilizes
the image_download_source to weight the preference of type of
file to look for, and the CPU architecture to enable support
for mutli-arch container registries.
In order to inform the caller about the url, as well as related
data, such as the manifest which points to the artifact, artifact
digest, known original filename of the artifact, this method
returns a dictionary with several fields which may be useful
to aid in understanding of what artifact was chosen.
:param image_href: The image URL as supplied by the Ironic user.
:param image_download_source: The Ironic image_download_source
value, defaults to None. When a value of 'local' is provided,
this method prefers selection of qcow images over raw images.
Otherwise, raw images are the preference.
:param cpu_arch: The Bare Metal node's defined CPU architecture,
if any. Defaults to None. When used, a direct match is sought
in the remote container registry. If 'x86_64' or 'amd64' is used,
the code searches for the values in the remote registry
interchangeably due to OCI data model standardizing on `amd64` as
the default value for 64bit x86 Architectures.
:returns: A dictionary with multiple values to the caller to aid
in returning the required HTTP URL, but also metadata about the
selected artifact including size, filename, blob digest, related
manifest digest, the remote recorded mediaType value, if the file
appears compressed, if the file appears to be a raw disk image,
any HTTP Authorization secret, if applicable, and the OCI
image manifest URL. As needs could be different based upon
different selection algorithms and evolving standards/approaches
in use of OCI registries, the dictionary can also be empty, or
contain different values and any caller should defensively use
information as needed. If a record is *not* found, a empty
dictionary is the result set. Under normal circumstances, the
result looks something like this example.
{
'image_url': 'https://fqdn/path',
'image_size': 1234567,
'image_filename': 'filename.raw.zstd',
'image_checksum': 'f00f...',
'image_container_blob_digest': 'sha256:f00f...',
'image_media_type': 'application/zstd,
'image_compression_type': 'zstd',
'image_disk_format': 'raw',
'image_request_authorization_secret': None,
'oci_image_manifest_url': 'https://fqdn/path@sha256:123f...',
}
"""
# TODO(TheJulia): Ideally we should call the referrers endpoint
# in the remote API, however, it is *very* new only having been
# approved in Mid-2024, is not widely available. It would allow
# the overall query sequence to take more of streamlined flow
# as opposed to the existing code which gets the index and then
# looks into manifest data.
# See
# https://github.com/opencontainers/image-spec/pull/934
# https://github.com/opencontainers/distribution-spec/pull/335
# An image_url tells us if we've found something matching what
# we're looking for.
image_url = None
requested_image = urlparse.urlparse(image_href)
if requested_image.path and '@' in requested_image.path:
LOG.debug('We have been given a specific URL, as such we are '
'skipping specific artifact detection.')
# We have a specific URL, we don't need to do anything else.
# FIXME(TheJulia): We need to improve this. Essentially we
# need to go get the image url
manifest = self.show(image_href)
# Identify the blob URL from the defining manifest for IPA.
image_url = self._client.get_blob_url(image_href,
manifest['digest'])
cached_auth = self._client.get_cached_auth()
return {
# Return an OCI url in case Ironic is doing the download
'oci_image_manifest_url': image_href,
# Return a checksum, so we don't make the checksum code
# angry!
'image_checksum': manifest['checksum'],
'image_url': image_url,
# NOTE(TheJulia) With the OCI data model, there is *no*
# way for us to know what the disk image format is.
# We can't look up, we're pointed at a manifest URL
# with limited information.
'image_disk_format': 'unknown',
'image_request_authorization_secret': cached_auth,
}
# Query the remote API for a list index list of manifests
artifact_index = self._client.get_artifact_index(image_href)
manifests = artifact_index.get('manifests', [])
if len(manifests) < 1:
# This is likely not going to happen, but we have nothing
# to identify and deploy based upon, so nothing found
# for user consistency.
raise exception.ImageNotFound(image_id=image_href)
if image_download_source == 'swift':
raise exception.InvalidParameterValue(
err="An image_download_source of swift is incompatible with "
"retrieval of artifacts from an OCI container registry.")
# Determine our preferences for matching
if image_download_source == 'local':
# These types are qcow2 images, we can download these and convert
# them, but it is okay for us to match a raw appearing image
# if we don't have a qcow available.
disk_format_priority = {'qcow2': 1,
'qemu': 2,
'raw': 3,
'applehv': 4}
else:
# applehv appears to be a raw image,
# raw is the Ironic community preference.
disk_format_priority = {'qcow2': 3,
'qemu': 4,
'raw': 1,
'applehv': 2}
# First thing to do, filter by disk types
# and assign a selection priority... since Ironic can handle
# several different formats without issue.
new_manifests = []
for manifest in manifests:
artifact_format = manifest.get('annotations', {}).get('disktype')
if artifact_format in disk_format_priority.keys():
manifest['_priority'] = disk_format_priority[artifact_format]
else:
manifest['_priority'] = 100
new_manifests.append(manifest)
sorted_manifests = sorted(new_manifests, key=itemgetter('_priority'))
# Iterate through the entries of manifests and evaluate them
# one by one to identify a likely item.
for manifest in sorted_manifests:
# First evaluate the architecture because ironic can operated in
# an architecture agnostic mode... and we *can* match on it, but
# it is one of the most constraining factors.
if cpu_arch:
# NOTE(TheJulia): amd64 is the noted standard format in the
# API for x86_64. One thing, at least observing quay.io hosted
# artifacts is that there is heavy use of x86_64 as instead
# of amd64 as expected by the specification. This same sort
# of pattern extends to arm64/aarch64.
if cpu_arch in ['x86_64', 'amd64']:
possible_cpu_arch = ['x86_64', 'amd64']
elif cpu_arch in ['arm64', 'aarch64']:
possible_cpu_arch = ['aarch64', 'arm64']
else:
possible_cpu_arch = [cpu_arch]
# Extract what the architecture is noted for the image, from
# the platform field.
architecture = manifest.get('platform', {}).get('architecture')
if architecture and architecture not in possible_cpu_arch:
# skip onward, we don't have a localized match
continue
# One thing podman is doing, and an ORAS client can set for
# upload, is annotations. This is ultimately the first point
# where we can identify likely artifacts.
# We also pre-sorted on disktype earlier, so in theory based upon
# preference, we should have the desired result as our first
# matching hint which meets the criteria.
disktype = manifest.get('annotations', {}).get('disktype')
if disktype:
if disktype in disk_format_priority.keys():
identified_manifest_digest = manifest.get('digest')
blob_manifest = self._client.get_manifest(
image_href, identified_manifest_digest)
layers = blob_manifest.get('layers', [])
if len(layers) != 1:
# This is a *multilayer* artifact, meaning a container
# construction, not a blob artifact in the OCI
# container registry. Odds are we're at the end of
# the references for what the user has requested
# consideration of as well, so it is good to log here.
LOG.info('Skipping consideration of container '
'registry manifest %s as it has multiple'
'layers.',
identified_manifest_digest)
continue
# NOTE(TheJulia): The resulting layer contents, has a
# mandatory mediaType value, which may be something like
# application/zstd or application/octet-stream and the
# an optional org.opencontainers.image.title annotation
# which would contain the filename the file was stored
# with in alignment with OARS annotations. Furthermore,
# there is an optional artifactType value with OCI
# distribution spec 1.1 (mid-2024) which could have
# been stored when the artifact was uploaded,
# but is optional. In any event, this is only available
# on the manifest contents, not further up unless we have
# the newer referrers API available. As of late 2024,
# quay.io did not offer the referrers API.
chosen_layer = layers[0]
blob_digest = chosen_layer.get('digest')
# Use the client helper to assemble a blob url, so we
# have consistency with what we expect and what we parse.
image_url = self._client.get_blob_url(image_href,
blob_digest)
image_size = chosen_layer.get('size')
chosen_original_filename = chosen_layer.get(
'annotations', {}).get(
'org.opencontainers.image.title')
manifest_digest = manifest.get('digest')
media_type = chosen_layer.get('mediaType')
is_raw_image = disktype in ['raw', 'applehv']
break
else:
# The case of there being no disk type in the entry.
# The only option here is to query the manifest contents out
# and based decisions upon that. :\
# We could look at the layers, count them, and maybe look at
# artifact types.
continue
if image_url:
# NOTE(TheJulia): Doing the final return dict generation as a
# last step in order to leave the door open to handling other
# types and structures for matches which don't use an annotation.
# TODO(TheJulia): We likely ought to check artifacttype,
# as well for any marker of the item being compressed.
# Also, shorthanded for +string format catching which is
# also a valid storage format.
if media_type.endswith('zstd'):
compression_type = 'zstd'
elif media_type.endswith('gzip'):
compression_type = 'gzip'
else:
compression_type = None
cached_auth = self._client.get_cached_auth()
# Generate new URL to reset the image_source to
# so download calls can use the OCI interface
# and code path moving forward.
url = urlparse.urlparse(image_href)
# Drop any trailing content indicating a tag
image_path = url.path.split(':')[0]
manifest = f'{url.scheme}://{url.netloc}{image_path}@{manifest_digest}' # noqa
return {
'image_url': image_url,
'image_size': image_size,
'image_filename': chosen_original_filename,
'image_checksum': blob_digest.split(':')[1],
'image_container_manifest_digest': manifest_digest,
'image_media_type': media_type,
'image_compression_type': compression_type,
'image_disk_format': 'raw' if is_raw_image else 'qcow2',
'image_request_authorization_secret': cached_auth,
'oci_image_manifest_url': manifest,
}
else:
# NOTE(TheJulia): This is likely future proofing, suggesting a
# future case where we're looking at the container, and we're not
# finding disk images, but it does look like a legitimate
# container. As such, here we're just returning an empty dict,
# and we can sort out the rest of the details once we get there.
return {}
[docs]
class FileImageService(BaseImageService):
"""Provides retrieval of disk images available locally on the conductor."""
[docs]
def validate_href(self, image_href):
"""Validate local image reference.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if source image file
doesn't exist, is in a blocked path, or is not in an allowed path.
:returns: Path to image file if it exists and is allowed.
"""
image_path = urlparse.urlparse(image_href).path
# Check if the path is in the blocklist
rpath = os.path.abspath(image_path)
for bad in BLOCKED_FILE_URL_PATHS:
if rpath == bad or rpath.startswith(bad + os.sep):
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_("Security: The path %s is not permitted in file "
"URLs" % bad)
)
# Check if the path is in the allowlist
for allowed in CONF.conductor.file_url_allowed_paths:
if rpath == allowed or rpath.startswith(allowed + os.sep):
break
else:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_(
"Security: Path %s is not allowed for image source "
"file URLs" % image_path)
)
# Check if the file exists
if not os.path.isfile(image_path):
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_("Specified image file not found."))
return image_path
[docs]
def download(self, image_href, image_file):
"""Downloads image to specified location.
:param image_href: Image reference.
:param image_file: File object to write data to.
:raises: exception.ImageRefValidationFailed if source image file
doesn't exist.
:raises: exception.ImageDownloadFailed if exceptions were raised while
writing to file or creating hard link.
"""
source_image_path = self.validate_href(image_href)
dest_image_path = image_file.name
try:
image_file.close()
os.remove(dest_image_path)
# NOTE(dtantsur): os.link is supposed to follow symlinks, but it
# does not: https://github.com/python/cpython/issues/81793
real_image_path = os.path.realpath(source_image_path)
try:
os.link(real_image_path, dest_image_path)
except OSError as exc:
orig = (f' (real path {real_image_path})'
if real_image_path != source_image_path
else '')
LOG.debug('Could not create a link from %(src)s%(orig)s to '
'%(dest)s, will copy the content instead. '
'Error: %(exc)s.',
{'src': source_image_path, 'dest': dest_image_path,
'orig': orig, 'exc': exc})
else:
return
# NOTE(dtantsur): starting with Python 3.8, copyfile() uses
# efficient copying (i.e. sendfile) under the hood.
shutil.copyfile(source_image_path, dest_image_path)
except Exception as e:
raise exception.ImageDownloadFailed(image_href=image_href,
reason=str(e))
[docs]
def show(self, image_href):
"""Get dictionary of image properties.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if image file specified
doesn't exist.
:returns: dictionary of image properties. It has three of them: 'size',
'updated_at' and 'properties'. 'updated_at' attribute is a naive
UTC datetime object.
"""
source_image_path = self.validate_href(image_href)
return {
'size': os.path.getsize(source_image_path),
'updated_at': utils.unix_file_modification_datetime(
source_image_path),
'properties': {},
# No point in caching local file images
'no_cache': True,
}
protocol_mapping = {
'http': HttpImageService,
'https': HttpImageService,
'file': FileImageService,
'glance': GlanceImageService,
'oci': OciImageService,
}
[docs]
def get_image_service(image_href, client=None, context=None):
"""Get image service instance to download the image.
:param image_href: String containing href to get image service for.
:param client: Glance client to be used for download, used only if
image_href is Glance href.
:param context: request context, used only if image_href is Glance href.
:raises: exception.ImageRefValidationFailed if no image service can
handle specified href.
:returns: Instance of an image service class that is able to download
specified image.
"""
scheme = urlparse.urlparse(image_href).scheme.lower()
if not scheme:
if uuidutils.is_uuid_like(str(image_href)):
cls = GlanceImageService
else:
# TODO(TheJulia): Consider looking for a attributes
# which suggest a container registry reference...
# because surely people will try.
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_('Scheme-less image href is not a UUID.'))
else:
cls = protocol_mapping.get(scheme)
if not cls:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=_('Image download protocol %s is not supported.'
) % scheme)
if cls == GlanceImageService:
return cls(client, context)
return cls()
[docs]
def get_image_service_auth_override(node, permit_user_auth=True):
"""Collect image service authentication overrides
This method is intended to collect authentication credentials
together for submission to remote image services which may have
authentication requirements which are not presently available,
or where specific authentication details are required.
:param task: A Node object instance.
:param permit_user_auth: Option to allow the caller to indicate if
user provided authentication should be permitted.
:returns: A dictionary with username and password keys containing
credential to utilize or None if no value found.
"""
# NOTE(TheJulia): This is largely necessary as in a pure OpenStack
# operating context, we assume the caller is just a glance image UUID
# and that Glance holds the secret. Ironic would then utilize it's static
# authentication to interact with Glance.
# TODO(TheJulia): It was not lost on me that the overall *general* idea
# here could similarly be leveraged to *enable* private user image access.
# While that wouldn't necessarily be right here in the code, it would
# likely need to be able to be picked up for user based authentication.
if permit_user_auth and 'image_pull_secret' in node.instance_info:
return {
# Pull secrets appear to leverage basic auth, but provide a blank
# username, where the password is understood to be the pre-shared
# secret to leverage for authentication.
'username': '',
'password': node.instance_info.get('image_pull_secret'),
}
elif 'image_pull_secret' in node.driver_info:
# Enables fallback to the driver_info field, as it is considered
# administratively set.
return {
'username': '',
'password': node.driver_info.get('image_pull_secret'),
}
# In the future, we likely want to add logic here to enable condutor
# configuration housed credentials.
else:
return None
[docs]
def is_container_registry_url(image_href):
"""Determine if the supplied reference string is an OCI registry URL.
:param image_href: A string containing a url, sourced from the
original user request.
:returns: True if the URL appears to be an OCI image registry
URL. Otherwise, False.
"""
if not isinstance(image_href, str):
return False
# Possible future idea: engage urlparse, and look at just the path
# field, since shorthand style gets parsed out without a network
# location, and parses the entire string as a path so we can detect
# the shorthand url style without a protocol definition.
return image_href.startswith('oci://')