Source code for osprofiler.drivers.elasticsearch_driver

# Copyright 2016 Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from typing import Any
from urllib import parse as parser

from oslo_config import cfg

from osprofiler.drivers import base
from osprofiler import exc


[docs] class ElasticsearchDriver(base.Driver): def __init__( self, connection_str: str, index_name: str = "osprofiler-notifications", project: str | None = None, service: str | None = None, host: str | None = None, conf: cfg.ConfigOpts = cfg.CONF, **kwargs: Any, ) -> None: """Elasticsearch driver for OSProfiler.""" super().__init__( connection_str, project=project, service=service, host=host, conf=conf, **kwargs, ) try: from elasticsearch import Elasticsearch except ImportError: raise exc.CommandError( "To use OSProfiler with ElasticSearch driver, " "please install `elasticsearch` library. " "To install with pip:\n `pip install elasticsearch`." ) client_url = parser.urlunparse( parser.urlparse(self.connection_str)._replace(scheme="http") ) self.conf = conf self.client = Elasticsearch(client_url) self.index_name = index_name self.index_name_error = "osprofiler-notifications-error"
[docs] @classmethod def get_name(cls) -> str: return "elasticsearch"
[docs] def notify(self, info: dict[str, Any], **kwargs: Any) -> None: """Send notifications to Elasticsearch. :param info: Contains information about trace element. In payload dict there are always 3 ids: "base_id" - uuid that is common for all notifications related to one trace. Used to simplify retrieving of all trace elements from Elasticsearch. "parent_id" - uuid of parent element in trace "trace_id" - uuid of current element in trace With parent_id and trace_id it's quite simple to build tree of trace elements, which simplify analyze of trace. """ info = info.copy() info["project"] = self.project info["service"] = self.service self.client.index( # type: ignore[call-arg] index=self.index_name, doc_type=self.conf.profiler.es_doc_type, body=info, ) if ( self.filter_error_trace and info.get("info", {}).get("etype") is not None ): self.notify_error_trace(info)
[docs] def notify_error_trace(self, info: dict[str, Any]) -> None: """Store base_id and timestamp of error trace to a separate index.""" self.client.index( # type: ignore[call-arg] index=self.index_name_error, doc_type=self.conf.profiler.es_doc_type, body={"base_id": info["base_id"], "timestamp": info["timestamp"]}, )
def _hits(self, response: Any) -> list[Any]: """Returns all hits of search query using scrolling :param response: ElasticSearch query response """ scroll_id = response["_scroll_id"] scroll_size = len(response["hits"]["hits"]) result: list[Any] = [] while scroll_size > 0: for hit in response["hits"]["hits"]: result.append(hit["_source"]) response = self.client.scroll( scroll_id=scroll_id, scroll=self.conf.profiler.es_scroll_time ) scroll_id = response["_scroll_id"] scroll_size = len(response["hits"]["hits"]) return result
[docs] def list_traces( self, fields: set[str] | None = None ) -> list[dict[str, Any]]: """Query all traces from the storage. :param fields: Set of trace fields to return. Defaults to 'base_id' and 'timestamp' :returns: List of traces, where each trace is a dictionary containing at least `base_id` and `timestamp`. """ query: dict[str, Any] = {"match_all": {}} fields = set(fields or self.default_trace_fields) response = self.client.search( # type: ignore[call-arg] index=self.index_name, doc_type=self.conf.profiler.es_doc_type, size=self.conf.profiler.es_scroll_size, scroll=self.conf.profiler.es_scroll_time, body={ "_source": fields, "query": query, "sort": [{"timestamp": "asc"}], }, ) return self._hits(response)
[docs] def list_error_traces(self) -> list[dict[str, Any]]: """Returns all traces that have error/exception.""" response = self.client.search( # type: ignore[call-arg] index=self.index_name_error, doc_type=self.conf.profiler.es_doc_type, size=self.conf.profiler.es_scroll_size, scroll=self.conf.profiler.es_scroll_time, body={ "_source": self.default_trace_fields, "query": {"match_all": {}}, "sort": [{"timestamp": "asc"}], }, ) return self._hits(response)
[docs] def get_report(self, base_id: str) -> dict[str, Any]: """Retrieves and parses notification from Elasticsearch. :param base_id: Base id of trace elements. """ response = self.client.search( # type: ignore[call-arg] index=self.index_name, doc_type=self.conf.profiler.es_doc_type, size=self.conf.profiler.es_scroll_size, scroll=self.conf.profiler.es_scroll_time, body={"query": {"match": {"base_id": base_id}}}, ) for n in self._hits(response): trace_id = n["trace_id"] parent_id = n["parent_id"] name = n["name"] project = n["project"] service = n["service"] host = n["info"]["host"] timestamp = n["timestamp"] self._append_results( trace_id, parent_id, name, project, service, host, timestamp, n ) return self._parse_results()