# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import http.client as http
import urllib.parse as urlparse
import debtcollector
import glance_store
from oslo_config import cfg
from oslo_log import log as logging
import oslo_serialization.jsonutils as json
from oslo_utils import encodeutils
from oslo_utils import uuidutils
import webob.exc
from glance.api import common
from glance.api import policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import timeutils
from glance.common import wsgi
import glance.db
import glance.gateway
from glance.i18n import _, _LW
import glance.notifier
import glance.schema
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('task_time_to_live', 'glance.common.config', group='task')
_DEPRECATION_MESSAGE = ("The task API is being deprecated and "
"it will be superseded by the new image import "
"API. Please refer to this link for more "
"information about the aforementioned process: "
"https://specs.openstack.org/openstack/glance-specs/"
"specs/mitaka/approved/image-import/"
"image-import-refactor.html")
[docs]
class TasksController(object):
"""Manages operations on tasks."""
def __init__(self, db_api=None, policy_enforcer=None, notifier=None,
store_api=None):
self.db_api = db_api or glance.db.get_api()
self.policy = policy_enforcer or policy.Enforcer()
self.notifier = notifier or glance.notifier.Notifier()
self.store_api = store_api or glance_store
self.gateway = glance.gateway.Gateway(self.db_api, self.store_api,
self.notifier, self.policy)
[docs]
@debtcollector.removals.remove(message=_DEPRECATION_MESSAGE)
def create(self, req, task):
# NOTE(rosmaita): access to this call is enforced in the deserializer
ctxt = req.context
task_factory = self.gateway.get_task_factory(ctxt)
executor_factory = self.gateway.get_task_executor_factory(ctxt)
task_repo = self.gateway.get_task_repo(ctxt)
try:
new_task = task_factory.new_task(
task_type=task['type'],
owner=ctxt.owner,
task_input=task['input'],
image_id=task['input'].get('image_id'),
user_id=ctxt.user_id,
request_id=ctxt.request_id)
task_repo.add(new_task)
task_executor = executor_factory.new_task_executor(ctxt)
pool = common.get_thread_pool("tasks_pool")
pool.spawn(new_task.run, task_executor)
except exception.Forbidden as e:
msg = (_LW("Forbidden to create task. Reason: %(reason)s")
% {'reason': encodeutils.exception_to_unicode(e)})
LOG.warning(msg)
raise webob.exc.HTTPForbidden(explanation=e.msg)
return new_task
[docs]
@debtcollector.removals.remove(message=_DEPRECATION_MESSAGE)
def index(self, req, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters=None):
# NOTE(rosmaita): access to this call is enforced in the deserializer
result = {}
if filters is None:
filters = {}
filters['deleted'] = False
if limit is None:
limit = CONF.limit_param_default
limit = min(CONF.api_limit_max, limit)
task_repo = self.gateway.get_task_stub_repo(req.context)
try:
tasks = task_repo.list(marker, limit, sort_key,
sort_dir, filters)
if len(tasks) != 0 and len(tasks) == limit:
result['next_marker'] = tasks[-1].task_id
except (exception.NotFound, exception.InvalidSortKey,
exception.InvalidFilterRangeValue) as e:
LOG.warning(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPBadRequest(explanation=e.msg)
except exception.Forbidden as e:
LOG.warning(encodeutils.exception_to_unicode(e))
raise webob.exc.HTTPForbidden(explanation=e.msg)
result['tasks'] = tasks
return result
[docs]
@debtcollector.removals.remove(message=_DEPRECATION_MESSAGE)
def get(self, req, task_id):
_enforce_access_policy(self.policy, req)
try:
task_repo = self.gateway.get_task_repo(req.context)
task = task_repo.get(task_id)
except exception.NotFound as e:
msg = (_LW("Failed to find task %(task_id)s. Reason: %(reason)s")
% {'task_id': task_id,
'reason': encodeutils.exception_to_unicode(e)})
LOG.warning(msg)
raise webob.exc.HTTPNotFound(explanation=e.msg)
except exception.Forbidden as e:
msg = (_LW("Forbidden to get task %(task_id)s. Reason:"
" %(reason)s")
% {'task_id': task_id,
'reason': encodeutils.exception_to_unicode(e)})
LOG.warning(msg)
raise webob.exc.HTTPForbidden(explanation=e.msg)
return task
[docs]
@debtcollector.removals.remove(message=_DEPRECATION_MESSAGE)
def delete(self, req, task_id):
_enforce_access_policy(self.policy, req)
msg = (_("This operation is currently not permitted on Glance Tasks. "
"They are auto deleted after reaching the time based on "
"their expires_at property."))
raise webob.exc.HTTPMethodNotAllowed(explanation=msg,
headers={'Allow': 'GET'},
body_template='${explanation}')
[docs]
class RequestDeserializer(wsgi.JSONRequestDeserializer):
_required_properties = ['type', 'input']
def _get_request_body(self, request):
output = super(RequestDeserializer, self).default(request)
if 'body' not in output:
msg = _('Body expected in request.')
raise webob.exc.HTTPBadRequest(explanation=msg)
return output['body']
def _validate_sort_dir(self, sort_dir):
if sort_dir not in ['asc', 'desc']:
msg = _('Invalid sort direction: %s') % sort_dir
raise webob.exc.HTTPBadRequest(explanation=msg)
return sort_dir
def _get_filters(self, filters):
status = filters.get('status')
if status:
if status not in ['pending', 'processing', 'success', 'failure']:
msg = _('Invalid status value: %s') % status
raise webob.exc.HTTPBadRequest(explanation=msg)
type = filters.get('type')
if type:
if type not in ['import']:
msg = _('Invalid type value: %s') % type
raise webob.exc.HTTPBadRequest(explanation=msg)
return filters
def _validate_marker(self, marker):
if marker and not uuidutils.is_uuid_like(marker):
msg = _('Invalid marker format')
raise webob.exc.HTTPBadRequest(explanation=msg)
return marker
def _validate_limit(self, limit):
try:
limit = int(limit)
except ValueError:
msg = _("limit param must be an integer")
raise webob.exc.HTTPBadRequest(explanation=msg)
if limit < 0:
msg = _("limit param must be positive")
raise webob.exc.HTTPBadRequest(explanation=msg)
return limit
def _validate_create_body(self, body):
"""Validate the body of task creating request"""
for param in self._required_properties:
if param not in body:
msg = _("Task '%s' is required") % param
raise webob.exc.HTTPBadRequest(explanation=msg)
def __init__(self, schema=None, policy_engine=None):
super(RequestDeserializer, self).__init__()
self.schema = schema or get_task_schema()
# want to enforce the access policy as early as possible
self.policy_engine = policy_engine or policy.Enforcer()
[docs]
def create(self, request):
_enforce_access_policy(self.policy_engine, request)
body = self._get_request_body(request)
self._validate_create_body(body)
try:
self.schema.validate(body)
except exception.InvalidObject as e:
raise webob.exc.HTTPBadRequest(explanation=e.msg)
task = {}
properties = body
for key in self._required_properties:
try:
task[key] = properties.pop(key)
except KeyError:
pass
return dict(task=task)
[docs]
def index(self, request):
_enforce_access_policy(self.policy_engine, request)
params = request.params.copy()
limit = params.pop('limit', None)
marker = params.pop('marker', None)
sort_dir = params.pop('sort_dir', 'desc')
query_params = {
'sort_key': params.pop('sort_key', 'created_at'),
'sort_dir': self._validate_sort_dir(sort_dir),
'filters': self._get_filters(params)
}
if marker is not None:
query_params['marker'] = self._validate_marker(marker)
if limit is not None:
query_params['limit'] = self._validate_limit(limit)
return query_params
[docs]
class ResponseSerializer(wsgi.JSONResponseSerializer):
def __init__(self, task_schema=None, partial_task_schema=None):
super(ResponseSerializer, self).__init__()
self.task_schema = task_schema or get_task_schema()
self.partial_task_schema = (partial_task_schema
or _get_partial_task_schema())
def _inject_location_header(self, response, task):
location = self._get_task_location(task)
response.headers['Location'] = location
def _get_task_location(self, task):
return '/v2/tasks/%s' % task.task_id
def _format_task(self, schema, task):
task_view = {
'id': task.task_id,
'input': task.task_input,
'type': task.type,
'status': task.status,
'owner': task.owner,
'message': task.message,
'result': task.result,
'created_at': timeutils.isotime(task.created_at),
'updated_at': timeutils.isotime(task.updated_at),
'self': self._get_task_location(task),
'schema': '/v2/schemas/task'
}
if task.image_id:
task_view['image_id'] = task.image_id
if task.request_id:
task_view['request_id'] = task.request_id
if task.user_id:
task_view['user_id'] = task.user_id
if task.expires_at:
task_view['expires_at'] = timeutils.isotime(task.expires_at)
task_view = schema.filter(task_view) # domain
return task_view
def _format_task_stub(self, schema, task):
task_view = {
'id': task.task_id,
'type': task.type,
'status': task.status,
'owner': task.owner,
'created_at': timeutils.isotime(task.created_at),
'updated_at': timeutils.isotime(task.updated_at),
'self': self._get_task_location(task),
'schema': '/v2/schemas/task'
}
if task.expires_at:
task_view['expires_at'] = timeutils.isotime(task.expires_at)
task_view = schema.filter(task_view) # domain
return task_view
[docs]
def create(self, response, task):
response.status_int = http.CREATED
self._inject_location_header(response, task)
self.get(response, task)
[docs]
def get(self, response, task):
task_view = self._format_task(self.task_schema, task)
response.unicode_body = json.dumps(task_view, ensure_ascii=False)
response.content_type = 'application/json'
[docs]
def index(self, response, result):
params = dict(response.request.params)
params.pop('marker', None)
query = urlparse.urlencode(params)
body = {
'tasks': [self._format_task_stub(self.partial_task_schema, task)
for task in result['tasks']],
'first': '/v2/tasks',
'schema': '/v2/schemas/tasks',
}
if query:
body['first'] = '%s?%s' % (body['first'], query)
if 'next_marker' in result:
params['marker'] = result['next_marker']
next_query = urlparse.urlencode(params)
body['next'] = '/v2/tasks?%s' % next_query
response.unicode_body = json.dumps(body, ensure_ascii=False)
response.content_type = 'application/json'
_TASK_SCHEMA = {
"id": {
"description": _("An identifier for the task"),
"pattern": _('^([0-9a-fA-F]){8}-([0-9a-fA-F]){4}-([0-9a-fA-F]){4}'
'-([0-9a-fA-F]){4}-([0-9a-fA-F]){12}$'),
"type": "string"
},
"type": {
"description": _("The type of task represented by this content"),
"enum": [
"import",
"api_image_import",
"location_import",
],
"type": "string"
},
"status": {
"description": _("The current status of this task"),
"enum": [
"pending",
"processing",
"success",
"failure"
],
"type": "string"
},
"input": {
"description": _("The parameters required by task, JSON blob"),
"type": ["null", "object"],
},
"result": {
"description": _("The result of current task, JSON blob"),
"type": ["null", "object"],
},
"owner": {
"description": _("An identifier for the owner of this task"),
"type": "string"
},
"message": {
"description": _("Human-readable informative message only included"
" when appropriate (usually on failure)"),
"type": "string",
},
"image_id": {
"description": _("Image associated with the task"),
"type": "string",
},
"request_id": {
"description": _("Human-readable informative request-id"),
"type": "string",
},
"user_id": {
"description": _("User associated with the task"),
"type": "string",
},
"expires_at": {
"description": _("Datetime when this resource would be"
" subject to removal"),
"type": ["null", "string"]
},
"created_at": {
"description": _("Datetime when this resource was created"),
"type": "string"
},
"updated_at": {
"description": _("Datetime when this resource was updated"),
"type": "string"
},
'self': {
'readOnly': True,
'type': 'string'
},
'schema': {
'readOnly': True,
'type': 'string'
}
}
def _enforce_access_policy(policy_engine, request):
api_policy.TasksAPIPolicy(
request.context,
enforcer=policy_engine).tasks_api_access()
[docs]
def get_task_schema():
properties = copy.deepcopy(_TASK_SCHEMA)
schema = glance.schema.Schema('task', properties)
return schema
def _get_partial_task_schema():
properties = copy.deepcopy(_TASK_SCHEMA)
hide_properties = ['input', 'result', 'message']
for key in hide_properties:
del properties[key]
schema = glance.schema.Schema('task', properties)
return schema
[docs]
def get_collection_schema():
task_schema = _get_partial_task_schema()
return glance.schema.CollectionSchema('tasks', task_schema)
[docs]
def create_resource():
"""Task resource factory method"""
task_schema = get_task_schema()
partial_task_schema = _get_partial_task_schema()
deserializer = RequestDeserializer(task_schema)
serializer = ResponseSerializer(task_schema, partial_task_schema)
controller = TasksController()
return wsgi.Resource(controller, deserializer, serializer)