Source code for ironic.common.checksum_utils

# Copyright (c) 2024 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os
import re
import time
from urllib import parse as urlparse

from oslo_log import log as logging
from oslo_utils import fileutils

from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import image_service
from ironic.conf import CONF

LOG = logging.getLogger(__name__)


# Regular expressions used to pull a checksum out of a checksum file payload.
# NOTE: ironic-python-agent (extensions/standby.py) carries the same list;
# any change made here should be made there as well.

# MD5: 32 hexadecimal characters.
MD5_MATCH = r"^([a-fA-F\d]{32})\s"  # at beginning of line
MD5_MATCH_END = r"\s([a-fA-F\d]{32})$"  # at end of line
MD5_MATCH_ONLY = r"^([a-fA-F\d]{32})$"  # the whole line

# SHA256: 64 hexadecimal characters.
SHA256_MATCH = r"^([a-fA-F\d]{64})\s"  # at beginning of line
SHA256_MATCH_END = r"\s([a-fA-F\d]{64})$"  # at end of line
SHA256_MATCH_ONLY = r"^([a-fA-F\d]{64})$"  # the whole line

# SHA512: 128 hexadecimal characters.
SHA512_MATCH = r"^([a-fA-F\d]{128})\s"  # at beginning of line
SHA512_MATCH_END = r"\s([a-fA-F\d]{128})$"  # at end of line
SHA512_MATCH_ONLY = r"^([a-fA-F\d]{128})$"  # the whole line

# File name templates; "{filename}" is filled in with str.format() before
# the pattern is used.
FILENAME_MATCH_END = r"\s[*]?{filename}$"  # Filename binary/text end of line
FILENAME_MATCH_PARENTHESES = r"\s\({filename}\)\s"  # CentOS images

# Patterns for a checksum that shares its line with other content.
CHECKSUM_MATCHERS = (
    MD5_MATCH,
    MD5_MATCH_END,
    SHA256_MATCH,
    SHA256_MATCH_END,
    SHA512_MATCH,
    SHA512_MATCH_END,
)
# Patterns for a line holding nothing but the checksum.
CHECKSUM_ONLY_MATCHERS = (
    MD5_MATCH_ONLY,
    SHA256_MATCH_ONLY,
    SHA512_MATCH_ONLY,
)
# Patterns used to recognise the line that belongs to a given file name.
FILENAME_MATCHERS = (
    FILENAME_MATCH_END,
    FILENAME_MATCH_PARENTHESES,
)


def validate_checksum(path, checksum, checksum_algo=None):
    """Verify that a file on disk matches the checksum supplied for it.

    :param path: Path, as a string, of the file whose checksum is calculated
                 and compared against ``checksum``.
    :param checksum: The expected checksum value as a string. The algorithm
                     may be embedded as a prefix, i.e. ``<algo>:<value>``.
    :param checksum_algo: Name of the checksum algorithm. Only consulted
                          when ``checksum`` carries no algorithm prefix.
    :raises: ImageChecksumError if the supplied value is empty once any
             algorithm prefix is removed, or if it does not match the
             calculated checksum.
    :raises: ImageChecksumAlgorithmFailure if calculating the checksum
             raised ValueError (invalid/unsupported/unknown algorithm).
    :raises: ImageChecksumFileReadFailure if calculating the checksum
             raised OSError.
    """
    # TODO(TheJulia): At some point we likely need to vet the incoming
    # checksum algorithm upfront, but if an unsupported one is invoked
    # hashlib will raise ValueError, which is handled below.
    if ":" in checksum:
        # The algorithm can be communicated by delimiting it from the value
        # with a colon; the ansible deploy interface is the most evident
        # user of this form.
        fields = checksum.split(":")
        prefix_algo, expected = fields[0], fields[1]
    else:
        prefix_algo, expected = None, checksum

    # An embedded algorithm wins; otherwise fall back to the argument.
    algo = prefix_algo or checksum_algo

    # The value is mandatory while the algorithm is optional. Splitting
    # something like image_checksum = "sha256:" leaves a zero length value,
    # which is invalid input.
    if expected == '':
        raise exception.ImageChecksumError()

    try:
        if algo:
            # Lower case the algorithm name, as it may be human originated
            # input and mixed case is not expected.
            calculated = compute_image_checksum(path, algo.lower())
        else:
            # Backwards compatible support for a bare checksum.
            calculated = compute_image_checksum(path)
    except ValueError:
        # Raised when an invalid/unsupported/unknown checksum algorithm
        # is invoked.
        LOG.error("Failed to generate checksum for file %(path)s, possible "
                  "invalid checksum algorithm: %(algo)s",
                  {"path": path,
                   "algo": algo})
        raise exception.ImageChecksumAlgorithmFailure()
    except OSError:
        LOG.error("Failed to read file %(path)s to compute checksum.",
                  {"path": path})
        raise exception.ImageChecksumFileReadFailure()

    # Comparison is case-insensitive.
    if expected is None or calculated.lower() == expected.lower():
        return

    LOG.error("We were supplied a checksum value of %(supplied)s, but "
              "calculated a value of %(value)s. This is a fatal error.",
              {"supplied": expected,
               "value": calculated})
    raise exception.ImageChecksumError()
def compute_image_checksum(image_path, algorithm='md5'):
    """Calculate the checksum of an image file, logging how long it took.

    :param image_path: The path to the file to undergo checksum calculation.
    :param algorithm: The checksum algorithm to utilize. Defaults to
        'md5' due to historical support reasons in Ironic.
    :returns: The calculated checksum value.
    :raises: ValueError when the checksum algorithm is not supported
        by the system.
    """
    started_at = time.time()
    log_fields = {'algo': algorithm, 'image': image_path}
    LOG.debug('Start computing %(algo)s checksum for image %(image)s.',
              log_fields)

    digest = fileutils.compute_file_checksum(image_path,
                                             algorithm=algorithm)

    elapsed = time.time() - started_at
    LOG.debug('Computed %(algo)s checksum for image %(image)s in '
              '%(delta).2f seconds, checksum value: %(checksum)s.',
              dict(log_fields, delta=elapsed, checksum=digest))
    return digest
def get_checksum_and_algo(instance_info):
    """Work out the image checksum and its algorithm from instance_info.

    :param instance_info: The node instance info, or newly updated/generated
                          instance_info value.
    :returns: A tuple containing two values, a checksum and algorithm,
              if available.
    :raises: ImageChecksumAlgorithmFailure if the checksum has the length
             of an MD5 value while MD5 is disabled by configuration.
    """
    if 'image_os_hash_value' in instance_info.keys():
        # image_os_hash_value is specific, so it supersedes any other
        # source and is returned together with its declared algorithm.
        return (instance_info.get('image_os_hash_value'),
                instance_info.get('image_os_hash_algo'))

    checksum = instance_info.get('image_checksum')
    if is_checksum_url(checksum):
        # The value points at a checksum file; resolve it to the checksum
        # recorded for the image source.
        checksum = get_checksum_from_url(
            checksum, instance_info.get('image_source'))

    # NOTE(TheJulia): This is all based on SHA-2 lengths.
    # SHA-3 would require a hint and it would not be a fixed length.
    # That said, SHA-2 is still valid and has not been withdrawn.
    digest_len = len(checksum)

    if digest_len == 32 and not CONF.agent.allow_md5_checksum:
        # MD5 not permitted and the checksum is the length of MD5
        # and not otherwise defined.
        LOG.error('Cannot compute the checksum as it uses MD5 '
                  'and is disabled by configuration. If the checksum '
                  'is *not* MD5, please specify the algorithm.')
        raise exception.ImageChecksumAlgorithmFailure()

    # SHA2-512 is 512 bits (128 characters), SHA2-256 is 64 characters;
    # any other length leaves the algorithm undetermined (None).
    algo_by_length = {128: "sha512", 64: "sha256"}
    return checksum, algo_by_length.get(digest_len)
def is_checksum_url(checksum):
    """Identify if the supplied checksum is an HTTP(S) URL.

    :param checksum: The user supplied checksum value.
    :returns: True if the checksum is a url, otherwise False.
    :raises: ImageChecksumURLNotSupported should the conductor have this
             support disabled.
    """
    if not checksum.startswith(('http://', 'https://')):
        return False
    # It is a URL; refuse it if checksum files are disabled by configuration.
    if CONF.conductor.disable_support_for_checksum_files:
        raise exception.ImageChecksumURLNotSupported()
    return True
def get_checksum_from_url(checksum, image_source):
    """Gets a checksum value based upon a remote checksum URL file.

    :param checksum: The URL to the checksum URL content.
    :param image_source: The image source utilized to match with
                         the contents of the URL payload file. The base
                         name of its URL path is the file name looked for.
    :returns: The checksum string extracted from the payload.
    :raises: ImageDownloadFailed when the checksum file is empty, contains
             no valid checksum, or has no entry for the image file name.
    """
    LOG.debug('Attempting to download checksum from: %(checksum)s.',
              {'checksum': checksum})

    # Directly invoke the image service and get the checksum data.
    resp = image_service.HttpImageService.get(checksum)
    # Keep the URL in its own name for error reporting.
    checksum_url = str(checksum)

    # NOTE(TheJulia): The rest of this method is taken from
    # ironic-python-agent. If a change is required here, it may
    # be required in ironic-python-agent (extensions/standby.py).
    lines = [line.strip() for line in resp.split('\n') if line.strip()]
    if not lines:
        raise exception.ImageDownloadFailed(image_href=checksum_url,
                                            reason=_('Checksum file empty.'))
    elif len(lines) == 1:
        # Special case - checksums file with only the checksum itself
        if ' ' not in lines[0]:
            for matcher in CHECKSUM_ONLY_MATCHERS:
                found = re.findall(matcher, lines[0])
                if found:
                    return found[0]
            raise exception.ImageDownloadFailed(
                image_href=checksum_url,
                reason=(
                    _("Invalid checksum file (No valid checksum found)")))

    # FIXME(dtantsur): can we assume the same name for all images?
    expected_fname = os.path.basename(urlparse.urlparse(
        image_source).path)

    # The file name must be matched literally: escape it so characters such
    # as ".", "+" or "(" are not interpreted as regular expression syntax.
    # The patterns do not depend on the line, so build them once.
    fname_patterns = [
        matcher.format(filename=re.escape(expected_fname))
        for matcher in FILENAME_MATCHERS
    ]

    for line in lines:
        # Ignore comment lines
        if line.startswith("#"):
            continue

        # Ignore checksums for other files
        if not any(re.search(pattern, line) for pattern in fname_patterns):
            continue

        for matcher in CHECKSUM_MATCHERS:
            found = re.findall(matcher, line)
            if found:
                return found[0]

    # Report the URL itself; the match results above are kept in a separate
    # name so they can never end up in the error message.
    raise exception.ImageDownloadFailed(
        image_href=checksum_url,
        reason=(_("Checksum file does not contain name %s")
                % expected_fname))