Source code for glance.async_

# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import futurist
from oslo_log import log as logging

from glance.i18n import _LE

LOG = logging.getLogger(__name__)

[docs] class TaskExecutor(object): """Base class for Asynchronous task executors. It does not support the execution mechanism. Provisions the extensible classes with necessary variables to utilize important Glance modules like, context, task_repo, image_repo, image_factory. Note: It also gives abstraction for the standard pre-processing and post-processing operations to be executed by a task. These may include validation checks, security checks, introspection, error handling etc. The aim is to give developers an abstract sense of the execution pipeline logic. Args: context: glance.context.RequestContext object for AuthZ and AuthN checks task_repo: glance.db.TaskRepo object which acts as a translator for glance.domain.Task and glance.domain.TaskStub objects into ORM semantics image_repo: glance.db.ImageRepo object which acts as a translator for glance.domain.Image object into ORM semantics image_factory: glance.domain.ImageFactory object to be used for creating new images for certain types of tasks viz. import, cloning admin_repo: glance.db.ImageRepo object which acts as a translator for glance.domain.Image object into ORM semantics, but with an admin context (optional) """ def __init__(self, context, task_repo, image_repo, image_factory, admin_repo=None): self.context = context self.task_repo = task_repo self.image_repo = image_repo self.image_factory = image_factory self.admin_repo = admin_repo
[docs] def begin_processing(self, task_id): task = self.task_repo.get(task_id) task.begin_processing() # start running self._run(task_id, task.type)
def _run(self, task_id, task_type): task = self.task_repo.get(task_id) msg = _LE("This execution of Tasks is not setup. Please consult the " "project documentation for more information on the " "executors available.") LOG.error(msg)"Internal error occurred while trying to process task."))
[docs] class ThreadPoolModel(object): """Base class for an abstract ThreadPool. Do not instantiate this directly, use one of the concrete implementations. """ DEFAULTSIZE = 1
[docs] @staticmethod def get_threadpool_executor_class(): """Returns a futurist.ThreadPoolExecutor class.""" pass
def __init__(self, size=None): if size is None: size = self.DEFAULTSIZE threadpool_cls = self.get_threadpool_executor_class() LOG.debug('Creating threadpool model %r with size %i', threadpool_cls.__name__, size) self.pool = threadpool_cls(size)
[docs] def spawn(self, fn, *args, **kwargs): """Spawn a function with args using the thread pool.""" LOG.debug('Spawning with %s: %s(%s, %s)', self.get_threadpool_executor_class().__name__, fn, args, kwargs) return self.pool.submit(fn, *args, **kwargs)
[docs] def map(self, fn, iterable): """Map a function to each value in an iterable. This spawns a thread for each item in the provided iterable, generating results in the same order. Each item will spawn a thread, and each may run in parallel up to the limit of the pool. :param fn: A function to work on each item :param iterable: A sequence of items to process :returns: A generator of results in the same order """ threads = [] for i in iterable: threads.append(self.spawn(fn, i)) for future in threads: yield future.result()
[docs] class EventletThreadPoolModel(ThreadPoolModel): """A ThreadPoolModel suitable for use with evenlet/greenthreads.""" DEFAULTSIZE = 1024
[docs] @staticmethod def get_threadpool_executor_class(): return futurist.GreenThreadPoolExecutor
[docs] class NativeThreadPoolModel(ThreadPoolModel): """A ThreadPoolModel suitable for use with native threads.""" DEFAULTSIZE = 16
[docs] @staticmethod def get_threadpool_executor_class(): return futurist.ThreadPoolExecutor
[docs] def set_threadpool_model(thread_type): """Set the system-wide threadpool model. This sets the type of ThreadPoolModel to use globally in the process. It should be called very early in init, and only once. :param thread_type: A string indicating the threading type in use, either "eventlet" or "native" :raises: RuntimeError if the model is already set or some thread_type other than one of the supported ones is provided. """ global _THREADPOOL_MODEL if thread_type == 'native': model = NativeThreadPoolModel elif thread_type == 'eventlet': model = EventletThreadPoolModel else: raise RuntimeError( ('Invalid thread type %r ' '(must be "native" or "eventlet")') % (thread_type)) if _THREADPOOL_MODEL is model: # Re-setting the same model is fine... return if _THREADPOOL_MODEL is not None: # ...changing it is not. raise RuntimeError('Thread model is already set')'Threadpool model set to %r', model.__name__) _THREADPOOL_MODEL = model
[docs] def get_threadpool_model(): """Returns the system-wide threadpool model class. This must be called after set_threadpool_model() whenever some code needs to know what the threadpool implementation is. This may only be called after set_threadpool_model() has been called to set the desired threading mode. If it is called before the model is set, it will raise AssertionError. This would likely be the case if this got run in a test before the model was initialized, or if glance modules that use threading were imported and run from some other code without setting the model first. :raises: AssertionError if the model has not yet been set. """ global _THREADPOOL_MODEL assert _THREADPOOL_MODEL return _THREADPOOL_MODEL