The congress.datasources.datasource_driver Module

class congress.datasources.datasource_driver.DataSourceDriver(name='', args=None)

Bases: congress.dse2.data_service.DataService

A super-class for datasource drivers.

This class implements a polling mechanism for polling a datasource.

This class also implements a translation mechanism that accepts data from the datasource in the form of Python lists, dicts, and individual values, and puts that data into Congress data tables. The translation mechanism takes a declarative description of the Python object’s structure: for example, whether an object is a list or a dict, whether a list contains another list, which dict keys to extract, and the tables and columns in which to put the extracted data. If the datasource returns data in some other form, such as a raw string, supply your own method that converts it into Python lists, dicts, and individual values before translation.

The DataSourceDriver uses a predefined scheme for translating datasource objects into Congress tables. For example, the driver converts a list containing individual values into a single table, where each row contains an entry from the list, but the declarative description enables us to control the table names and column names used for the resulting table schema.

The declarative description consists of four different types of translator: HDICT, VDICT, LIST, and VALUE. A translator is itself a Python dict containing several parameters. The translation-type parameter states which of the four types the translator is; the remaining parameters describe things like the table, column names, and sub-translators.

HDICT parameters with example values:

    {'translation-type': 'HDICT',
     'table-name': 'example_table',
     'parent-key': 'parent_key_column',
     'id-col': 'id_col',
     'selector-type': 'DOT_SELECTOR',
     'field-translators': ({'fieldname': 'field1', 'col': 'col1',
                            'translator': {'translation-type': 'VALUE'}},
                           {'fieldname': 'field2', 'col': 'col2',
                            'translator': {'translation-type': 'VALUE'}})}

The HDICT translator reads in a Python dict and translates each key in the dict into a column of the output table. The fields in the table will be in the same order as the fields in the HDICT translator. Use selector-type to specify whether to access the fields using dot notation such as ‘obj.field1’ or using a dict selector such as obj[‘field1’]. The selector-type must be either ‘DOT_SELECTOR’ or ‘DICT_SELECTOR’. If the translator contains a field, but the object does not contain that field, the translator will populate the column with ‘None’.

If ‘parent-key’ is specified, the translator prepends a value from the parent translator as the first column of this table. For example, if the parent table already has a unique key named ‘id’, then setting ‘parent-key’: ‘id’ will populate each row in the child table with the unique foreign key from the parent. Also, if the subtranslator specifies a ‘parent-key’, the parent table will not have a column for that subtranslator. For example, if the subtranslator for ‘field1’ specifies a ‘parent-key’, the parent table will not have a column for field1; instead, the parent table’s parent_key_column will be the foreign key into the subtable. To set the column name for a ‘parent-key’, set ‘parent-col-name’; otherwise the column name defaults to ‘parent_key’.

Alternatively, if ‘id-col’ is specified, the translator will prepend a generated id column to each row. The ‘id-col’ value can be either a string, indicating that an id should be generated from the hash of the remaining fields, or a function that takes the object as its argument and returns an ID as a string or number. If ‘id-col’ is specified with a sub-translator, that value is included as a column in the top-level translator’s table.

Using both parent-key and id-col at the same time is redundant, so DataSourceDriver will reject that configuration.

The example translator expects an object such as:
    {'field1': 123, 'field2': 456}

and populates a table ‘example_table’ with row (id, 123, 456) where id is equal to the hash of (123, 456).
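The HDICT behaviour described above can be sketched in a few lines. This is an illustrative stand-in, not the Congress implementation: `hdict_row` and `row_id` are hypothetical helpers, only VALUE field-translators are handled, and the MD5-of-repr hash is an assumption chosen only to give a stable id.

```python
import hashlib


def row_id(values):
    """Hypothetical id function: a stable hash of the remaining fields."""
    return hashlib.md5(repr(tuple(values)).encode("utf-8")).hexdigest()


def hdict_row(obj, translator):
    """Build one (table_name, row) pair from obj for a flat HDICT translator."""
    use_dict = translator["selector-type"] == "DICT_SELECTOR"
    values = []
    for field in translator["field-translators"]:
        name = field["fieldname"]
        if use_dict:
            value = obj.get(name)             # missing key -> None
        else:
            value = getattr(obj, name, None)  # missing attribute -> None
        values.append(value)
    if "id-col" in translator:
        # Prepend a generated id derived from the other columns.
        values.insert(0, row_id(values))
    return translator["table-name"], tuple(values)


translator = {
    "translation-type": "HDICT",
    "table-name": "example_table",
    "id-col": "id_col",
    "selector-type": "DICT_SELECTOR",
    "field-translators": (
        {"fieldname": "field1", "col": "col1",
         "translator": {"translation-type": "VALUE"}},
        {"fieldname": "field2", "col": "col2",
         "translator": {"translation-type": "VALUE"}},
    ),
}

table, row = hdict_row({"field1": 123, "field2": 456}, translator)
```

Here `row` has the shape (id, 123, 456), and an object lacking ‘field2’ would yield None in that column.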

Recursion: If a field-translator is a translator other than VALUE, then that field-translator will cause the creation of a second table. The field-translator will populate the second table, and each row in the primary table will (in the column for that field) contain a hash of the second table’s entries derived from the primary table’s row. For example, if the translator is:

    {'translation-type': 'HDICT',
     'table-name': 'example_table',
     'selector-type': 'DOT_SELECTOR',
     'field-translators': ({'fieldname': 'field1', 'col': 'col1',
                            'translator': {'translation-type': 'LIST',
                                           'table-name': 'subtable',
                                           'val-col': 'c',
                                           'translator': {'translation-type': 'VALUE'}},})}

The object {‘field1’: [1, 2, 3]} will translate to one tuple in example_table and three tuples in subtable:

    example_table: (h(1, 2, 3))
    subtable:      (h(1, 2, 3), 1)
                   (h(1, 2, 3), 2)
                   (h(1, 2, 3), 3)
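The recursion rule can be made concrete with a small sketch. It assumes a single list-valued field and uses a hypothetical hash `h` (MD5 of the repr); the real hash used by Congress is not specified here, and `translate_list_field` is an illustrative name only.

```python
import hashlib


def h(values):
    """Hypothetical stand-in for the hash computed over a sub-table's entries."""
    return hashlib.md5(repr(tuple(values)).encode("utf-8")).hexdigest()


def translate_list_field(obj, fieldname, table, subtable):
    """Return (table_name, row) pairs for a dict whose field holds a list."""
    items = obj[fieldname]
    key = h(items)
    # One row in the primary table, holding the hash in the field's column.
    rows = [(table, (key,))]
    # One row per list element in the sub-table, keyed by the same hash.
    rows += [(subtable, (key, item)) for item in items]
    return rows


rows = translate_list_field({"field1": [1, 2, 3]}, "field1",
                            "example_table", "subtable")
```

The result contains one example_table tuple and three subtable tuples that all share the same key.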

In addition, sometimes one will have data that is structured in the following manner (i.e., a dict contained in a list within a dict):

    {'id': '11111',
     'things': [{'type': 1, 'location': 2}]}

To handle this, Congress has a special attribute, in-list, that one can set on the sub-translator. Without in-list, the translator would represent the LIST explicitly, and the schema would have three tables. With in-list, two HDICTs are enough to represent the data.

For example:

    thing_translator = {
        'translation-type': 'HDICT',
        'table-name': 'things_table',
        'parent-key': 'id',
        'selector-type': 'DICT_SELECTOR',
        'in-list': True,
        'field-translators':
            ({'fieldname': 'type',
              'translator': {'translation-type': 'VALUE'}},
             {'fieldname': 'location',
              'translator': {'translation-type': 'VALUE'}})}

    {'translation-type': 'HDICT',
     'table-name': 'example_table',
     'parent-key': 'parent_key_column',
     'selector-type': 'DOT_SELECTOR',
     'field-translators':
         ({'fieldname': 'id',
           'translator': {'translation-type': 'VALUE'}},
          {'fieldname': 'things',
           'translator': thing_translator})}
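As a rough illustration of what in-list achieves, the sketch below flattens the nested object into two tables. It assumes that the parent table keeps only the id column and that each child row is prefixed with the parent's id, which is how the parent-key description reads; `translate_with_in_list` is a hypothetical helper, not part of Congress.

```python
def translate_with_in_list(obj, parent_key, list_field, child_fields,
                           table="example_table", child_table="things_table"):
    """Flatten a dict holding a list of dicts into parent and child rows."""
    key = obj[parent_key]
    rows = [(table, (key,))]
    for item in obj[list_field]:
        # Each dict in the list becomes one child row, prefixed by the parent key.
        child_values = tuple(item.get(name) for name in child_fields)
        rows.append((child_table, (key,) + child_values))
    return rows


rows = translate_with_in_list(
    {"id": "11111", "things": [{"type": 1, "location": 2}]},
    parent_key="id", list_field="things", child_fields=("type", "location"))
```

No intermediate table for the list itself is produced, so the schema stays at two tables.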

VDICT parameters with example values:

    {'translation-type': 'VDICT',
     'table-name': 'table',
     'parent-key': 'parent_key_column',
     'id-col': 'id_col',
     'key-col': 'key_col',
     'val-col': 'value_col',
     'translator': TRANSLATOR}

The VDICT translator reads in a Python dict and turns each key-value pair into a row of the output table. The output table will have 2 or 3 columns, depending on whether ‘id-col’ or ‘parent-key’ is present. Recursion works as it does with HDICT.

VDICT treats a subtranslator with a ‘parent-key’ the same way that an HDICT does. The subtranslator prepends the parent’s key value to each row of the subtable, i.e. (parent_key_column, key_col, value_col). If ‘id-col’ is present instead, the columns will be (id_col, key_col, value_col); otherwise they will be (key_col, value_col). However, if the VDICT’s subtranslator specifies the parent-key, the parent-key must be the VDICT’s ‘val-col’ column due to an implementation choice (the id column is not available until after the subtranslator runs).
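A minimal sketch of the VDICT row shapes, assuming plain values (no sub-translator) and a hypothetical helper `vdict_rows` that is not part of Congress:

```python
def vdict_rows(obj, table, parent_key_value=None):
    """Turn each key-value pair of obj into one (table_name, row) pair."""
    rows = []
    for key, value in obj.items():
        row = (key, value)                   # (key_col, value_col)
        if parent_key_value is not None:
            row = (parent_key_value,) + row  # (parent_key_column, key_col, value_col)
        rows.append((table, row))
    return rows


two_columns = vdict_rows({"a": 1, "b": 2}, "table")
three_columns = vdict_rows({"a": 1}, "table", parent_key_value="p1")
```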

LIST parameters with example values:

    {'translation-type': 'LIST',
     'table-name': 'table1',
     'parent-key': 'parent_key_column',
     'id-col': 'id_col',
     'val-col': 'value_col',
     'translator': {'translation-type': 'VALUE'}}

The LIST translator is like the VDICT translator, except that it reads a Python list from the object and produces 1 or 2 columns, depending on whether ‘id-col’ is present. It always produces a column for val-col. The content of val-col is either a value (if the translator is a VALUE) or a hash of a recursive value as in HDICT.

A LIST may specify a parent-key when the LIST is a subtranslator, but the subtranslator of a LIST may not specify a ‘parent-key’ because the LIST’s table will then have no columns.

VALUE parameters with example values:

    {'translation-type': 'VALUE',
     'extract-fn': lambda x: x['foo']}

The VALUE translator reads a single value, such as an int or a string, from the object. The translator uses the extract-fn to extract a value from the object. If ‘extract-fn’ is not defined, the default extract function is the identity function. The resulting value will be either a number such as 123 or a string. A boolean value is translated to the string ‘True’ or ‘False’.
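The VALUE rules above are simple enough to sketch directly. `translate_value` is a hypothetical helper written for illustration; it only mirrors the behaviour described in the text.

```python
def translate_value(obj, translator):
    """Apply a VALUE translator to obj as described in the text."""
    # Fall back to the identity function when no extract-fn is given.
    extract = translator.get("extract-fn", lambda x: x)
    value = extract(obj)
    # Booleans become the strings 'True' or 'False'.
    if isinstance(value, bool):
        return str(value)
    return value


with_extract = translate_value(
    {"foo": 7},
    {"translation-type": "VALUE", "extract-fn": lambda x: x["foo"]})
identity = translate_value("abc", {"translation-type": "VALUE"})
boolean = translate_value(True, {"translation-type": "VALUE"})
```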

COL = 'col'
EXTRACT_FN = 'extract-fn'
FIELDNAME = 'fieldname'
FIELD_TRANSLATORS = 'field-translators'
FIELD_TRANSLATOR_PARAMS = ('fieldname', 'col', 'desc', 'translator')
HDICT_PARAMS = ('translation-type', 'table-name', 'parent-key', 'id-col', 'selector-type', 'field-translators', 'in-list', 'parent-col-name', 'objects-extract-fn', 'parent-key-desc')
ID_COL = 'id-col'
ID_COL_NAME = 'id-col'
IN_LIST = 'in-list'
KEY_COL = 'key-col'
LIST_PARAMS = ('translation-type', 'table-name', 'parent-key', 'id-col', 'val-col', 'translator', 'parent-col-name', 'objects-extract-fn', 'parent-key-desc', 'val-col-desc')
OBJECTS_EXTRACT_FN = 'objects-extract-fn'
PARENT_COL_NAME = 'parent-col-name'
PARENT_KEY = 'parent-key'
PARENT_KEY_COL_NAME = 'parent_key'
PARENT_KEY_DESC = 'parent-key-desc'
SELECTOR_TYPE = 'selector-type'
TABLE_NAME = 'table-name'
TRANSLATION_TYPE = 'translation-type'
TRANSLATION_TYPE_PARAMS = ('translation-type',)
TRANSLATOR = 'translator'
VALUE_PARAMS = ('translation-type', 'extract-fn')
VAL_COL = 'val-col'
VAL_COL_DESC = 'val-col-desc'
VDICT_PARAMS = ('translation-type', 'table-name', 'parent-key', 'id-col', 'key-col', 'val-col', 'translator', 'parent-col-name', 'objects-extract-fn')
classmethod check_params(params, valid_params)
classmethod check_translation_type(params)
classmethod convert_obj(obj, translator, parent_row_dict=None)

Convert obj using translator.

Takes an object and a translation descriptor. Returns two items: (1) a list of tuples where the first element is the name of a table and the second element is a tuple to be inserted into that table, and (2) if the translator specified an id-col, the id’s value; otherwise None. The id is a hash that takes into account all the content of the list of tuples, so it can be used as a unique key to identify the content in obj.

classmethod convert_objs(objects, translator)

Convert list of objs using translator.

Takes a list of objects, and translates them using the translator. Returns a list of tuples, where each tuple is a pair containing a table name, and a tuple to be inserted into the table.

classmethod convert_responses(obj_list, conversion)

Get mapping of column name to column’s integer position.

Given a tablename, returns a dictionary mapping the columnnames of that table to the integer position of that column. Returns None if tablename is not in the schema.

get_row_data(table_id, *args, **kwargs)

Gets row data for a given table.

classmethod get_schema()

Get mapping of table name to column names.

Returns a dictionary mapping tablenames to the list of column names for that table. Both tablenames and columnnames are strings.

classmethod get_tablename(table_id)

Get a table name.

classmethod get_tablenames()

Get a list of table names.

Returns the list of table names the datasource has.


Get a translator.

Returns a translator specified by translator_name.


Get a list of translators.

Returns a list of translators that describes how to translate from the datasource’s data structures to the Congress tables.

classmethod need_column_for_subtable_id(subtranslator)
prepush_processor(data, dataindex, type=None)

Called before push.

Takes as input the DATA that the receiver needs and returns the payload for the message. If this is a regular publication message, make the payload just the delta; otherwise, make the payload the entire table.


Registers translator with congress and validates its schema.

state_set_diff(state1, state2, table=None)

Return STATE1 - STATE2.

Given 2 tuplesets STATE1 and STATE2, return the set difference STATE1-STATE2. Each tupleset is represented as a dictionary from tablename to set of tuples. Return value is a tupleset, also represented as a dictionary from tablename to set of tuples.
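The set difference over tuplesets can be sketched as follows. This is a simplified illustration rather than the method itself: `tupleset_diff` is a hypothetical name, the optional table argument is omitted, and keeping tables whose difference is empty is an assumption.

```python
def tupleset_diff(state1, state2):
    """Return state1 - state2, table by table.

    Each state is a dict mapping a table name to a set of tuples.
    """
    diff = {}
    for table, rows in state1.items():
        # A table missing from state2 contributes all of its rows.
        diff[table] = set(rows) - state2.get(table, set())
    return diff


old = {"servers": {(1, "a"), (2, "b")}, "ports": {(9,)}}
new = {"servers": {(2, "b")}}
delta = tupleset_diff(old, new)
```

A poller could compute such a delta in both directions to find the rows to insert and the rows to delete.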

class congress.datasources.datasource_driver.DataSourceDriverEndpoints(service)

Bases: congress.dse2.data_service.DataServiceEndPoints

get_actions(context, source_id)
get_datasource_schema(context, source_id)
get_row_data(context, table_id, source_id, trace)
get_status(context, source_id, params)
get_tablename(context, table_id, source_id)
get_tablenames(context, source_id)
request_execute(context, action, action_args, wait)
request_refresh(context, source_id)
class congress.datasources.datasource_driver.ExecutionDriver

Bases: object

An add-on class for action execution.

This class implements an action execution ‘virtual’ method, execute(), which is called when a driver receives a ‘req’ message. The handler for the ‘req’ message is placed under DataSourceDriver. Each driver that uses this class must implement execute() to decide how the action is carried out: either by defining it as a method and calling it, or by passing it as an API call to a service.

add_executable_client_methods(client, api_prefix)

Inspect client to get supported builtin methods.

Parameters:
client – the datasource driver client
api_prefix – the filter used to filter methods

add_executable_method(method_name, method_args, method_desc='')

Add executable method information.

Parameters:
method_name – the name of the method to add
method_args – list of arguments and descriptions of the method, e.g.

    [{'name': 'arg1', 'description': 'arg1'},
     {'name': 'arg2', 'description': 'arg2'}]

method_desc – description of the method

execute(action, action_args)

This method must be implemented by each driver.

Action can be a service API or a user-defined function.

Parameters:
action – a user-defined function or a service API call
action_args – in the format

    {'positional': ['arg1', 'arg2'],
     'named': {'key1': 'value1', 'key2': 'value2'}}
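One way a driver might implement execute() is to look the action up on its service client and unpack action_args. The sketch below is an assumption about a typical shape, not code from Congress: `FakeClient`, its `disconnect` method, and `SketchDriver` are all hypothetical.

```python
class FakeClient:
    """Hypothetical service client used only for this illustration."""

    def __init__(self):
        self.calls = []

    def disconnect(self, server, network, force=False):
        self.calls.append((server, network, force))
        return "ok"


class SketchDriver:
    """Hypothetical driver showing one way to implement execute()."""

    def __init__(self, client):
        self.client = client

    def execute(self, action, action_args):
        method = getattr(self.client, action, None)
        if not callable(method):
            raise ValueError("unsupported action: %s" % action)
        # Unpack the documented action_args format.
        positional = action_args.get("positional", [])
        named = action_args.get("named", {})
        return method(*positional, **named)


driver = SketchDriver(FakeClient())
result = driver.execute(
    "disconnect",
    {"positional": ["server1", "net1"], "named": {"force": True}})
```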

Return all supported actions of a datasource driver.

Action should be a service API or a user-defined function. This method should return a dict for all supported actions, together with optional descriptions for each action and its required/supported arguments. For example:

    {'results': [{'name': 'execute1',
                  'args': [{'name': 'arg1', 'description': 'None'},
                           {'name': 'arg2', 'description': 'None'}],
                  'description': 'execute function 1'}]}
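The relationship between registering executable methods and reporting them as actions can be sketched with a small registry. `ActionRegistry` is a hypothetical class; the internal storage and the name-sorted ordering are assumptions made for the example, and only the shape of the returned dict follows the text.

```python
class ActionRegistry:
    """Hypothetical registry mirroring the documented result format."""

    def __init__(self):
        self._methods = {}

    def add_executable_method(self, method_name, method_args, method_desc=""):
        # Remember the arguments and description under the method name.
        self._methods[method_name] = (method_args, method_desc)

    def get_actions(self):
        # Build the {'results': [...]} structure, sorted by method name.
        results = [
            {"name": name, "args": args, "description": desc}
            for name, (args, desc) in sorted(self._methods.items())
        ]
        return {"results": results}


registry = ActionRegistry()
registry.add_executable_method(
    "execute1",
    [{"name": "arg1", "description": "None"},
     {"name": "arg2", "description": "None"}],
    "execute function 1")
actions = registry.get_actions()
```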



Request handler.

The handler calls execute method.

request_execute(context, action, action_args, wait)

Accept execution requests and execute requests from the leader.

class congress.datasources.datasource_driver.PollingDataSourceDriver(name='', args=None)

Bases: congress.datasources.datasource_driver.DataSourceDriver

add_update_method(method, translator)
get_row_data(table_id, *args, **kwargs)

Return a snapshot of table.


Register translators for polling and define tables.

This registers a translator and defines tables for subscribers. When the table name of a root translator is specified as lazy, the driver skips registering that translator and does not define the table.


Periodically called to update new info.

Function called periodically to grab new information, compute deltas, and publish those deltas.


Entrypoint for the datasource driver’s poller greenthread.

Triggers polling every poll_time seconds or after request_refresh is called.

Parameters: poll_time – the amount of time (in seconds) to wait between polling rounds.
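The "every poll_time seconds or after a refresh request" behaviour can be sketched with a standard-library event. This is only an analogy: the text says the real poller runs in a greenthread, whereas this sketch uses `threading.Event`, and both the `max_rounds` bound and the `poll_loop` signature are assumptions added so the example terminates.

```python
import threading


def poll_loop(poll, poll_time, refresh, max_rounds):
    """Call poll() max_rounds times, pausing between rounds.

    Each pause ends after poll_time seconds or as soon as `refresh` is set.
    """
    for round_number in range(max_rounds):
        poll()
        if round_number + 1 < max_rounds:
            refresh.wait(timeout=poll_time)  # returns early on a refresh request
            refresh.clear()                  # re-arm for the next round


calls = []
refresh = threading.Event()
refresh.set()  # simulate a refresh request arriving before the first pause
poll_loop(lambda: calls.append(len(calls)), poll_time=0.01,
          refresh=refresh, max_rounds=3)
```

A refresh request in this sketch amounts to calling `refresh.set()` from another thread.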


Request a refresh of this service’s data.


Check that every entry in lazy_tables is a root table name.

class congress.datasources.datasource_driver.PushedDataSourceDriver(name='', args=None)

Bases: congress.datasources.datasource_driver.DataSourceDriver

Push Type DataSource Driver.

This class is a base class for push-type datasource drivers.

update_entire_data(table_id, objs)
class congress.datasources.datasource_driver.PushedDataSourceDriverEndpoints(service)

Bases: congress.dse2.data_service.DataServiceEndPoints

update_entire_data(context, table_id, source_id, objs)