Source code for heat.db.sqlalchemy.utils

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# SQLAlchemy helper functions

import sqlalchemy
from sqlalchemy.orm import exc
import tenacity

def clone_table(name, parent, meta, newcols=None, ignorecols=None,
                swapcols=None, ignorecons=None):
    """Helper function that clones parent table schema onto new table.

    Columns of ``parent`` are copied unless their name is listed in
    ``ignorecols`` or is a key of ``swapcols``; the ``swapcols`` values and
    ``newcols`` are then appended. Constraints of ``parent`` are copied
    unless their name is listed in ``ignorecons`` or they reference one of
    the ignored columns. The new table is created before it is returned.

    :param name: new table name
    :param parent: parent table to copy schema from
    :param meta: metadata object handed to ``sqlalchemy.Table``.
        NOTE(review): ``create()`` is called without an explicit bind, so
        this presumably has to be bound to an engine -- confirm with callers.
    :param newcols: additional column objects to add to the new table
    :param ignorecols: names of columns to be ignored while cloning
    :param swapcols: mapping of column name to the alternative column
        object that replaces the parent's column of that name
    :param ignorecons: names of constraints to be ignored

    :return: sqlalchemy.Table instance
    """
    newcols = newcols or []
    ignorecols = ignorecols or []
    swapcols = swapcols or {}
    ignorecons = ignorecons or []

    # Swapped columns are skipped here and re-added from swapcols below.
    cols = [c.copy() for c in parent.columns
            if c.name not in ignorecols and c.name not in swapcols]
    cols.extend(swapcols.values())
    cols.extend(newcols)
    new_table = sqlalchemy.Table(name, meta, *cols)

    def _is_ignorable(cons):
        # consider constraints on columns only
        if not hasattr(cons, 'columns'):
            return False
        return any(col in cons.columns for col in ignorecols)

    constraints = [c.copy() for c in parent.constraints
                   if c.name not in ignorecons and not _is_ignorable(c)]

    for cons in constraints:
        new_table.append_constraint(cons)

    new_table.create()
    return new_table
def migrate_data(migrate_engine, table, new_table, skip_columns=None):
    """Copy every row of ``table`` into ``new_table``, then swap the tables.

    All rows are read from ``table`` first. Each row is then inserted into
    ``new_table`` through ``migrate_engine``, leaving out the columns named
    in ``skip_columns``. Finally ``table`` is dropped and ``new_table`` is
    renamed to the name ``table`` had.

    :param migrate_engine: object whose ``execute()`` runs the insert
        statements
    :param table: source table; it is dropped once the copy has finished
    :param new_table: destination table; it takes over the source's name
    :param skip_columns: names of source columns that must not be copied,
        or None to copy every column
    :raises KeyError: if ``skip_columns`` names a column ``table`` lacks
        (only raised when ``table`` has at least one row)
    """
    # Remember the name now: new_table takes it over after table is dropped.
    table_name = table.name

    # Materialize the full result set before issuing any insert.
    list_of_rows = list(table.select().execute())

    colnames = [c.name for c in table.columns]

    for row in list_of_rows:
        values = {colname: getattr(row, colname) for colname in colnames}
        if skip_columns is not None:
            for column in skip_columns:
                # A misspelled column name fails loudly here.
                del values[column]

        migrate_engine.execute(new_table.insert(values))

    table.drop()

    new_table.rename(table_name)
def retry_on_stale_data_error(func):
    """Wrap ``func`` so that it is retried when it hits stale data.

    The call is retried whenever it raises
    ``sqlalchemy.orm.exc.StaleDataError`` and stops after the third
    attempt. With ``reraise=True`` the exception from the last attempt is
    re-raised.

    :param func: callable to wrap
    :return: the callable produced by applying ``tenacity.retry`` to ``func``
    """
    wrapper = tenacity.retry(
        stop=tenacity.stop_after_attempt(3),
        retry=tenacity.retry_if_exception_type(exc.StaleDataError),
        reraise=True)
    return wrapper(func)