Source code for panko.storage.impl_elasticsearch

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import operator

import elasticsearch as es
from elasticsearch import helpers
from oslo_log import log
from oslo_utils import netutils
from oslo_utils import timeutils
import six

from panko import storage
from panko.storage import base
from panko.storage import models
from panko import utils

LOG = log.getLogger(__name__)


AVAILABLE_CAPABILITIES = {
    'events': {'query': {'simple': True}},
}


AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}


class Connection(base.Connection):
    """Put the event data into an ElasticSearch db.

    Events in ElasticSearch are indexed by day and stored by event_type.
    An example document::

      {"_index":"events_2014-10-21",
       "_type":"event_type0",
       "_id":"dc90e464-65ab-4a5d-bf66-ecb956b5d779",
       "_score":1.0,
       "_source":{"timestamp": "2014-10-21T20:02:09.274797",
                  "traits": {"id4_0": "2014-10-21T20:02:09.274797",
                             "id3_0": 0.7510790937279408,
                             "id2_0": 5,
                             "id1_0": "18c97ba1-3b74-441a-b948-a702a30cbce2"}
                 }
      }
    """

    CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
                                       AVAILABLE_CAPABILITIES)
    STORAGE_CAPABILITIES = utils.update_nested(
        base.Connection.STORAGE_CAPABILITIES,
        AVAILABLE_STORAGE_CAPABILITIES,
    )

    # NOTE(gordc): mainly for testing, data is not searchable after write,
    # it is only searchable after periodic refreshes.
    _refresh_on_write = False

    def __init__(self, url, conf):
        url_split = netutils.urlsplit(url)
        use_ssl = conf.database.es_ssl_enabled
        self.index_name = conf.database.es_index_name
        self.conn = es.Elasticsearch(hosts=url_split.netloc + url_split.path,
                                     use_ssl=use_ssl)
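
The constructor reads only two options from the configuration object, so a
throwaway stub is enough to point the driver at a local cluster. A minimal
sketch, assuming a reachable Elasticsearch node; the namedtuple stand-ins
for the oslo.config object are hypothetical:

import collections

# Hypothetical stand-ins for the config sections the driver reads; only
# database.es_ssl_enabled and database.es_index_name are consulted.
_DB = collections.namedtuple('_DB', ['es_ssl_enabled', 'es_index_name'])
_Conf = collections.namedtuple('_Conf', ['database'])

conf = _Conf(database=_DB(es_ssl_enabled=False, es_index_name='events'))
conn = Connection('es://localhost:9200', conf)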

    def upgrade(self):
        iclient = es.client.IndicesClient(self.conn)
        ts_template = {
            'template': '*',
            'mappings': {'_default_':
                         {'properties': {'traits': {'type': 'nested'}}}}}
        iclient.put_template(name='enable_timestamp', body=ts_template)
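
To confirm the template landed, the indices client can read it back. A quick
sketch, assuming upgrade() has already run on the conn built above (the
exact response shape may vary across Elasticsearch versions):

tmpl = es.client.IndicesClient(conn.conn).get_template(
    name='enable_timestamp')
# Every index created from now on should map traits as a nested type.
print(tmpl['enable_timestamp']['mappings']['_default_']
      ['properties']['traits']['type'])  # expected: 'nested'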

    def record_events(self, events):

        def _build_bulk_index(event_list):
            for ev in event_list:
                traits = {t.name: t.value for t in ev.traits}
                yield {'_op_type': 'create',
                       '_index': '%s_%s' % (self.index_name,
                                            ev.generated.date().isoformat()),
                       '_type': ev.event_type,
                       '_id': ev.message_id,
                       '_source': {'timestamp': ev.generated.isoformat(),
                                   'traits': traits,
                                   'raw': ev.raw}}

        error = None
        for ok, result in helpers.streaming_bulk(
                self.conn, _build_bulk_index(events)):
            if not ok:
                __, result = result.popitem()
                if result['status'] == 409:
                    LOG.info('Duplicate event detected, skipping it: %s',
                             result)
                else:
                    LOG.exception('Failed to record event: %s', result)
                    error = storage.StorageUnknownWriteError(result)

        if self._refresh_on_write:
            self.conn.indices.refresh(index='%s_*' % self.index_name)
            while self.conn.cluster.pending_tasks(local=True)['tasks']:
                pass
        if error:
            raise error
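
Writing is then a matter of handing record_events a list of model objects. A
minimal sketch, reusing conn from above; the event type and trait values are
illustrative:

import datetime
import uuid

from panko.storage import models

ev = models.Event(message_id=str(uuid.uuid4()),
                  event_type='compute.instance.create.end',
                  generated=datetime.datetime.utcnow(),
                  traits=[models.Trait(
                      'instance_id', models.Trait.TEXT_TYPE,
                      '18c97ba1-3b74-441a-b948-a702a30cbce2')],
                  raw={})
conn.record_events([ev])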

    def _make_dsl_from_filter(self, indices, ev_filter):
        q_args = {}
        filters = []

        if ev_filter.start_timestamp:
            filters.append(
                {'range': {'timestamp':
                           {'ge': ev_filter.start_timestamp.isoformat()}}})
            while indices[0] < (
                '%s_%s' % (self.index_name,
                           ev_filter.start_timestamp.date().isoformat())):
                del indices[0]
        if ev_filter.end_timestamp:
            filters.append(
                {'range': {'timestamp':
                           {'le': ev_filter.end_timestamp.isoformat()}}})
            while indices[-1] > (
                '%s_%s' % (self.index_name,
                           ev_filter.end_timestamp.date().isoformat())):
                del indices[-1]
        q_args['index'] = indices

        if ev_filter.event_type:
            q_args['doc_type'] = ev_filter.event_type
        if ev_filter.message_id:
            filters.append({'term': {'_id': ev_filter.message_id}})
        if ev_filter.traits_filter or ev_filter.admin_proj:
            or_cond = []
            trait_filters = []
            for t_filter in ev_filter.traits_filter or []:
                value = None
                for val_type in ['integer', 'string', 'float', 'datetime']:
                    if t_filter.get(val_type):
                        value = t_filter.get(val_type)
                        if isinstance(value, six.string_types):
                            value = value.lower()
                        elif isinstance(value, datetime.datetime):
                            value = value.isoformat()
                        break
                if t_filter.get('op') in ['gt', 'ge', 'lt', 'le']:
                    op = (t_filter.get('op').replace('ge', 'gte')
                          .replace('le', 'lte'))
                    trait_filters.append(
                        {'range': {
                            "traits.%s" % t_filter['key']: {op: value}}})
                else:
                    tf = {"query": {"query_string": {
                        "query": "traits.%s: \"%s\"" % (t_filter['key'],
                                                        value)}}}
                    if t_filter.get('op') == 'ne':
                        tf = {"not": tf}
                    trait_filters.append(tf)
            if ev_filter.admin_proj:
                or_cond = [{'missing': {'field': 'traits.project_id'}},
                           {'term': {
                               'traits.project_id': ev_filter.admin_proj}}]
            filters.append(
                {'nested': {'path': 'traits', 'query': {'filtered': {
                    'filter': {'bool': {'must': trait_filters,
                                        'should': or_cond}}}}}})

        q_args['body'] = {'query': {'filtered':
                                    {'filter': {'bool': {'must': filters}}}}}
        return q_args
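
This helper never touches the cluster itself; it only narrows the index list
and assembles Elasticsearch 1.x-style filtered-query DSL. A sketch of its
output, using a hypothetical duck-typed stand-in that carries just the
attributes the method dereferences:

import types

flt = types.SimpleNamespace(start_timestamp=None, end_timestamp=None,
                            event_type=None, message_id=None,
                            admin_proj=None,
                            traits_filter=[{'key': 'id2_0', 'integer': 5,
                                            'op': 'ge'}])
q = conn._make_dsl_from_filter(['events_2014-10-21'], flt)
# q['body'] holds a filtered/bool/must query whose nested traits clause
# contains {'range': {'traits.id2_0': {'gte': 5}}}.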

    def get_events(self, event_filter, pagination=None):
        limit = None
        if pagination:
            if pagination.get('sort'):
                LOG.warning('Driver does not support sort functionality')
            limit = pagination.get('limit')
            if limit == 0:
                return
        iclient = es.client.IndicesClient(self.conn)
        indices = iclient.get_mapping('%s_*' % self.index_name).keys()
        if indices:
            filter_args = self._make_dsl_from_filter(indices, event_filter)
            if limit is not None:
                filter_args['size'] = limit
            results = self.conn.search(fields=['_id', 'timestamp',
                                               '_type', '_source'],
                                       sort='timestamp:asc',
                                       **filter_args)
            trait_mappings = {}
            for record in results['hits']['hits']:
                trait_list = []
                if not record['_type'] in trait_mappings:
                    trait_mappings[record['_type']] = list(
                        self.get_trait_types(record['_type']))
                for key in record['_source']['traits'].keys():
                    value = record['_source']['traits'][key]
                    for t_map in trait_mappings[record['_type']]:
                        if t_map['name'] == key:
                            dtype = t_map['data_type']
                            break
                    else:
                        dtype = models.Trait.TEXT_TYPE
                    trait_list.append(models.Trait(
                        name=key, dtype=dtype,
                        value=models.Trait.convert_value(dtype, value)))
                gen_ts = timeutils.normalize_time(timeutils.parse_isotime(
                    record['_source']['timestamp']))
                yield models.Event(message_id=record['_id'],
                                   event_type=record['_type'],
                                   generated=gen_ts,
                                   traits=sorted(
                                       trait_list,
                                       key=operator.attrgetter('dtype')),
                                   raw=record['_source']['raw'])
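
Reading back is symmetric. A sketch, again with a SimpleNamespace standing
in for the event filter (mirroring the attribute set the driver expects) and
matching everything, capped at ten hits:

import types

match_all = types.SimpleNamespace(start_timestamp=None, end_timestamp=None,
                                  event_type=None, message_id=None,
                                  admin_proj=None, traits_filter=None)
for event in conn.get_events(match_all, pagination={'limit': 10}):
    print(event.event_type, event.generated,
          [t.name for t in event.traits])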

    def get_event_types(self):
        iclient = es.client.IndicesClient(self.conn)
        es_mappings = iclient.get_mapping('%s_*' % self.index_name)
        seen_types = set()
        for index in es_mappings.keys():
            for ev_type in es_mappings[index]['mappings'].keys():
                seen_types.add(ev_type)
        # TODO(gordc): tests assume sorted ordering but backends are not
        # explicitly ordered.
        # NOTE: _default_ is a type that appears in all mappings but is
        # not a real 'type'
        seen_types.discard('_default_')
        return sorted(list(seen_types))
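
Since the _default_ mapping shows up in every index, it is discarded before
returning. A one-line check, assuming the conn and the event recorded
earlier:

print(conn.get_event_types())  # e.g. ['compute.instance.create.end']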

    @staticmethod
    def _remap_es_types(d_type):
        if d_type == 'string':
            d_type = 'text'
        elif d_type == 'long':
            d_type = 'int'
        elif d_type == 'double':
            d_type = 'float'
        elif d_type == 'date' or d_type == 'date_time':
            d_type = 'datetime'
        return d_type

    def get_trait_types(self, event_type):
        iclient = es.client.IndicesClient(self.conn)
        es_mappings = iclient.get_mapping('%s_*' % self.index_name)
        seen_types = []
        for index in es_mappings.keys():
            # if event_type exists in index and has traits
            if (es_mappings[index]['mappings'].get(event_type) and
                    es_mappings[index]['mappings'][event_type]['properties']
                    ['traits'].get('properties')):
                for t_type in (es_mappings[index]['mappings'][event_type]
                               ['properties']['traits']
                               ['properties'].keys()):
                    d_type = (es_mappings[index]['mappings'][event_type]
                              ['properties']['traits']['properties']
                              [t_type]['type'])
                    d_type = models.Trait.get_type_by_name(
                        self._remap_es_types(d_type))
                    if (t_type, d_type) not in seen_types:
                        yield {'name': t_type, 'data_type': d_type}
                        seen_types.append((t_type, d_type))

    def get_traits(self, event_type, trait_type=None):
        t_types = dict((res['name'], res['data_type'])
                       for res in self.get_trait_types(event_type))
        if not t_types or (trait_type and trait_type not in t_types.keys()):
            return
        result = self.conn.search('%s_*' % self.index_name, event_type)
        for ev in result['hits']['hits']:
            if trait_type and ev['_source']['traits'].get(trait_type):
                yield models.Trait(
                    name=trait_type,
                    dtype=t_types[trait_type],
                    value=models.Trait.convert_value(
                        t_types[trait_type],
                        ev['_source']['traits'][trait_type]))
            else:
                for trait in ev['_source']['traits'].keys():
                    yield models.Trait(
                        name=trait,
                        dtype=t_types[trait],
                        value=models.Trait.convert_value(
                            t_types[trait],
                            ev['_source']['traits'][trait]))
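
Together, the last two methods let a client discover the stored schema and
trait values without knowing the mapping up front. A short sketch, assuming
the event written earlier is already searchable (i.e. after a periodic or
forced index refresh):

for tt in conn.get_trait_types('compute.instance.create.end'):
    print(tt['name'], tt['data_type'])  # trait name and data-type code
for trait in conn.get_traits('compute.instance.create.end', 'instance_id'):
    print(trait.name, trait.value)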