watcher.db.sqlalchemy.job_store

Source code for watcher.db.sqlalchemy.job_store

# -*- encoding: utf-8 -*-
# Copyright (c) 2017 Servionica LTD
#
# Authors: Alexander Chadin <a.chadin@servionica.ru>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo_serialization import jsonutils

from apscheduler.jobstores.base import ConflictingIdError
from apscheduler.jobstores import sqlalchemy
from apscheduler.util import datetime_to_utc_timestamp
from apscheduler.util import maybe_ref

from watcher.common import context
from watcher.common import service
from watcher import objects

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

from sqlalchemy import Table, MetaData, select, and_
from sqlalchemy.exc import IntegrityError


class WatcherJobStore(sqlalchemy.SQLAlchemyJobStore):
    """Stores jobs in a database table using SQLAlchemy.

    Every stored job is bound to the Watcher service that owns it: rows
    carry the owning ``service_id`` and a JSON-encoded ``tag``, and only
    rows matching this store's service are ever loaded back.

    The job table is loaded by reflection (``autoload``), so it is
    expected to exist already, including the ``service_id`` and ``tag``
    columns used here.

    :param str url: connection string
    :param engine: an SQLAlchemy Engine to use instead of creating a new
                   one based on ``url``
    :param str tablename: name of the table to store jobs in
    :param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead
                     of creating a new one
    :param int pickle_protocol: pickle protocol level to use
                                (for serialization), defaults to the highest
                                available
    :param dict tag: tag description; defaults to the ``host``/``name``
                     pair reported by the service heartbeat
    """

    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs',
                 metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL,
                 tag=None):
        super(WatcherJobStore, self).__init__(url, engine, tablename,
                                              metadata, pickle_protocol)
        metadata = maybe_ref(metadata) or MetaData()
        # Reflect through ``self.engine`` (set up by the parent constructor)
        # rather than the raw ``engine`` argument, which is None whenever
        # the store is configured with ``url`` only.
        self.jobs_t = Table(tablename, metadata, autoload=True,
                            autoload_with=self.engine)
        service_ident = service.ServiceHeartbeat.get_service_name()
        self.tag = tag or {'host': service_ident[0],
                           'name': service_ident[1]}
        # The first service matching the tag owns the jobs of this store.
        # NOTE: raises IndexError when no service matches the tag.
        self.service_id = objects.Service.list(
            context=context.make_context(), filters=self.tag)[0].id

    def start(self, scheduler, alias):
        """Start the job store, bypassing ``SQLAlchemyJobStore.start()``.

        :param scheduler: the scheduler that is starting this job store
        :param alias: alias of this job store as assigned by the scheduler
        """
        # The 'start' method of the *parent of SQLAlchemyJobStore* must be
        # called here. The class to skip is named explicitly: deriving it
        # from ``self.__class__.__bases__[0]`` resolves to WatcherJobStore
        # itself as soon as this class is subclassed, which would invoke
        # SQLAlchemyJobStore.start() after all.
        super(sqlalchemy.SQLAlchemyJobStore, self).start(scheduler, alias)

    def add_job(self, job):
        """Insert ``job`` into the table, bound to this store's service.

        :param job: the job to persist
        :raises ConflictingIdError: if the insert violates an integrity
            constraint (reported as a conflicting job id)
        """
        insert = self.jobs_t.insert().values(**{
            'id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': pickle.dumps(job.__getstate__(),
                                      self.pickle_protocol),
            'service_id': self.service_id,
            'tag': jsonutils.dumps(self.tag)
        })
        try:
            self.engine.execute(insert)
        except IntegrityError:
            raise ConflictingIdError(job.id)

    def get_all_jobs(self):
        """Return the jobs whose tag and service match this store.

        :returns: list of restored jobs, ordered by ``next_run_time`` and
            then passed through ``_fix_paused_jobs_sorting``
        """
        jobs = self._get_jobs(self.jobs_t.c.tag == jsonutils.dumps(self.tag))
        self._fix_paused_jobs_sorting(jobs)
        return jobs

    def _get_jobs(self, *conditions):
        """Load the jobs of this service that satisfy ``conditions``.

        Rows whose job state cannot be restored are logged and deleted
        from the table.

        :param conditions: extra SQLAlchemy filter expressions, AND-ed with
            the ``service_id`` filter
        :returns: list of restored jobs ordered by ``next_run_time``
        """
        jobs = []
        # Always restrict the query to the jobs owned by this service.
        conditions += (self.jobs_t.c.service_id == self.service_id,)
        selectable = select(
            [self.jobs_t.c.id, self.jobs_t.c.job_state, self.jobs_t.c.tag]
        ).order_by(self.jobs_t.c.next_run_time).where(and_(*conditions))
        failed_job_ids = set()
        for row in self.engine.execute(selectable):
            try:
                jobs.append(self._reconstitute_job(row.job_state))
            except Exception:
                self._logger.exception(
                    'Unable to restore job "%s" -- removing it', row.id)
                failed_job_ids.add(row.id)

        # Remove all the jobs we failed to restore
        if failed_job_ids:
            delete = self.jobs_t.delete().where(
                self.jobs_t.c.id.in_(failed_job_ids))
            self.engine.execute(delete)

        return jobs
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.