# Source code for glance.api.v2.cached_images

# Copyright 2018 RedHat Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Controller for Image Cache Management API
"""

import queue
import threading

import glance_store
from oslo_config import cfg
from oslo_log import log as logging
import webob.exc

from glance.api import policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import wsgi
import glance.db
import glance.gateway
from glance.i18n import _
from glance import image_cache
import glance.notifier


CONF = cfg.CONF
LOG = logging.getLogger(__name__)
WORKER = None


class CacheController(object):
    """
    A controller for managing cached images.

    Every public handler authorizes the request through ``_enforce``
    before touching the cache. When ``CONF.image_cache_dir`` is not set,
    ``self.cache`` is ``None`` and ``_enforce`` answers with HTTP 404.
    """

    def __init__(self, db_api=None, policy_enforcer=None, notifier=None,
                 store_api=None):
        """
        Build the controller and, if needed, start the cache worker.

        :param db_api: database API; defaults to ``glance.db.get_api()``
        :param policy_enforcer: policy enforcer; defaults to
                                ``policy.Enforcer()``
        :param notifier: notifier; defaults to ``glance.notifier.Notifier()``
        :param store_api: store API; defaults to ``glance_store``
        """
        global WORKER
        if not CONF.image_cache_dir:
            self.cache = None
        else:
            self.cache = image_cache.ImageCache()

        self.policy = policy_enforcer or policy.Enforcer()
        self.db_api = db_api or glance.db.get_api()
        self.notifier = notifier or glance.notifier.Notifier()
        self.store_api = store_api or glance_store
        self.gateway = glance.gateway.Gateway(self.db_api, self.store_api,
                                              self.notifier, self.policy)

        # Initialize the worker only if cache is enabled
        if CONF.image_cache_dir and not WORKER:
            # If we're the first, start the thread
            WORKER = CacheWorker()
            WORKER.start()
            LOG.debug('Started cache worker thread')

    def _enforce(self, req, image=None, new_policy=None):
        """
        Authorize request against given policy.

        :param req: the incoming request; ``req.context`` is used for the
                    policy check
        :param image: optional image passed to the policy check
        :param new_policy: policy name; defaults to ``manage_image_cache``
        :raises webob.exc.HTTPForbidden: if the policy check is refused
        :raises webob.exc.HTTPNotFound: if ``CONF.image_cache_dir`` is unset
        """
        if not new_policy:
            new_policy = 'manage_image_cache'
        try:
            api_policy.CacheImageAPIPolicy(
                req.context, image=image, enforcer=self.policy,
                policy_str=new_policy).manage_image_cache()
        except exception.Forbidden:
            LOG.debug("User not permitted by '%s' policy", new_policy)
            raise webob.exc.HTTPForbidden()

        # The policy check deliberately runs first, so the "not supported"
        # answer is only given to callers that passed authorization.
        if not CONF.image_cache_dir:
            msg = _("Caching via API is not supported at this site.")
            raise webob.exc.HTTPNotFound(explanation=msg)

    def get_cached_images(self, req):
        """
        GET /cached_images

        Returns a mapping of records about cached images.
        """
        self._enforce(req)
        images = self.cache.get_cached_images()
        return dict(cached_images=images)

    def delete_cached_image(self, req, image_id):
        """
        DELETE /cached_images/<IMAGE_ID>

        Removes an image from the cache.
        """
        self._enforce(req)
        self.cache.delete_cached_image(image_id)

    def delete_cached_images(self, req):
        """
        DELETE /cached_images - Clear all active cached images

        Removes all images from the cache and returns the number deleted
        under the ``num_deleted`` key.
        """
        self._enforce(req)
        return dict(num_deleted=self.cache.delete_all_cached_images())

    def get_queued_images(self, req):
        """
        GET /queued_images

        Returns a mapping of records about queued images.
        """
        self._enforce(req)
        images = self.cache.get_queued_images()
        return dict(queued_images=images)

    def queue_image(self, req, image_id):
        """
        PUT /queued_images/<IMAGE_ID>

        Queues an image for caching. We do not check to see if
        the image is in the registry here. That is done by the
        prefetcher...
        """
        self._enforce(req)
        self.cache.queue_image(image_id)

    def delete_queued_image(self, req, image_id):
        """
        DELETE /queued_images/<IMAGE_ID>

        Removes an image from the queue.
        """
        self._enforce(req)
        self.cache.delete_queued_image(image_id)

    def delete_queued_images(self, req):
        """
        DELETE /queued_images - Clear all active queued images

        Removes all images from the queue and returns the number deleted
        under the ``num_deleted`` key.
        """
        self._enforce(req)
        return dict(num_deleted=self.cache.delete_all_queued_images())

    def delete_cache_entry(self, req, image_id):
        """
        DELETE /cache/<IMAGE_ID> - Remove image from cache

        Removes the image from cache or queue.

        :raises webob.exc.HTTPNotFound: if the image is unknown to the
            image repo and is neither cached nor queued, or if caching
            is disabled
        :raises webob.exc.HTTPForbidden: if the ``cache_delete`` policy
            check is refused
        """
        image_repo = self.gateway.get_repo(req.context)
        try:
            image = image_repo.get(image_id)
        except exception.NotFound:
            # We are going to raise this error only if image is
            # not present in cache or queue list
            image = None
            if not self.image_exists_in_cache(image_id):
                msg = _("Image %s not found.") % image_id
                LOG.warning(msg)
                raise webob.exc.HTTPNotFound(explanation=msg)

        self._enforce(req, new_policy='cache_delete', image=image)
        self.cache.delete_cached_image(image_id)
        self.cache.delete_queued_image(image_id)

    def image_exists_in_cache(self, image_id):
        """
        Tell whether ``image_id`` is currently queued or cached.

        :param image_id: identifier to look for
        :returns: True if the id is in the queue or among the cached
                  image records, False otherwise (including when caching
                  is disabled and there is no cache to inspect)
        """
        # self.cache is None when CONF.image_cache_dir is unset; callers
        # reach this before _enforce(), so answer instead of crashing.
        if self.cache is None:
            return False

        if image_id in self.cache.get_queued_images():
            return True

        return any(image['image_id'] == image_id
                   for image in self.cache.get_cached_images())

    def clear_cache(self, req):
        """
        DELETE /cache - Clear cache and queue

        Removes all images from cache and queue. The optional
        ``x-image-cache-clear-target`` header narrows the operation to
        ``cache`` or ``queue``; any other non-empty value is rejected.

        :raises webob.exc.HTTPBadRequest: if the header value is invalid
        """
        self._enforce(req, new_policy='cache_delete')
        target = req.headers.get('x-image-cache-clear-target', '').lower()
        if target == '':
            res = dict(cache_deleted=self.cache.delete_all_cached_images(),
                       queue_deleted=self.cache.delete_all_queued_images())
        elif target == 'cache':
            res = dict(cache_deleted=self.cache.delete_all_cached_images())
        elif target == 'queue':
            res = dict(queue_deleted=self.cache.delete_all_queued_images())
        else:
            reason = (_("If provided 'x-image-cache-clear-target' must be "
                        "'cache', 'queue' or empty string."))
            raise webob.exc.HTTPBadRequest(explanation=reason,
                                           request=req,
                                           content_type='text/plain')
        return res

    def get_cache_state(self, req):
        """
        GET /cache/ - Get currently cached and queued images

        Returns dict of cached and queued images
        """
        self._enforce(req, new_policy='cache_list')
        return dict(cached_images=self.cache.get_cached_images(),
                    queued_images=self.cache.get_queued_images())

    def queue_image_from_api(self, req, image_id):
        """
        PUT /cache/<IMAGE_ID>

        Queues an image for caching. The image must exist in the image
        repo and have status ``active``; once queued, the id is also
        handed to the background cache worker.

        :raises webob.exc.HTTPNotFound: if the image is not found or
            caching is disabled
        :raises webob.exc.HTTPForbidden: if the ``cache_image`` policy
            check is refused
        :raises webob.exc.HTTPBadRequest: if the image is not active
        """
        image_repo = self.gateway.get_repo(req.context)
        try:
            image = image_repo.get(image_id)
        except exception.NotFound:
            msg = _("Image %s not found.") % image_id
            LOG.warning(msg)
            raise webob.exc.HTTPNotFound(explanation=msg)

        self._enforce(req, new_policy='cache_image', image=image)

        if image.status != 'active':
            msg = _("Only images with status active can be targeted for "
                    "queueing")
            raise webob.exc.HTTPBadRequest(explanation=msg)

        self.cache.queue_image(image_id)
        WORKER.submit(image_id)
class CacheWorker(threading.Thread):
    """
    Background thread that feeds submitted image ids to the prefetcher.

    Jobs are taken from an unbounded queue one at a time. Putting
    ``EXIT_SENTINEL`` on the queue (see ``terminate``) ends the loop.
    """

    # Unique marker object; compared by identity so that no submitted
    # job can ever be mistaken for it.
    EXIT_SENTINEL = object()

    def __init__(self, *args, **kwargs):
        """Create the job queue and the prefetcher, then init the thread."""
        self.q = queue.Queue(maxsize=-1)
        # NOTE(abhishekk): Importing the prefetcher just in time to avoid
        # import loop during initialization
        from glance.image_cache import prefetcher  # noqa
        self.prefetcher = prefetcher.Prefetcher()
        super().__init__(*args, **kwargs)

        # NOTE(abhishekk): Setting daemon to True because if `atexit` event
        # handler is not called due to some reason the main process will
        # not hang for the thread which will never exit.
        self.daemon = True

    def submit(self, job):
        """Enqueue ``job`` (an image id) for the worker to process."""
        self.q.put(job)

    def terminate(self):
        """Ask the worker to exit and block until it has done so."""
        # NOTE(danms): Make the API workers call this before we exit
        # to make sure any cache operations finish.
        LOG.info('Signaling cache worker thread to exit')
        self.q.put(self.EXIT_SENTINEL)
        self.join()
        LOG.info('Cache worker thread exited')

    def run(self):
        """
        Process queued jobs until the exit sentinel is received.

        A failure while caching one image is logged and the loop moves on
        to the next job; otherwise a single bad image would kill the only
        worker thread and every later submission would be silently lost.
        """
        while True:
            task = self.q.get()
            if task is self.EXIT_SENTINEL:
                LOG.debug("CacheWorker thread exiting")
                break

            LOG.debug("Processing image '%s' for caching", task)
            try:
                self.prefetcher.fetch_image_into_cache(task)
            except Exception:
                LOG.exception("Failed to cache image '%s'", task)
            else:
                LOG.debug("Caching of an image '%s' is complete", task)
            finally:
                # Always balance the get() so the queue's unfinished-task
                # accounting stays correct even when caching fails.
                self.q.task_done()
class CachedImageDeserializer(wsgi.JSONRequestDeserializer):
    """Request deserializer for the cache API; adds nothing to its base."""
class CachedImageSerializer(wsgi.JSONResponseSerializer):
    """Response serializer that sets the status code per cache action."""

    def queue_image_from_api(self, response, result):
        """Set status 202 on the response to a queue-image request."""
        response.status_int = 202

    def clear_cache(self, response, result):
        """Set status 204 on the response to a clear-cache request."""
        response.status_int = 204

    def delete_cache_entry(self, response, result):
        """Set status 204 on the response to a delete-entry request."""
        response.status_int = 204
def create_resource():
    """Cached Images resource factory method"""
    request_parser = CachedImageDeserializer()
    response_writer = CachedImageSerializer()
    controller = CacheController()
    return wsgi.Resource(controller, request_parser, response_writer)