oslo_db.sqlalchemy package¶
Subpackages¶
Submodules¶
oslo_db.sqlalchemy.enginefacade module¶
- exception oslo_db.sqlalchemy.enginefacade.AlreadyStartedError¶
Bases:
TypeError
Raised when a factory is asked to initialize a second time.
Subclasses TypeError for legacy support.
- class oslo_db.sqlalchemy.enginefacade.LegacyEngineFacade(sql_connection, slave_connection=None, sqlite_fk=False, expire_on_commit=False, _conf=None, _factory=None, **kwargs)¶
Bases:
object
A helper class for removing global engine instances from oslo.db.
Deprecated since version 1.12.0: Please use oslo_db.sqlalchemy.enginefacade for new development.
As a library, oslo.db can’t decide where to store or when to create engine and sessionmaker instances, so this must be left to the target application.
On the other hand, in order to simplify the adoption of oslo.db changes, we’ll provide a helper class, which creates engine and sessionmaker on its instantiation and provides get_engine()/get_session() methods that are compatible with corresponding utility functions that currently exist in target projects, e.g. in Nova.
engine/sessionmaker instances will still be global (and they are meant to be global), but they will be stored in the app context, rather than in the oslo.db context.
Two important things to remember:
An Engine instance is effectively a pool of DB connections, so it’s meant to be shared (and it’s thread-safe).
A Session instance is not meant to be shared and represents a DB transactional context (i.e. it’s not thread-safe). sessionmaker is a factory of sessions.
- Parameters:
sql_connection (string) – the connection string for the database to use
slave_connection (string) – the connection string for the ‘slave’ database to use. If not provided, the master database will be used for all operations. Note: this is meant to be used for offloading of read operations to asynchronously replicated slaves to reduce the load on the master database.
sqlite_fk (bool) – enable foreign keys in SQLite
expire_on_commit (bool) – expire session objects on commit
Keyword arguments:
- Parameters:
mysql_sql_mode – the SQL mode to be used for MySQL sessions. (defaults to TRADITIONAL)
mysql_wsrep_sync_wait – value of wsrep_sync_wait for Galera (defaults to None, which indicates no setting will be passed)
connection_recycle_time – Time period for connections to be recycled upon checkout (defaults to 3600)
connection_debug – verbosity of SQL debugging information. -1=Off, 0=None, 100=Everything (defaults to 0)
max_pool_size – maximum number of SQL connections to keep open in a pool (defaults to SQLAlchemy settings)
max_overflow – if set, use this value for max_overflow with sqlalchemy (defaults to SQLAlchemy settings)
pool_timeout – if set, use this value for pool_timeout with sqlalchemy (defaults to SQLAlchemy settings)
sqlite_synchronous – if True, SQLite uses synchronous mode (defaults to True)
connection_trace – add python stack traces to SQL as comment strings (defaults to False)
max_retries – maximum db connection retries during startup. (setting -1 implies an infinite retry count) (defaults to 10)
retry_interval – interval between retries of opening a sql connection (defaults to 10)
thread_checkin – boolean that indicates that between each engine checkin event a sleep(0) will occur to allow other greenthreads to run (defaults to True)
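The interaction between max_retries and retry_interval can be sketched with the standard library alone. This is an illustration of the documented semantics, not oslo.db's implementation: connect_with_retries, flaky_connect, and the use of ConnectionError are hypothetical names chosen for the example, and whether the first attempt counts against max_retries is an assumption.

```python
import time


def connect_with_retries(connect, max_retries=10, retry_interval=10,
                         sleep=time.sleep):
    """Call connect() until it succeeds or the retry budget is exhausted."""
    retries_used = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            # max_retries == -1 means "retry forever".
            if max_retries != -1 and retries_used >= max_retries:
                raise
            retries_used += 1
            sleep(retry_interval)


# Usage: a fake connect() that fails twice before succeeding.
attempts = []


def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("database not ready")
    return "connection"


result = connect_with_retries(flaky_connect, max_retries=5,
                              retry_interval=0, sleep=lambda seconds: None)
```

Passing the sleep function as a parameter keeps the sketch testable without real delays.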
- classmethod from_config(conf, sqlite_fk=False, expire_on_commit=False)¶
Initialize EngineFacade using oslo.config config instance options.
- Parameters:
conf (oslo_config.cfg.ConfigOpts) – oslo.config config instance
sqlite_fk (bool) – enable foreign keys in SQLite
expire_on_commit (bool) – expire session objects on commit
- get_engine(use_slave=False)¶
Get the engine instance (note that it’s shared).
- Parameters:
use_slave (bool) – if possible, use ‘slave’ database for this engine. If the connection string for the slave database wasn’t provided, ‘master’ engine will be returned. (defaults to False)
- get_session(use_slave=False, **kwargs)¶
Get a Session instance.
- Parameters:
use_slave (bool) – if possible, use ‘slave’ database connection for this session. If the connection string for the slave database wasn’t provided, a session bound to the ‘master’ engine will be returned. (defaults to False)
Keyword arguments will be passed to a sessionmaker instance as is (if passed, they will override the ones used when the sessionmaker instance was created). See SQLAlchemy Session docs for details.
- get_sessionmaker(use_slave=False)¶
Get the sessionmaker instance used to create a Session.
This can be called for those cases where the sessionmaker() is to be temporarily injected with some state such as a specific connection.
- oslo_db.sqlalchemy.enginefacade.configure(**kw)¶
Apply configuration options to the global factory.
This method can only be called before any specific transaction-beginning methods have been called.
See also
_TransactionFactory.configure()
- oslo_db.sqlalchemy.enginefacade.get_legacy_facade()¶
Return a LegacyEngineFacade for the global factory.
This facade will make use of the same engine and sessionmaker as this factory, but will not share the same transaction context; the legacy facade continues to work the old way, returning a new Session each time get_session() is called.
- oslo_db.sqlalchemy.enginefacade.reader = <oslo_db.sqlalchemy.enginefacade._TransactionContextManager object>¶
The global ‘reader’ starting point.
- oslo_db.sqlalchemy.enginefacade.transaction_context()¶
Construct a local transaction context.
- oslo_db.sqlalchemy.enginefacade.transaction_context_provider(klass)¶
Decorate a class with session and connection attributes.
- oslo_db.sqlalchemy.enginefacade.writer = <oslo_db.sqlalchemy.enginefacade._TransactionContextManager object>¶
The global ‘writer’ starting point.
oslo_db.sqlalchemy.engines module¶
Core SQLAlchemy connectivity routines.
- oslo_db.sqlalchemy.engines.create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, mysql_wsrep_sync_wait=None, connection_recycle_time=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, connection_trace=False, max_retries=10, retry_interval=10, thread_checkin=True, logging_name=None, json_serializer=None, json_deserializer=None, connection_parameters=None)¶
Return a new SQLAlchemy engine.
oslo_db.sqlalchemy.exc_filters module¶
Define exception redefinitions for SQLAlchemy DBAPI exceptions.
- oslo_db.sqlalchemy.exc_filters.filters(dbname, exception_type, regex)¶
Mark a function as receiving a filtered exception.
- Parameters:
dbname – string database name, e.g. ‘mysql’
exception_type – a SQLAlchemy database exception class, which extends from sqlalchemy.exc.DBAPIError.
regex – a string, or a tuple of strings, that will be processed as matching regular expressions.
- oslo_db.sqlalchemy.exc_filters.handler(context)¶
Iterate through available filters and invoke those which match.
The first one which raises wins. The filters are attempted in order of specificity: dialect name before “*”, then exception class per method resolution order (__mro__). Method resolution order is used so that filter rules indicating a more specific exception class are attempted first.
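The ordering described above can be sketched as a small dispatcher. This is a simplified illustration under stated assumptions, not the module's actual code: register_filter, dispatch, the exception classes, and the registry layout are all hypothetical names invented for the example.

```python
import re

# Hypothetical registry: dialect name (or "*") -> exception class -> filters.
_registry = {}


def register_filter(dbname, exception_type, regex):
    """Register the decorated function for a dialect/exception/regex triple."""
    patterns = (regex,) if isinstance(regex, str) else tuple(regex)

    def decorator(fn):
        bucket = _registry.setdefault(dbname, {}).setdefault(exception_type, [])
        bucket.extend((re.compile(p), fn) for p in patterns)
        return fn
    return decorator


def dispatch(dialect_name, exc):
    """Try filters from most to least specific; the first that raises wins."""
    for dbname in (dialect_name, "*"):
        by_class = _registry.get(dbname, {})
        # __mro__ lists the most specific class first.
        for klass in type(exc).__mro__:
            for pattern, fn in by_class.get(klass, ()):
                match = pattern.match(str(exc))
                if match:
                    fn(exc, match)  # expected to raise a translated error


class DBError(Exception):
    """Hypothetical driver-level base error."""


class IntegrityError(DBError):
    """Hypothetical, more specific driver error."""


class DuplicateEntry(Exception):
    """Hypothetical translated error."""


@register_filter("mysql", IntegrityError, r"Duplicate entry '(?P<value>.+)'")
def _duplicate(exc, match):
    raise DuplicateEntry(match.group("value"))


@register_filter("*", DBError, r".*")
def _generic(exc, match):
    raise RuntimeError("unclassified database error")
```

With these registrations, a MySQL duplicate-key message reaches _duplicate first, while any other DBError on any dialect falls through to _generic.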
- oslo_db.sqlalchemy.exc_filters.register_engine(engine)¶
oslo_db.sqlalchemy.models module¶
SQLAlchemy models.
- class oslo_db.sqlalchemy.models.ModelBase¶
Bases:
object
Base class for models.
- get(key, default=None)¶
- items()¶
Make the model object behave like a dict.
- iteritems()¶
Make the model object behave like a dict.
- keys()¶
Make the model object behave like a dict.
- save(session)¶
Save this object.
- update(values)¶
Make the model object behave like a dict.
- class oslo_db.sqlalchemy.models.ModelIterator(model, columns)¶
Bases:
object
oslo_db.sqlalchemy.orm module¶
SQLAlchemy ORM connectivity and query structures.
- class oslo_db.sqlalchemy.orm.Query(entities: _ColumnsClauseArgument[Any] | Sequence[_ColumnsClauseArgument[Any]], session: Session | None = None)¶
Bases:
Query
Subclass of sqlalchemy.query with soft_delete() method.
- soft_delete(synchronize_session='evaluate')¶
- update_on_match(specimen, surrogate_key, values, **kw)¶
Emit an UPDATE statement matching the given specimen.
This is a method-version of oslo_db.sqlalchemy.update_match.update_on_match(); see that function for usage details.
- update_returning_pk(values, surrogate_key)¶
Perform an UPDATE, returning the primary key of the matched row.
This is a method-version of oslo_db.sqlalchemy.update_match.update_returning_pk(); see that function for usage details.
- class oslo_db.sqlalchemy.orm.Session(bind: _SessionBind | None = None, *, autoflush: bool = True, future: Literal[True] = True, expire_on_commit: bool = True, autobegin: bool = True, twophase: bool = False, binds: Dict[_SessionBindKey, _SessionBind] | None = None, enable_baked_queries: bool = True, info: _InfoType | None = None, query_cls: Type[Query[Any]] | None = None, autocommit: Literal[False] = False, join_transaction_mode: JoinTransactionMode = 'conditional_savepoint', close_resets_only: bool | _NoArg = _NoArg.NO_ARG)¶
Bases:
Session
oslo.db-specific Session subclass.
- oslo_db.sqlalchemy.orm.get_maker(engine, autocommit=False, expire_on_commit=False)¶
Return a SQLAlchemy sessionmaker using the given engine.
oslo_db.sqlalchemy.provision module¶
Provision test environment for specific DB backends
- class oslo_db.sqlalchemy.provision.Backend(database_type, url)¶
Bases:
object
Represent a particular database backend that may be provisionable.
The Backend object maintains a database type (e.g. database without specific driver type, such as “sqlite”, “postgresql”, etc.), a target URL, a base Engine for that URL object that can be used to provision databases and a BackendImpl which knows how to perform operations against this type of Engine.
- classmethod all_viable_backends()¶
Return an iterator of all Backend objects that are present and provisionable.
- classmethod backend_for_database_type(database_type)¶
Return the Backend for the given database type.
- backends_by_database_type = {'mysql': <oslo_db.sqlalchemy.provision.Backend object>, 'postgresql': <oslo_db.sqlalchemy.provision.Backend object>, 'sqlite': <oslo_db.sqlalchemy.provision.Backend object>}¶
- create_named_database(ident, conditional=False)¶
Create a database with the given name.
- database_exists(ident)¶
Return True if a database of the given name exists.
- drop_all_objects(engine)¶
Drop all database objects.
Drops all database objects remaining on the default schema of the given engine.
- drop_named_database(ident, conditional=False)¶
Drop a database with the given name.
- provisioned_database_url(ident)¶
Given the identifier of an anonymous database, return a URL.
For hostname-based URLs, this typically involves switching just the ‘database’ portion of the URL with the given name and creating a URL.
For SQLite URLs, the identifier may be used to create a filename or may be ignored in the case of a memory database.
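For hostname-based URLs, the "switch just the database portion" step can be sketched with urllib.parse. This is only an illustration: provisioned_url is a hypothetical helper, the example URL is made up, and SQLite URLs are returned unchanged here as a simplification (file-based SQLite handling is omitted).

```python
from urllib.parse import urlsplit, urlunsplit


def provisioned_url(base_url, ident):
    """Return base_url with its database name replaced by ident."""
    parts = urlsplit(base_url)
    if parts.scheme.split("+")[0] == "sqlite":
        # Simplification: treat SQLite as a memory database and ignore ident.
        return base_url
    # The database name is the path component of a hostname-based URL.
    return urlunsplit(parts._replace(path="/" + ident))


url = provisioned_url(
    "mysql+pymysql://user:pw@localhost/openstack_citest", "abcdefghij")
```

The driver, credentials, host, and any query string are carried over untouched; only the database name changes.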
- class oslo_db.sqlalchemy.provision.BackendImpl(drivername)¶
Bases:
object
Provide database-specific implementations of key provisioning functions.
BackendImpl is owned by a Backend instance which delegates to it for all database-specific features.
- classmethod all_impls()¶
Return an iterator of all possible BackendImpl objects.
These are BackendImpls that are implemented, but not necessarily provisionable.
- abstract create_named_database(engine, ident, conditional=False)¶
Create a database with the given name.
- abstract create_opportunistic_driver_url()¶
Produce a string url known as the ‘opportunistic’ URL.
This URL is one that corresponds to an established OpenStack convention for a pre-established database login, which, when detected as available in the local environment, is automatically used as a test platform for a specific type of driver.
- default_engine_kwargs = {}¶
- dispose(engine)¶
- drop_additional_objects(conn)¶
- drop_all_objects(engine)¶
Drop all database objects.
Drops all database objects remaining on the default schema of the given engine.
Per-db implementations will also need to drop items specific to those systems, such as sequences, custom types (e.g. pg ENUM), etc.
- abstract drop_named_database(engine, ident, conditional=False)¶
Drop a database with the given name.
- impl = <oslo_db.sqlalchemy.utils.DialectSingleFunctionDispatcher object>¶
- provisioned_database_url(base_url, ident)¶
Return a provisioned database URL.
Given the URL of a particular database backend and the string name of a particular ‘database’ within that backend, return a URL which refers directly to the named database.
For hostname-based URLs, this typically involves switching just the ‘database’ portion of the URL with the given name and creating an engine.
For URLs that instead deal with DSNs, the rules may be more custom; for example, the engine may need to connect to the root URL and then emit a command to switch to the named database.
- supports_drop_fk = True¶
- class oslo_db.sqlalchemy.provision.BackendResource(database_type, ad_hoc_url=None)¶
Bases:
TestResourceManager
- clean(resource)¶
Override this method to hook into resource removal.
- isDirty()¶
Return True if this manager’s cached resource is dirty.
Calling when the resource is not currently held has undefined behaviour.
- make(dependency_resources)¶
Override this to construct resources.
- Parameters:
dependency_resources – A dict mapping name -> resource instance for the resources specified as dependencies.
- Returns:
The made resource.
- class oslo_db.sqlalchemy.provision.DatabaseResource(database_type, _enginefacade=None, provision_new_database=True, ad_hoc_url=None)¶
Bases:
TestResourceManager
Database resource which connects to and disconnects from a URL.
For SQLite, this means the database is created implicitly, as a result of SQLite’s usual behavior. If the database is a file-based URL, it will remain after the resource has been torn down.
For all other kinds of databases, the resource connects to and disconnects from that database.
- clean(resource)¶
Override this method to hook into resource removal.
- isDirty()¶
Return True if this manager’s cached resource is dirty.
Calling when the resource is not currently held has undefined behaviour.
- make(dependency_resources)¶
Override this to construct resources.
- Parameters:
dependency_resources – A dict mapping name -> resource instance for the resources specified as dependencies.
- Returns:
The made resource.
- class oslo_db.sqlalchemy.provision.ProvisionedDatabase(backend, enginefacade, engine, db_token)¶
Bases:
object
Represents a database engine pointing to a DB ready to run tests.
backend: an instance of Backend
enginefacade: an instance of _TransactionFactory
engine: a SQLAlchemy Engine
db_token: if provision_new_database was used, this is the randomly generated name of the database. Note that with SQLite memory connections, this token is ignored. For a database that wasn’t actually created, this will be None.
- backend¶
- db_token¶
- engine¶
- enginefacade¶
- class oslo_db.sqlalchemy.provision.Schema¶
Bases:
object
Represents a database schema that has or will be populated.
This is a marker object as required by testresources but otherwise serves no purpose.
- database¶
- class oslo_db.sqlalchemy.provision.SchemaResource(database_resource, generate_schema, teardown=False)¶
Bases:
TestResourceManager
- clean(resource)¶
Override this method to hook into resource removal.
- isDirty()¶
Return True if this manager’s cached resource is dirty.
Calling when the resource is not currently held has undefined behaviour.
- make(dependency_resources)¶
Override this to construct resources.
- Parameters:
dependency_resources – A dict mapping name -> resource instance for the resources specified as dependencies.
- Returns:
The made resource.
oslo_db.sqlalchemy.session module¶
Session Handling for SQLAlchemy backend.
Recommended ways to use sessions within this framework:
Use the enginefacade system for connectivity, session and transaction management:

    from oslo_db.sqlalchemy import enginefacade

    @enginefacade.reader
    def get_foo(context, foo):
        return (model_query(models.Foo, context.session).
                filter_by(foo=foo).
                first())

    @enginefacade.writer
    def update_foo(context, id, newfoo):
        (model_query(models.Foo, context.session).
         filter_by(id=id).
         update({'foo': newfoo}))

    @enginefacade.writer
    def create_foo(context, values):
        foo_ref = models.Foo()
        foo_ref.update(values)
        foo_ref.save(context.session)
        return foo_ref

In the above system, transactions are committed automatically, and are shared among all dependent database methods. Ensure that methods which “write” data are enclosed within @writer blocks.
Note
Statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you do not need to call model.save():

    @enginefacade.writer
    def create_many_foo(context, foos):
        for foo in foos:
            foo_ref = models.Foo()
            foo_ref.update(foo)
            context.session.add(foo_ref)

    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        foo_ref = (model_query(models.Foo, context.session).
                   filter_by(id=foo_id).
                   first())
        (model_query(models.Bar, context.session).
         filter_by(id=foo_ref['bar_id']).
         update({'bar': newbar}))

The two queries in update_bar can alternatively be expressed using a single query, which may be more efficient depending on scenario:

    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        # Select foo.bar_id so the subquery yields the id of the bar row.
        subq = (model_query(models.Foo.bar_id, context.session).
                filter_by(id=foo_id).
                limit(1).
                subquery())
        (model_query(models.Bar, context.session).
         filter_by(id=subq.as_scalar()).
         update({'bar': newbar}))

For reference, this emits approximately the following SQL statement:

    UPDATE bar SET bar = '${newbar}'
    WHERE id=(SELECT bar_id FROM foo WHERE id = '${foo_id}' LIMIT 1);

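To see the single-statement form in action without an ORM, the statement above can be run against an in-memory SQLite database using the standard sqlite3 module. The table layout and the plain-DBAPI update_bar helper are assumptions made for this sketch; parameters are bound with placeholders instead of being interpolated into the SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE bar (id INTEGER PRIMARY KEY, bar TEXT);
    CREATE TABLE foo (id INTEGER PRIMARY KEY,
                      bar_id INTEGER REFERENCES bar(id));
    INSERT INTO bar (id, bar) VALUES (10, 'old'), (11, 'untouched');
    INSERT INTO foo (id, bar_id) VALUES (1, 10);
""")


def update_bar(conn, foo_id, newbar):
    """Update the bar row referenced by foo_id in a single statement."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE bar SET bar = ? "
            "WHERE id = (SELECT bar_id FROM foo WHERE id = ? LIMIT 1)",
            (newbar, foo_id))
    return cur.rowcount


updated = update_bar(conn, 1, "new")
rows = conn.execute("SELECT id, bar FROM bar ORDER BY id").fetchall()
```

Only the row referenced by foo 1 changes; the other bar row is left alone, and no separate SELECT round trip is needed.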
Note
create_duplicate_foo is a trivially simple example of catching an exception while using a savepoint. Here we create two duplicate instances with the same primary key; the exception must be caught outside the context managed by begin_nested():

    @enginefacade.writer
    def create_duplicate_foo(context):
        foo1 = models.Foo()
        foo2 = models.Foo()
        foo1.id = foo2.id = 1
        try:
            with context.session.begin_nested():
                context.session.add(foo1)
                context.session.add(foo2)
        except exception.DBDuplicateEntry as e:
            handle_error(e)

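The savepoint behaviour that this example relies on can be observed at the SQL level with the standard sqlite3 module. This sketch is an analogy, not what the session emits verbatim: the table, the savepoint name dup, and the explicit BEGIN/COMMIT handling are assumptions made for the illustration.

```python
import sqlite3

# isolation_level=None lets us issue BEGIN/SAVEPOINT/COMMIT explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY, name TEXT)")


def create_duplicate_foo(conn):
    """Insert a duplicate inside a savepoint; keep the outer transaction."""
    conn.execute("BEGIN")
    conn.execute("INSERT INTO foo (id, name) VALUES (0, 'before')")
    conn.execute("SAVEPOINT dup")
    try:
        conn.execute("INSERT INTO foo (id, name) VALUES (1, 'first')")
        conn.execute("INSERT INTO foo (id, name) VALUES (1, 'second')")
        conn.execute("RELEASE SAVEPOINT dup")
        handled = False
    except sqlite3.IntegrityError:
        # Undo only the work done since the savepoint was established.
        conn.execute("ROLLBACK TO SAVEPOINT dup")
        conn.execute("RELEASE SAVEPOINT dup")
        handled = True
    conn.execute("INSERT INTO foo (id, name) VALUES (2, 'after')")
    conn.execute("COMMIT")
    return handled


handled = create_duplicate_foo(conn)
ids = [row[0] for row in conn.execute("SELECT id FROM foo ORDER BY id")]
```

Both inserts inside the savepoint are discarded, while the rows written before and after it are committed together.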
The enginefacade system eliminates the need to decide when sessions need to be passed between methods. All methods should instead share a common context object; the enginefacade system will maintain the transaction across method calls.

    @enginefacade.writer
    def myfunc(context, foo):
        # do some database things
        bar = _private_func(context, foo)
        return bar

    def _private_func(context, foo):
        with enginefacade.using_writer(context) as session:
            # do some other database things
            session.add(SomeObject())
        return bar

Avoid with_lockmode('UPDATE') when possible. FOR UPDATE is not compatible with MySQL/Galera. Instead, an “opportunistic” approach should be used, such that if an UPDATE fails, the entire transaction should be retried. The @wrap_db_retry decorator is one such system that can be used to achieve this.
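The "retry the entire transaction" idea can be sketched as a generic decorator. This is not the wrap_db_retry implementation; retry_transaction, RetryableDBError, and update_counter are hypothetical names, and a real system would catch the library's own deadlock or conflict exceptions.

```python
import functools
import time


class RetryableDBError(Exception):
    """Hypothetical stand-in for a deadlock or write-conflict error."""


def retry_transaction(max_retries=3, retry_interval=0.0):
    """Re-run the whole decorated function when a retryable error occurs."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except RetryableDBError:
                    if attempt == max_retries:
                        raise
                    time.sleep(retry_interval)
        return wrapper
    return decorator


calls = []


@retry_transaction(max_retries=3)
def update_counter():
    # Stands in for a function that opens a transaction and issues an UPDATE.
    calls.append(1)
    if len(calls) < 3:
        raise RetryableDBError("conflict")
    return "committed"


outcome = update_counter()
```

For this to be safe, the retry decorator has to wrap the function that begins the transaction, so that each attempt starts from a fresh transaction instead of resuming a failed one.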
Enabling soft deletes:
To use/enable soft-deletes, SoftDeleteMixin may be used. For example:

    class NovaBase(models.SoftDeleteMixin, models.ModelBase):
        pass

Efficient use of soft deletes:
While there is a model.soft_delete() method, prefer query.soft_delete(). Some examples:

    @enginefacade.writer
    def soft_delete_bar(context):
        # synchronize_session=False will prevent the ORM from attempting
        # to search the Session for instances matching the DELETE;
        # this is typically not necessary for small operations.
        count = (model_query(BarModel, context.session).
                 filter(some_condition).
                 soft_delete(synchronize_session=False))
        if count == 0:
            raise Exception("0 entries were soft deleted")

    @enginefacade.writer
    def complex_soft_delete_with_synchronization_bar(context):
        # use synchronize_session='evaluate' when you'd like to attempt
        # to update the state of the Session to match that of the DELETE.
        # This is potentially helpful if the operation is complex and
        # continues to work with instances that were loaded, though
        # not usually needed.
        count = (model_query(BarModel, context.session).
                 filter(some_condition).
                 soft_delete(synchronize_session='evaluate'))
        if count == 0:
            raise Exception("0 entries were soft deleted")

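Conceptually, a query-level soft delete is a single UPDATE that marks matching rows as deleted and reports how many rows it touched. The sketch below shows that idea with the standard sqlite3 module; the bar table, the deleted and deleted_at column names, and the soft_delete_by_name helper are assumptions made for the illustration and are not taken from this page.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE bar (
        id INTEGER PRIMARY KEY,
        name TEXT,
        deleted INTEGER NOT NULL DEFAULT 0,
        deleted_at TEXT
    );
    INSERT INTO bar (id, name) VALUES (1, 'a'), (2, 'b'), (3, 'a');
""")


def soft_delete_by_name(conn, name):
    """Mark matching live rows as deleted and return how many were hit."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # one transaction, one UPDATE, no rows loaded into Python
        cur = conn.execute(
            "UPDATE bar SET deleted = id, deleted_at = ? "
            "WHERE name = ? AND deleted = 0",
            (now, name))
    return cur.rowcount


count = soft_delete_by_name(conn, "a")
live = conn.execute("SELECT id FROM bar WHERE deleted = 0").fetchall()
```

Because the work happens in one statement, the row count is available immediately, which is what makes the "raise if count == 0" check in the examples above cheap.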
- oslo_db.sqlalchemy.session.EngineFacade¶
alias of
LegacyEngineFacade
- class oslo_db.sqlalchemy.session.Query(entities: _ColumnsClauseArgument[Any] | Sequence[_ColumnsClauseArgument[Any]], session: Session | None = None)¶
Bases:
Query
Subclass of sqlalchemy.query with soft_delete() method.
- soft_delete(synchronize_session='evaluate')¶
- update_on_match(specimen, surrogate_key, values, **kw)¶
Emit an UPDATE statement matching the given specimen.
This is a method-version of oslo_db.sqlalchemy.update_match.update_on_match(); see that function for usage details.
- update_returning_pk(values, surrogate_key)¶
Perform an UPDATE, returning the primary key of the matched row.
This is a method-version of oslo_db.sqlalchemy.update_match.update_returning_pk(); see that function for usage details.
- class oslo_db.sqlalchemy.session.Session(bind: _SessionBind | None = None, *, autoflush: bool = True, future: Literal[True] = True, expire_on_commit: bool = True, autobegin: bool = True, twophase: bool = False, binds: Dict[_SessionBindKey, _SessionBind] | None = None, enable_baked_queries: bool = True, info: _InfoType | None = None, query_cls: Type[Query[Any]] | None = None, autocommit: Literal[False] = False, join_transaction_mode: JoinTransactionMode = 'conditional_savepoint', close_resets_only: bool | _NoArg = _NoArg.NO_ARG)¶
Bases:
Session
oslo.db-specific Session subclass.
- oslo_db.sqlalchemy.session.create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, mysql_wsrep_sync_wait=None, connection_recycle_time=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, connection_trace=False, max_retries=10, retry_interval=10, thread_checkin=True, logging_name=None, json_serializer=None, json_deserializer=None, connection_parameters=None)¶
Return a new SQLAlchemy engine.
- oslo_db.sqlalchemy.session.get_maker(engine, autocommit=False, expire_on_commit=False)¶
Return a SQLAlchemy sessionmaker using the given engine.
oslo_db.sqlalchemy.test_base module¶
- oslo_db.sqlalchemy.test_base.backend_specific(*dialects)¶
Decorator to skip backend specific tests on inappropriate engines.
- Parameters:
dialects – list of dialect names under which the test will be launched.
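A decorator of this kind can be sketched with plain unittest. This is a hypothetical illustration, not the oslo.db implementation: backend_specific_sketch and the dialect_name attribute are invented for the example, standing in for however the real decorator discovers the engine's dialect.

```python
import functools
import unittest


def backend_specific_sketch(*dialects):
    """Skip the decorated test unless the test's dialect is listed."""
    def decorator(test_method):
        @functools.wraps(test_method)
        def wrapper(self, *args, **kwargs):
            # Assumption: the test case exposes its dialect as an attribute.
            if self.dialect_name not in dialects:
                self.skipTest(
                    "only runs on: %s" % ", ".join(dialects))
            return test_method(self, *args, **kwargs)
        return wrapper
    return decorator


class ExampleTest(unittest.TestCase):
    dialect_name = "sqlite"

    @backend_specific_sketch("mysql", "postgresql")
    def test_server_only(self):
        self.fail("should have been skipped on sqlite")

    @backend_specific_sketch("sqlite")
    def test_sqlite_only(self):
        self.assertEqual(self.dialect_name, "sqlite")


result = unittest.TestResult()
unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTest).run(result)
```

Skipping at call time, instead of at decoration time, lets the same test class be reused by subclasses that run against different backends.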
oslo_db.sqlalchemy.test_fixtures module¶
- class oslo_db.sqlalchemy.test_fixtures.AdHocDbFixture(url=None)¶
Bases:
SimpleDbFixture
Fixture which creates and disposes a database engine per test.
Also allows a specific URL to be passed, meaning the fixture can be hardcoded to a specific SQLite file.
For SQLite, this fixture will create the named database upon setup and tear it down upon teardown. For other databases, the database is assumed to exist already and will remain after teardown.
- class oslo_db.sqlalchemy.test_fixtures.BaseDbFixture(driver=None, ident=None)¶
Bases:
Fixture
Base database provisioning fixture.
This serves as the base class for the other fixtures, but by itself does not implement _setUp(). It provides the basis for the flags implemented by the various capability mixins (GenerateSchema, DeletesFromSchema, etc.) as well as providing an abstraction over the provisioning objects, which are specific to testresources. Overall, consumers of this fixture just need to use the right classes and the testresources mechanics are taken care of.
- DRIVER = 'sqlite'¶
- get_enginefacade()¶
Return an enginefacade._TransactionContextManager.
This is typically a global variable like “context_manager” declared in the db/api.py module and is the object returned by enginefacade.transaction_context().
If left not implemented, the global enginefacade manager is used.
For the case where a project uses per-object or per-test enginefacades like Gnocchi, the get_per_test_enginefacade() method should also be implemented.
- get_per_test_enginefacade()¶
Return an enginefacade._TransactionContextManager per test.
This facade should be the one that the test expects the code to use. Usually this is the same one returned by get_enginefacade(), which is the default. For special applications like Gnocchi, this can be overridden to provide an instance-level facade.
- class oslo_db.sqlalchemy.test_fixtures.DeletesFromSchema¶
Bases:
ResetsData
Mixin defining a fixture that can delete from all tables in place.
When DeletesFromSchema is present in a fixture, _DROP_SCHEMA_PER_TEST is now False; this means that the “teardown” flag of provision.SchemaResource will be False, which prevents SchemaResource from dropping all objects within the schema after each test.
This is a “capability” mixin that works in conjunction with classes that include BaseDbFixture as a base.
- delete_from_schema(engine)¶
A hook which should delete all data from an existing schema.
Should not drop any objects, just remove data from tables that needs to be reset between tests.
- reset_schema_data(engine, facade)¶
Reset the data in the schema.
- class oslo_db.sqlalchemy.test_fixtures.GeneratesSchema¶
Bases:
object
Mixin defining a fixture as generating a schema using create_all().
This is a “capability” mixin that works in conjunction with classes that include BaseDbFixture as a base.
- generate_schema_create_all(engine)¶
A hook which should generate the model schema using create_all().
This hook is called within the scope of creating the database assuming BUILD_WITH_MIGRATIONS is False.
- class oslo_db.sqlalchemy.test_fixtures.GeneratesSchemaFromMigrations¶
Bases:
GeneratesSchema
Mixin defining a fixture as generating a schema using migrations.
This is a “capability” mixin that works in conjunction with classes that include BaseDbFixture as a base.
- generate_schema_migrations(engine)¶
A hook which should generate the model schema using migrations.
This hook is called within the scope of creating the database assuming BUILD_WITH_MIGRATIONS is True.
- class oslo_db.sqlalchemy.test_fixtures.MySQLOpportunisticFixture(test, driver=None, ident=None)¶
Bases:
OpportunisticDbFixture
- DRIVER = 'mysql'¶
- class oslo_db.sqlalchemy.test_fixtures.OpportunisticDBTestMixin¶
Bases:
object
Test mixin that integrates the test suite with testresources.
There are three goals to this system:
Allow creation of “stub” test suites that will run all the tests in a parent suite against a specific kind of database (e.g. Mysql, Postgresql), where the entire suite will be skipped if that target kind of database is not available to the suite.
Provide a test with a process-local, anonymously named schema within a target database, so that the test can run concurrently with other tests without conflicting data.
Provide compatibility with the testresources.OptimisingTestSuite, which organizes TestCase instances ahead of time into groups that all make use of the same type of database, setting up and tearing down a database schema once for the scope of any number of tests within. This technique is essential when testing against a non-SQLite database because building a schema is expensive, and is ideally accomplished using the application’s schema migrations, which are vastly slower still than a straight create_all().
This mixin provides the .resources attribute required by testresources when using the OptimisingTestSuite. The .resources attribute then provides a collection of testresources.TestResourceManager objects, which are defined here in oslo_db.sqlalchemy.provision. These objects know how to find available database backends, build up temporary databases, and invoke schema generation and teardown instructions. The actual “build the schema objects” part of the equation, and optionally a “delete from all the tables” step, is provided by the implementing application itself.
- FIXTURE¶
alias of
OpportunisticDbFixture
- SKIP_ON_UNAVAILABLE_DB = True¶
- generate_fixtures()¶
- property resources¶
Provide a collection of TestResourceManager objects.
The collection here is memoized, both at the level of the test case itself, as well as in the fixture object(s) which provide those resources.
- setUp()¶
- class oslo_db.sqlalchemy.test_fixtures.OpportunisticDbFixture(test, driver=None, ident=None)¶
Bases:
BaseDbFixture
Fixture which uses testresources fully for optimised runs.
This fixture relies upon the use of the OpportunisticDBTestMixin to supply a test.resources attribute, and also works much more effectively when combined with the testresources.OptimisingTestSuite. The optimize_package_test_loader() function should be used at the module and package levels to optimize database provisioning across many tests.
- class oslo_db.sqlalchemy.test_fixtures.PostgresqlOpportunisticFixture(test, driver=None, ident=None)¶
Bases:
OpportunisticDbFixture
- DRIVER = 'postgresql'¶
- class oslo_db.sqlalchemy.test_fixtures.ReplaceEngineFacadeFixture(enginefacade, replace_with_enginefacade)¶
Bases:
Fixture
A fixture that will plug the engine of one enginefacade into another.
This fixture can be used by test suites that already have their own non-oslo_db database setup / teardown schemes, to plug any URL or test-oriented enginefacade as-is into an enginefacade-oriented API.
For applications that use oslo.db’s testing fixtures, the ReplaceEngineFacade fixture is used internally.
E.g.:

    class MyDBTest(TestCase):
        def setUp(self):
            from myapplication.api import main_enginefacade

            my_test_enginefacade = enginefacade.transaction_context()
            my_test_enginefacade.configure(connection=my_test_url)

            self.useFixture(
                ReplaceEngineFacadeFixture(
                    main_enginefacade, my_test_enginefacade))

Above, the main_enginefacade object is the normal application level one, and my_test_enginefacade is a local one that we’ve created to refer to some testing database. Throughout the fixture’s setup, the application level enginefacade will use the engine factory and engines of the testing enginefacade, and at fixture teardown will be replaced back.
- class oslo_db.sqlalchemy.test_fixtures.ResetsData¶
Bases:
object
Mixin defining a fixture that resets schema data without dropping.
- reset_schema_data(engine, enginefacade)¶
Reset the data in the schema.
- setup_for_reset(engine, enginefacade)¶
Perform setup that may be needed before the test runs.
- class oslo_db.sqlalchemy.test_fixtures.SimpleDbFixture(driver=None, ident=None)¶
Bases:
BaseDbFixture
Fixture which provides an engine from a fixed URL.
The SimpleDbFixture is generally appropriate only for a SQLite memory database, as this database is naturally isolated from other processes and does not require management of schemas. For tests that need to run specifically against MySQL or Postgresql, the OpportunisticDbFixture is more appropriate.
The database connection information itself comes from the provisioning system, matching the desired driver (typically sqlite) to the default URL that provisioning provides for this driver (in the case of sqlite, it’s the SQLite memory URL, e.g. sqlite://; for MySQL and Postgresql, it’s the familiar “openstack_citest” URL on localhost).
There are a variety of create/drop schemes that can take place:
The default is to procure a database connection on setup, and at teardown, an instruction is issued to “drop” all objects in the schema (e.g. tables, indexes). The SQLAlchemy engine itself remains referenced at the class level for subsequent re-use.
When the GeneratesSchema or GeneratesSchemaFromMigrations mixins are implemented, the appropriate generate_schema method is also called when the fixture is set up; by default this is per test.
When the DeletesFromSchema mixin is implemented, the generate_schema method is now only called once, and the “drop all objects” system is replaced with the delete_from_schema method. This allows the same database to remain set up with all schema objects intact, so that expensive migrations need not be run on every test.
The fixture does not dispose the engine at the end of a test. It is assumed the same engine will be re-used many times across many tests. The AdHocDbFixture extends this one to provide engine.dispose() at the end of a test.
This fixture is intended to work without needing a reference to the test itself, and therefore cannot take advantage of the OptimisingTestSuite.
- oslo_db.sqlalchemy.test_fixtures.optimize_module_test_loader()¶
Organize module-level tests into a testresources.OptimisingTestSuite.
This function provides a unittest-compatible load_tests hook for a given module; for per-package, use the optimize_package_test_loader() function.
When a unittest or subunit style test runner is used, the function will be called in order to return a TestSuite containing the tests to run; this function ensures that this suite is an OptimisingTestSuite, which will organize the production of test resources across groups of tests at once.
The function is invoked as:
    from oslo_db.sqlalchemy import test_fixtures
    load_tests = test_fixtures.optimize_module_test_loader()
The loader must be present in an individual module, and not the package level __init__.py.
The function also applies testscenarios expansion to all test collections. This is so that an existing test suite that already needs to build TestScenarios from a load_tests call can still have this take place when replaced with this function.
- oslo_db.sqlalchemy.test_fixtures.optimize_package_test_loader(file_)¶
Organize package-level tests into a testresources.OptimisingTestSuite.
This function provides a unittest-compatible load_tests hook for a given package; for per-module, use the optimize_module_test_loader() function.
When a unittest or subunit style test runner is used, the function will be called in order to return a TestSuite containing the tests to run; this function ensures that this suite is an OptimisingTestSuite, which will organize the production of test resources across groups of tests at once.
The function is invoked as:
    from oslo_db.sqlalchemy import test_fixtures
    load_tests = test_fixtures.optimize_package_test_loader(__file__)
The loader must be present in the package level __init__.py.
The function also applies testscenarios expansion to all test collections. This is so that an existing test suite that already needs to build TestScenarios from a load_tests call can still have this take place when replaced with this function.
oslo_db.sqlalchemy.test_migrations module¶
- class oslo_db.sqlalchemy.test_migrations.ModelsMigrationsSync¶
Bases:
object
A helper class for comparison of DB migration scripts and models.
It’s intended to be inherited by test cases in target projects. They have to provide implementations for methods used internally in the test (as we have no way to implement them here).
test_models_sync() will run migration scripts for the engine provided and then compare the given metadata to the one reflected from the database. The difference between MODELS and MIGRATION scripts will be printed and the test will fail if the difference is not empty. The return value is really a list of actions that should be performed in order to make the current database schema state (i.e. migration scripts) consistent with models definitions. It’s left up to developers to analyze the output and decide whether the models definitions or the migration scripts should be modified to make them consistent.
Output:
    [('add_table', description of the table from models),
     ('remove_table', description of the table from database),
     ('add_column', schema, table name, column description from models),
     ('remove_column', schema, table name, column description from database),
     ('add_index', description of the index from models),
     ('remove_index', description of the index from database),
     ('add_constraint', description of constraint from models),
     ('remove_constraint', description of constraint from database),
     ('modify_nullable', schema, table name, column name,
      {'existing_type': type of the column from database,
       'existing_server_default': default value from database},
      nullable from database, nullable from models),
     ('modify_type', schema, table name, column name,
      {'existing_nullable': database nullable,
       'existing_server_default': default value from database},
      database column type, type of the column from models),
     ('modify_default', schema, table name, column name,
      {'existing_nullable': database nullable,
       'existing_type': type of the column from database},
      connection column default value, default from models)]
Method include_object() can be overridden to exclude some tables from comparison (e.g. migrate_repo).
- compare_server_default(ctxt, ins_col, meta_col, insp_def, meta_def, rendered_meta_def)¶
Compare default values between model and db table.
Return True if the defaults are different, False if not, or None to allow the default implementation to compare these defaults.
- Parameters:
ctxt – alembic MigrationContext instance
ins_col – reflected column
meta_col – column from model
insp_def – reflected column default value
meta_def – column default value from model
rendered_meta_def – rendered column default value (from model)
- compare_type(ctxt, insp_col, meta_col, insp_type, meta_type)¶
Return True if types are different, False if not.
Return None to allow the default implementation to compare these types.
- Parameters:
ctxt – alembic MigrationContext instance
insp_col – reflected column
meta_col – column from model
insp_type – reflected column type
meta_type – column type from model
- abstract db_sync(engine)¶
Run migration scripts with the given engine instance.
This method must be implemented in subclasses and run migration scripts for a DB the given engine is connected to.
- filter_metadata_diff(diff)¶
Filter changes before assert in test_models_sync().
Allow subclasses to whitelist/blacklist changes. By default, no filtering is performed, changes are returned as is.
- Parameters:
diff – a list of differences (see compare_metadata() docs for details on format)
- Returns:
a list of differences
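As a rough illustration of the kind of filtering a subclass might perform, the sketch below drops every 'remove_index' entry from a diff shaped like the output list documented for this class. The helper name drop_actions and the sample data are hypothetical, and it assumes each entry is a tuple whose first item is the action name; real diffs may nest entries differently.

```python
def drop_actions(diff, ignored_actions=("remove_index",)):
    """Return the entries of ``diff`` whose action name is not ignored."""
    kept = []
    for entry in diff:
        # Assumption: each entry is a tuple and entry[0] is the action name.
        action = entry[0]
        if action not in ignored_actions:
            kept.append(entry)
    return kept


# Hypothetical diff using placeholder strings for the descriptions.
sample_diff = [
    ("add_table", "table-from-models"),
    ("remove_index", "index-from-database"),
    ("add_column", None, "instances", "column-from-models"),
]
filtered = drop_actions(sample_diff)
```

A filter_metadata_diff() override could delegate to a helper of this kind and return its result.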
- abstract get_engine()¶
Return the engine instance to be used when running tests.
This method must be implemented in subclasses and return an engine instance to be used when running tests.
- abstract get_metadata()¶
Return the metadata instance to be used for schema comparison.
This method must be implemented in subclasses and return the metadata instance attached to the BASE model.
- include_object(object_, name, type_, reflected, compare_to)¶
Return True for objects that should be compared.
- Parameters:
object_ – a SchemaItem object such as a Table or Column object
name – the name of the object
type_ – a string describing the type of object (e.g. “table”)
reflected – True if the given object was produced based on table reflection, False if it’s from a local MetaData object
compare_to – the object being compared against, if available, else None
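A minimal sketch of an include_object() override that skips bookkeeping tables might look as follows. It is written as a plain function so that it runs stand-alone; in a test case it would be a method taking self. The EXCLUDED_TABLES set is a hypothetical choice, not something the library defines.

```python
# Hypothetical set of table names to leave out of the comparison.
EXCLUDED_TABLES = {"migrate_version", "alembic_version"}


def include_object(object_, name, type_, reflected, compare_to):
    """Return False for excluded tables, True for everything else."""
    if type_ == "table" and name in EXCLUDED_TABLES:
        return False
    return True
```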
- test_models_sync()¶
oslo_db.sqlalchemy.types module¶
- class oslo_db.sqlalchemy.types.JsonEncodedDict(mysql_as_long=False, mysql_as_medium=False)¶
Bases:
JsonEncodedType
Represents dict serialized as json-encoded string in db.
Note that this type does NOT track mutations. If you want to update it, you have to assign the existing value to a temporary variable, update it, then assign it back. See this page for a more robust workaround: http://docs.sqlalchemy.org/en/rel_1_0/orm/extensions/mutable.html
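The difference between mutating in place and assigning back can be sketched with a stand-in object. FakeRecord below is hypothetical and far simpler than a real mapped class: it only flags itself as dirty when an attribute is assigned, which is enough to show why an in-place change goes unnoticed. Copying the value before updating it is an assumption of this sketch, chosen so that the assigned object differs from the one originally loaded.

```python
import copy


class FakeRecord:
    """Stand-in for a mapped object; it only notices attribute assignment."""

    def __init__(self, extra):
        # Write through __dict__ so construction does not mark the record dirty.
        self.__dict__["extra"] = extra
        self.__dict__["dirty"] = False

    def __setattr__(self, name, value):
        self.__dict__[name] = value
        self.__dict__["dirty"] = True


record = FakeRecord({"color": "red"})

# In-place mutation: no attribute assignment happens, so nothing is flagged.
record.extra["color"] = "blue"
dirty_after_mutation = record.dirty

# Copy, update, assign back: the assignment is what gets noticed.
extra = copy.deepcopy(record.extra)
extra["size"] = "large"
record.extra = extra
dirty_after_assignment = record.dirty
```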
- cache_ok: bool | None = True¶
This type is safe to cache.
- type¶
alias of
dict
- class oslo_db.sqlalchemy.types.JsonEncodedList(mysql_as_long=False, mysql_as_medium=False)¶
Bases:
JsonEncodedType
Represents list serialized as json-encoded string in db.
Note that this type does NOT track mutations. If you want to update it, you have to assign the existing value to a temporary variable, update it, then assign it back. See this page for a more robust workaround: http://docs.sqlalchemy.org/en/rel_1_0/orm/extensions/mutable.html
- cache_ok: bool | None = True¶
This type is safe to cache.
- type¶
alias of
list
- class oslo_db.sqlalchemy.types.JsonEncodedType(mysql_as_long=False, mysql_as_medium=False)¶
Bases:
TypeDecorator
Base column type for data serialized as JSON-encoded string in db.
- cache_ok: bool | None = True¶
This type is safe to cache.
- impl¶
alias of
Text
- process_bind_param(value, dialect)¶
Bind parameters to the process.
- process_result_value(value, dialect)¶
Process result value.
- type = None¶
- class oslo_db.sqlalchemy.types.SoftDeleteInteger(*args: Any, **kwargs: Any)¶
Bases:
TypeDecorator
Coerce a bound param to be a proper integer before passing it to DBAPI.
Some backends like PostgreSQL are very strict about types and do not perform automatic type casts, e.g. when trying to INSERT a boolean value like false into an integer column. Coercing the bound param in the DB layer by means of a custom SQLAlchemy type decorator makes sure we always pass a proper integer value to a DBAPI implementation.
This is not a general purpose boolean integer type as it specifically allows for arbitrary positive integers outside of the boolean int range (0, 1, False, True), so that it’s possible to have compound unique constraints over multiple columns including deleted (e.g. to soft-delete flavors with the same name in Nova without triggering a constraint violation): deleted is set to be equal to a PK int value on deletion, 0 denotes a non-deleted row.
- cache_ok: bool | None = True¶
This type is safe to cache.
- impl¶
alias of
Integer
- process_bind_param(value, dialect)¶
Return the binding parameter.
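The coercion described for this type can be sketched as a stand-alone function. The name coerce_soft_delete_value is hypothetical, and passing None through unchanged is an assumption of this sketch rather than documented behaviour.

```python
def coerce_soft_delete_value(value):
    """Coerce a bound value to a plain int, leaving None untouched."""
    if value is None:
        # Assumption: NULL is passed through rather than coerced.
        return None
    # bool is a subclass of int, so False becomes 0 and True becomes 1.
    return int(value)
```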
oslo_db.sqlalchemy.update_match module¶
- exception oslo_db.sqlalchemy.update_match.CantUpdateException¶
Bases:
Exception
- exception oslo_db.sqlalchemy.update_match.MultiRowsMatched¶
Bases:
CantUpdateException
- exception oslo_db.sqlalchemy.update_match.NoRowsMatched¶
Bases:
CantUpdateException
- oslo_db.sqlalchemy.update_match.manufacture_criteria(mapped, values)¶
Given a mapper/class and a namespace of values, produce a WHERE clause.
The class should be a mapped class and the entries in the dictionary correspond to mapped attribute names on the class.
A value may also be a tuple in which case that particular attribute will be compared to a tuple using IN. The scalar value or tuple can also contain None which translates to an IS NULL, that is properly joined with OR against an IN expression if appropriate.
- Parameters:
mapped – a mapped class, or actual Mapper object.
values – dictionary of values.
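To make the IN / IS NULL / OR combination concrete, here is a small sketch that builds a textual WHERE fragment from a dictionary of values. It does not use SQLAlchemy and is not the library's implementation: sketch_where is a hypothetical name, "?" placeholders are an arbitrary choice, and joining the per-attribute criteria with AND is an assumption.

```python
def sketch_where(values):
    """Build a textual WHERE fragment and its list of bound parameters."""
    clauses = []
    params = []
    for name, value in values.items():
        if isinstance(value, tuple):
            non_null = [v for v in value if v is not None]
            parts = []
            if non_null:
                placeholders = ", ".join("?" for _ in non_null)
                parts.append(f"{name} IN ({placeholders})")
                params.extend(non_null)
            if None in value:
                parts.append(f"{name} IS NULL")
            if not parts:
                continue  # an empty tuple contributes no criteria here
            clause = " OR ".join(parts)
            if len(parts) > 1:
                clause = f"({clause})"
            clauses.append(clause)
        elif value is None:
            clauses.append(f"{name} IS NULL")
        else:
            clauses.append(f"{name} = ?")
            params.append(value)
    return " AND ".join(clauses), params


where, params = sketch_where(
    {"vm_state": ("active", "stopped", None), "host": None, "uuid": "ccea54f"}
)
```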
- oslo_db.sqlalchemy.update_match.manufacture_entity_criteria(entity, include_only=None, exclude=None)¶
Given a mapped instance, produce a WHERE clause.
The attributes set upon the instance will be combined to produce a SQL expression using the mapped SQL expressions as the base of comparison.
Values on the instance may be set as tuples in which case the criteria will produce an IN clause. None is also acceptable as a scalar or tuple entry, which will produce IS NULL that is properly joined with an OR against an IN expression if appropriate.
- Parameters:
entity – a mapped entity.
include_only – optional sequence of keys to limit which keys are included.
exclude – sequence of keys to exclude
- oslo_db.sqlalchemy.update_match.manufacture_persistent_object(session, specimen, values=None, primary_key=None)¶
Make an ORM-mapped object persistent in a Session without SQL.
The persistent object is returned.
If a matching object is already present in the given session, the specimen is merged into it and the persistent object returned. Otherwise, the specimen itself is made persistent and is returned.
The object must contain a full primary key, or provide it via the values or primary_key parameters. The object is persisted to the Session in a “clean” state with no pending changes.
- Parameters:
session – A Session object.
specimen – a mapped object which is typically transient.
values – a dictionary of values to be applied to the specimen, in addition to the state that’s already on it. The attributes will be set such that no history is created; the object remains clean.
primary_key – optional tuple-based primary key. This will also be applied to the instance if present.
- oslo_db.sqlalchemy.update_match.update_on_match(query, specimen, surrogate_key, values=None, attempts=3, include_only=None, process_query=None, handle_failure=None)¶
Emit an UPDATE statement matching the given specimen.
E.g.:
    with enginefacade.writer() as session:
        specimen = MyInstance(
            uuid='ccea54f',
            interface_id='ad33fea',
            vm_state='SOME_VM_STATE',
        )

        values = {
            'vm_state': 'SOME_NEW_VM_STATE'
        }

        base_query = model_query(
            context, models.Instance,
            project_only=True, session=session)

        hostname_query = model_query(
            context, models.Instance, session=session,
            read_deleted='no').filter(
                func.lower(models.Instance.hostname) == 'SOMEHOSTNAME')

        surrogate_key = ('uuid', )

        def process_query(query):
            return query.where(~exists(hostname_query))

        def handle_failure(query):
            try:
                instance = base_query.one()
            except NoResultFound:
                raise exception.InstanceNotFound(instance_id=instance_uuid)

            if session.query(hostname_query.exists()).scalar():
                raise exception.InstanceExists(
                    name=values['hostname'].lower())

            # try again
            return False

        persistent_instance = base_query.update_on_match(
            specimen,
            surrogate_key,
            values=values,
            process_query=process_query,
            handle_failure=handle_failure
        )
The UPDATE statement is constructed against the given specimen using those values which are present to construct a WHERE clause. If the specimen contains additional values to be ignored, the include_only parameter may be passed which indicates a sequence of attributes to use when constructing the WHERE.
The UPDATE is performed against an ORM Query, which is created from the given Session, or alternatively by passing the query parameter referring to an existing query.
Before the query is invoked, it is also passed through the callable sent as process_query, if present. This hook allows additional criteria to be added to the query after it is created but before invocation.
The function will then invoke the UPDATE statement and check for “success” one or more times, up to a maximum of that passed as attempts.
The initial check for “success” from the UPDATE statement is that the number of rows returned matches 1. If zero rows are matched, then the UPDATE statement is assumed to have “failed”, and the failure handling phase begins.
The failure handling phase involves invoking the given handle_failure function, if any. This handler can perform additional queries to attempt to figure out why the UPDATE didn’t match any rows. The handler, upon detection of the exact failure condition, should throw an exception to exit; if it doesn’t, it has the option of returning True or False, where False means the error was not handled, and True means that there was not in fact an error, and the function should return successfully.
If the failure handler is not present, or returns False after attempts number of attempts, then the function overall raises CantUpdateException. If the handler returns True, then the function returns with no error.
The return value of the function is a persistent version of the given specimen; this may be the specimen itself, if no matching object was already present in the session; otherwise, the existing object is returned, with the state of the specimen merged into it. The returned persistent object will have the given values populated into the object.
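The attempt and failure-handling flow described above can be sketched without a database. Everything here is a stand-in: CantUpdate mimics CantUpdateException, run_update is a hypothetical callable returning the number of matched rows, and the handler takes no arguments (the real handle_failure receives the query). The sketch only models the zero-row and one-row outcomes.

```python
class CantUpdate(Exception):
    """Stand-in for the module's CantUpdateException."""


def update_with_retries(run_update, attempts=3, handle_failure=None):
    """Call ``run_update`` up to ``attempts`` times until one row matches."""
    for _ in range(attempts):
        matched = run_update()
        if matched == 1:
            return True
        # No single row matched: give the handler a chance to raise a more
        # specific error, or to declare that nothing was actually wrong.
        if handle_failure is not None and handle_failure():
            return True
    raise CantUpdate("no row matched after %d attempts" % attempts)


# Hypothetical update that only matches a row on its third invocation.
calls = []


def flaky_update():
    calls.append(1)
    return 1 if len(calls) == 3 else 0


succeeded = update_with_retries(
    flaky_update, attempts=3, handle_failure=lambda: False)
```

A handler that raises its own exception simply propagates out of the loop, which corresponds to the "throw an exception to exit" case.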
The object is returned as “persistent”, meaning that it is associated with the given Session and has an identity key (that is, a real primary key value).
In order to produce this identity key, a strategy must be used to determine it as efficiently and safely as possible:
If the given specimen already contained its primary key attributes fully populated, then these attributes were used as criteria in the UPDATE, so we have the primary key value; it is populated directly.
If the target backend supports RETURNING, then the update() query is performed with a RETURNING clause so that the matching primary key is returned atomically. This currently includes Postgresql, Oracle and others (notably not MySQL or SQLite).
If the target backend is MySQL, and the given model uses a single-column, AUTO_INCREMENT integer primary key value (as is the case for Nova), MySQL’s recommended approach of making use of LAST_INSERT_ID(expr) is used to atomically acquire the matching primary key value within the scope of the UPDATE statement; it is then fetched immediately afterwards by using SELECT LAST_INSERT_ID(). http://dev.mysql.com/doc/refman/5.0/en/information-functions.html#function_last-insert-id
Otherwise, for composite keys on MySQL or other backends such as SQLite, the row as UPDATED must be re-fetched in order to acquire the primary key value. The surrogate_key parameter is used for this in order to re-fetch the row; this is a column name with a known, unique value where the object can be fetched.
- oslo_db.sqlalchemy.update_match.update_returning_pk(query, values, surrogate_key)¶
Perform an UPDATE, returning the primary key of the matched row.
The primary key is returned using a selection of strategies:
If the database supports RETURNING, RETURNING is used to retrieve the primary key values inline.
If the database is MySQL and the entity is mapped to a single integer primary key column, MySQL’s last_insert_id() function is used inline within the UPDATE and then upon a second SELECT to get the value.
Otherwise, a “refetch” strategy is used, where a given “surrogate” key value (typically a UUID column on the entity) is used to run a new SELECT against that UUID. This UUID is also placed into the UPDATE query to ensure the row matches.
- Parameters:
query – a Query object with existing criterion, against a single entity.
values – a dictionary of values to be updated on the row.
surrogate_key – a tuple of (attrname, value), referring to a UNIQUE attribute that will also match the row. This attribute is used to retrieve the row via a SELECT when no optimized strategy exists.
- Returns:
the primary key, returned as a tuple. It is only returned if exactly one row matched; otherwise, CantUpdateException is raised.
oslo_db.sqlalchemy.utils module¶
- class oslo_db.sqlalchemy.utils.DialectFunctionDispatcher¶
Bases:
object
- dispatch_for(expr)¶
- classmethod dispatch_for_dialect(expr, multiple=False)¶
Provide dialect-specific functionality within distinct functions.
e.g.:
    @dispatch_for_dialect("*")
    def set_special_option(engine):
        pass

    @set_special_option.dispatch_for("sqlite")
    def set_sqlite_special_option(engine):
        return engine.execute("sqlite thing")

    @set_special_option.dispatch_for("mysql+mysqldb")
    def set_mysqldb_special_option(engine):
        return engine.execute("mysqldb thing")
After the above registration, the set_special_option() function is now a dispatcher, given a SQLAlchemy Engine, Connection, URL string, or sqlalchemy.engine.URL object:
    eng = create_engine('...')
    result = set_special_option(eng)
The filter system supports two modes, “multiple” and “single”. The default is “single”, and requires that one and only one function match for a given backend. In this mode, the function may also have a return value, which will be returned by the top level call.
“multiple” mode, on the other hand, does not support return values, but allows for any number of matching functions, where each function will be called:
    # the initial call sets this up as a "multiple" dispatcher
    @dispatch_for_dialect("*", multiple=True)
    def set_options(engine):
        # set options that apply to *all* engines
        pass

    @set_options.dispatch_for("postgresql")
    def set_postgresql_options(engine):
        # set options that apply to all Postgresql engines
        pass

    @set_options.dispatch_for("postgresql+psycopg2")
    def set_postgresql_psycopg2_options(engine):
        # set options that apply only to "postgresql+psycopg2"
        pass

    @set_options.dispatch_for("*+pyodbc")
    def set_pyodbc_options(engine):
        # set options that apply to all pyodbc backends
        pass
Note that in both modes, any number of additional arguments can be accepted by member functions. For example, to populate a dictionary of options, it may be passed in:
    @dispatch_for_dialect("*", multiple=True)
    def set_engine_options(url, opts):
        pass

    @set_engine_options.dispatch_for("mysql+mysqldb")
    def _mysql_set_default_charset_to_utf8(url, opts):
        opts.setdefault('charset', 'utf-8')

    @set_engine_options.dispatch_for("sqlite")
    def _set_sqlite_in_memory_check_same_thread(url, opts):
        if url.database in (None, 'memory'):
            opts['check_same_thread'] = False

    opts = {}
    set_engine_options(url, opts)
The driver specifiers are of the form: <database | *>[+<driver | *>]. That is, database name or “*”, followed by an optional + sign with driver or “*”. Omitting the driver name implies all drivers for that database.
- dispatch_on_drivername(drivername)¶
Return a sub-dispatcher for the given drivername.
This provides a means of calling a different function, such as the “*” function, for a given target object that normally refers to a sub-function.
- class oslo_db.sqlalchemy.utils.DialectMultiFunctionDispatcher¶
Bases:
DialectFunctionDispatcher
- class oslo_db.sqlalchemy.utils.DialectSingleFunctionDispatcher¶
Bases:
DialectFunctionDispatcher
- oslo_db.sqlalchemy.utils.add_index(engine, table_name, index_name, idx_columns)¶
Create an index for given columns.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
index_name – name of the index
idx_columns – tuple with names of columns that will be indexed
- oslo_db.sqlalchemy.utils.change_index_columns(engine, table_name, index_name, new_columns)¶
Change set of columns that are indexed by given index.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
index_name – name of the index
new_columns – tuple with names of columns that will be indexed
- oslo_db.sqlalchemy.utils.column_exists(engine, table_name, column)¶
Check if table has given column.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
column – name of the column
- oslo_db.sqlalchemy.utils.dispatch_for_dialect(expr, multiple=False)¶
Provide dialect-specific functionality within distinct functions.
e.g.:
    @dispatch_for_dialect("*")
    def set_special_option(engine):
        pass

    @set_special_option.dispatch_for("sqlite")
    def set_sqlite_special_option(engine):
        return engine.execute("sqlite thing")

    @set_special_option.dispatch_for("mysql+mysqldb")
    def set_mysqldb_special_option(engine):
        return engine.execute("mysqldb thing")
After the above registration, the set_special_option() function is now a dispatcher, given a SQLAlchemy Engine, Connection, URL string, or sqlalchemy.engine.URL object:
    eng = create_engine('...')
    result = set_special_option(eng)
The filter system supports two modes, “multiple” and “single”. The default is “single”, and requires that one and only one function match for a given backend. In this mode, the function may also have a return value, which will be returned by the top level call.
“multiple” mode, on the other hand, does not support return values, but allows for any number of matching functions, where each function will be called:
    # the initial call sets this up as a "multiple" dispatcher
    @dispatch_for_dialect("*", multiple=True)
    def set_options(engine):
        # set options that apply to *all* engines
        pass

    @set_options.dispatch_for("postgresql")
    def set_postgresql_options(engine):
        # set options that apply to all Postgresql engines
        pass

    @set_options.dispatch_for("postgresql+psycopg2")
    def set_postgresql_psycopg2_options(engine):
        # set options that apply only to "postgresql+psycopg2"
        pass

    @set_options.dispatch_for("*+pyodbc")
    def set_pyodbc_options(engine):
        # set options that apply to all pyodbc backends
        pass
Note that in both modes, any number of additional arguments can be accepted by member functions. For example, to populate a dictionary of options, it may be passed in:
    @dispatch_for_dialect("*", multiple=True)
    def set_engine_options(url, opts):
        pass

    @set_engine_options.dispatch_for("mysql+mysqldb")
    def _mysql_set_default_charset_to_utf8(url, opts):
        opts.setdefault('charset', 'utf-8')

    @set_engine_options.dispatch_for("sqlite")
    def _set_sqlite_in_memory_check_same_thread(url, opts):
        if url.database in (None, 'memory'):
            opts['check_same_thread'] = False

    opts = {}
    set_engine_options(url, opts)
The driver specifiers are of the form: <database | *>[+<driver | *>]. That is, database name or “*”, followed by an optional + sign with driver or “*”. Omitting the driver name implies all drivers for that database.
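The specifier grammar can be illustrated with a small matcher. This is only a sketch of the matching rule as described here, not the dispatcher's own code: specifier_matches is a hypothetical helper, and it compares against a plain "database+driver" string without resolving a backend's default driver.

```python
def specifier_matches(specifier, drivername):
    """Check a '<database | *>[+<driver | *>]' specifier against a drivername."""
    spec_db, _, spec_driver = specifier.partition("+")
    db, _, driver = drivername.partition("+")
    if spec_db != "*" and spec_db != db:
        return False
    # An omitted driver part, or "*", accepts every driver for the database.
    if spec_driver in ("", "*"):
        return True
    return spec_driver == driver
```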
- oslo_db.sqlalchemy.utils.drop_index(engine, table_name, index_name)¶
Drop index with given name.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
index_name – name of the index
- oslo_db.sqlalchemy.utils.drop_old_duplicate_entries_from_table(engine, table_name, use_soft_delete, *uc_column_names)¶
Drop all old rows having the same values for columns in uc_columns.
This method drops (or marks as deleted if use_soft_delete is True) old duplicate rows from the table with name table_name.
- Parameters:
engine – Sqlalchemy engine
table_name – Table with duplicates
use_soft_delete – If True - values will be marked as deleted, if False - values will be removed from table
uc_column_names – Unique constraint columns
- oslo_db.sqlalchemy.utils.get_db_connection_info(conn_pieces)¶
- oslo_db.sqlalchemy.utils.get_foreign_key_constraint_name(engine, table_name, column_name)¶
Find the name of foreign key in a table, given constrained column name.
- Parameters:
engine – a SQLAlchemy engine (or connection)
table_name – name of table which contains the constraint
column_name – name of column that is constrained by the foreign key.
- Returns:
the name of the first foreign key constraint which constrains the given column in the given table.
- oslo_db.sqlalchemy.utils.get_indexes(engine, table_name)¶
Get the list of all indexes of a given table.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
- oslo_db.sqlalchemy.utils.get_non_innodb_tables(connectable, skip_tables=('migrate_version', 'alembic_version'))¶
Get a list of tables which don’t use InnoDB storage engine.
- Parameters:
connectable – a SQLAlchemy Engine or a Connection instance
skip_tables – a list of tables which might have a different storage engine
- oslo_db.sqlalchemy.utils.get_table(engine, name)¶
Returns an sqlalchemy table dynamically from db.
Needed because the models don’t work for us in migrations as models will be far out of sync with the current data.
Warning
Do not use this method when creating ForeignKeys in database migrations because sqlalchemy needs the same MetaData object to hold information about the parent table and the reference table in the ForeignKey. This method uses a unique MetaData object per table object so it won’t work with ForeignKey creation.
- oslo_db.sqlalchemy.utils.get_unique_keys(model)¶
Get a list of sets of unique model keys.
- Parameters:
model – the ORM model class
- Return type:
list of sets of strings
- Returns:
unique model keys or None if unable to find them
- oslo_db.sqlalchemy.utils.index_exists(engine, table_name, index_name)¶
Check if given index exists.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
index_name – name of the index
- oslo_db.sqlalchemy.utils.index_exists_on_columns(engine, table_name, columns)¶
Check if an index on given columns exists.
- Parameters:
engine – sqlalchemy engine
table_name – name of the table
columns – a list type of columns that will be checked
- oslo_db.sqlalchemy.utils.make_url(target)¶
Return a url.URL object.
- oslo_db.sqlalchemy.utils.model_query(model, session, args=None, **kwargs)¶
Query helper for db.sqlalchemy api methods.
This accounts for deleted and project_id fields.
- Parameters:
model (models.ModelBase) – Model to query. Must be a subclass of ModelBase.
session (sqlalchemy.orm.session.Session) – The session to use.
args (tuple) – Arguments to query. If None - model is used.
Keyword arguments:
- Parameters:
project_id (iterable, model.__table__.columns.project_id.type, None type) – If present, allows filtering by project_id(s). Can be either a project_id value, or an iterable of project_id values, or None. If an iterable is passed, only rows whose project_id column value is on the project_id list will be returned. If None is passed, only rows which are not bound to any project, will be returned.
deleted (bool) – If present, allows filtering by deleted field. If True is passed, only deleted entries will be returned, if False - only existing entries.
Usage:
    from oslo_db.sqlalchemy import utils

    def get_instance_by_uuid(uuid):
        session = get_session()
        with session.begin():
            return (utils.model_query(models.Instance, session=session)
                         .filter(models.Instance.uuid == uuid)
                         .first())

    def get_nodes_stat():
        data = (Node.id, Node.cpu, Node.ram, Node.hdd)
        session = get_session()
        with session.begin():
            return utils.model_query(Node, session=session, args=data).all()
You can also create your own helper based on utils.model_query(). For example, it can be useful if you plan to use the project_id and deleted parameters from the project’s context:
    from oslo_db.sqlalchemy import utils

    def _model_query(context, model, session=None, args=None,
                     project_id=None, project_only=False,
                     read_deleted=None):
        # We suppose that functions ``_get_project_id()`` and
        # ``_get_deleted()`` should handle passed parameters and
        # context object (for example, decide if we need to restrict a user
        # to query their own entries by project_id or only allow admin to
        # read deleted entries). For return values, we expect to get
        # ``project_id`` and ``deleted``, which are suitable for the
        # ``model_query()`` signature.
        kwargs = {}
        if project_id is not None:
            kwargs['project_id'] = _get_project_id(context, project_id,
                                                   project_only)
        if read_deleted is not None:
            kwargs['deleted'] = _get_deleted(context, read_deleted)
        session = session or get_session()
        with session.begin():
            return utils.model_query(model, session=session,
                                     args=args, **kwargs)

    def get_instance_by_uuid(context, uuid):
        return (_model_query(context, models.Instance, read_deleted='yes')
                      .filter(models.Instance.uuid == uuid)
                      .first())

    def get_nodes_data(context, project_id, project_only='allow_none'):
        data = (Node.id, Node.cpu, Node.ram, Node.hdd)
        return (_model_query(context, Node, args=data, project_id=project_id,
                             project_only=project_only)
                      .all())
- oslo_db.sqlalchemy.utils.paginate_query(query, model, limit, sort_keys, marker=None, sort_dir=None, sort_dirs=None)¶
Returns a query with sorting / pagination criteria added.
Pagination works by requiring a unique sort_key, specified by sort_keys. (If sort_keys is not unique, then we risk looping through values.) We use the last row in the previous page as the ‘marker’ for pagination. So we must return values that follow the passed marker in the order. With a single-valued sort_key, this would be easy: sort_key > X. With a compound-values sort_key, (k1, k2, k3) we must do this to repeat the lexicographical ordering: (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3)
We also have to cope with different sort_directions and cases where k2, k3, … are nullable.
Typically, the id of the last row is used as the client-facing pagination marker, then the actual marker object must be fetched from the db and passed in to us as marker.
The “offset” parameter is intentionally avoided. As offset requires a full scan through the preceding results each time, criteria-based pagination is preferred. See http://use-the-index-luke.com/no-offset for further background.
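The compound-key comparison described above can be sketched in plain Python over in-memory rows. This is not how paginate_query() is implemented: the names after_marker and next_page are hypothetical, rows are dictionaries instead of mapped objects, and only ascending order with non-null keys is handled.

```python
def after_marker(row, marker, sort_keys):
    """True if ``row`` sorts strictly after ``marker`` in ascending order."""
    for i, key in enumerate(sort_keys):
        # Mirrors (k1 == X1 and ... and k_i > X_i) for each key position.
        prefix_equal = all(row[k] == marker[k] for k in sort_keys[:i])
        if prefix_equal and row[key] > marker[key]:
            return True
    return False


def next_page(rows, sort_keys, limit, marker=None):
    """Return up to ``limit`` rows that follow ``marker`` in sort order."""
    ordered = sorted(rows, key=lambda r: tuple(r[k] for k in sort_keys))
    if marker is not None:
        ordered = [r for r in ordered if after_marker(r, marker, sort_keys)]
    return ordered[:limit]


rows = [
    {"name": "a", "id": 1},
    {"name": "a", "id": 2},
    {"name": "b", "id": 3},
    {"name": "b", "id": 4},
]
page1 = next_page(rows, ["name", "id"], limit=2)
# The last row of the previous page serves as the marker for the next one.
page2 = next_page(rows, ["name", "id"], limit=2, marker=page1[-1])
```

Because the combined key (name, id) is unique, each page starts right after the marker and no row is returned twice.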
- Parameters:
query – the query object to which we should add paging/sorting
model – the ORM model class
limit – maximum number of items to return
sort_keys – array of attributes by which results should be sorted
marker – the last item of the previous page; we return the next results after this value.
sort_dir – direction in which results should be sorted (asc, desc); the suffix -nullsfirst or -nullslast can be added to define the ordering of null values
sort_dirs – per-column array of sort_dirs, corresponding to sort_keys
- Return type:
sqlalchemy.orm.query.Query
- Returns:
The query with sorting/pagination added.
- oslo_db.sqlalchemy.utils.sanitize_db_url(url)¶
- oslo_db.sqlalchemy.utils.to_list(x, default=None)¶