Source code for validations_libs.cli.common

#!/usr/bin/env python

#   Copyright 2021 Red Hat, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.

from argparse import ArgumentDefaultsHelpFormatter
from cliff import _argparse
import json
from validations_libs.logger import getLogger
from prettytable import PrettyTable
import re
import sys
import time
import threading
import yaml

try:
    from junit_xml import TestSuite, TestCase, to_xml_report_string
    JUNIT_XML_FOUND = True
except ImportError:
    JUNIT_XML_FOUND = False

from validations_libs.cli import colors

# Handle backward compatibility for Cliff 2.16.0 in stable/train:
if hasattr(_argparse, 'SmartHelpFormatter'):
    from cliff._argparse import SmartHelpFormatter
else:
    from cliff.command import _SmartHelpFormatter as SmartHelpFormatter


class ValidationHelpFormatter(ArgumentDefaultsHelpFormatter,
                              SmartHelpFormatter):
    """Composite CLI help formatter, providing both default argument values,
    and correct new line treatment.
    """

    def _get_help_string(self, action):
        """Return the help text for ``action``.

        The default value is only advertised when it is a non-empty list or
        a non-empty string; for every other default the lookup starts after
        ``ArgumentDefaultsHelpFormatter`` in the MRO.

        :param action: argparse action whose help string is requested.
        :returns: Whatever the selected parent ``_get_help_string`` returns.
        """
        default = action.default
        advertise_default = (
            isinstance(default, (list, str)) and len(default) > 0)

        if not advertise_default:
            # Bypass ArgumentDefaultsHelpFormatter by resolving the method
            # from the next class in the MRO.
            return super(
                ArgumentDefaultsHelpFormatter, self)._get_help_string(action)

        return super()._get_help_string(action)
def write_output(output_log, results):
    """Write output log file as Json format

    :param output_log: Path of the file to (over)write.
    :param results: JSON-serializable data, stored under the ``results`` key.
    """
    with open(output_log, 'w') as log_file:
        # Serialize only after the file is opened, so a serialization error
        # still leaves the target truncated, as before.
        payload = {'results': results}
        serialized = json.dumps(payload, sort_keys=True, indent=4)
        log_file.write(serialized)
def write_junitxml(output_junitxml, results):
    """Write output file as JUnitXML format

    Entries of ``results`` without a truthy ``Validations`` value are
    skipped. When the ``junit_xml`` module is unavailable a warning is
    logged and nothing is written.

    :param output_junitxml: Path of the JUnit XML file to (over)write.
    :param results: Iterable of dicts; the keys read are ``Validations``,
                    ``Duration``, ``Status_by_Host`` and ``Status``.
    """
    if not JUNIT_XML_FOUND:
        log = getLogger(__name__ + ".write_junitxml")
        log.warning('junitxml output disabled: the `junit_xml` python module '
                    'is missing.')
        return

    # H:MM:SS with an optional ".fraction" part. The dot is escaped so it
    # only matches a literal decimal separator, and the fraction is optional
    # so whole-second durations such as "0:00:03" are not reported as 0.
    duration_re = re.compile(r'([0-9]+):([0-9]+):([0-9]+)(?:\.([0-9]+))?')

    test_cases = []
    for vitem in results:
        if not vitem.get('Validations'):
            continue

        parsed_duration = 0
        # `or ''` also covers a 'Duration' key explicitly set to None.
        test_duration = vitem.get('Duration') or ''
        matched_duration = duration_re.match(test_duration)
        if matched_duration:
            hours, minutes, seconds, fraction = matched_duration.groups()
            parsed_duration = (int(hours) * 3600
                               + int(minutes) * 60
                               + int(seconds)
                               + float('0.{}'.format(fraction or '0')))

        test_stdout = vitem.get('Status_by_Host', '')

        test_case = TestCase('validations', vitem['Validations'],
                             parsed_duration, test_stdout)
        if vitem['Status'] == 'FAILED':
            test_case.add_failure_info('FAILED')
        test_cases.append(test_case)

    ts = TestSuite("Validations", test_cases)
    with open(output_junitxml, 'w') as output:
        output.write(to_xml_report_string([ts]))
def read_cli_data_file(data_file):
    """Read CLI data (YAML/JSON) file.

    :param data_file: Path to the requested file.
    :type data_file: ``path like``

    :returns: Parsed YAML/JSON file
    :rtype: ``dict``

    :raises: RuntimeError if the file doesn't exist or is malformed.
    """
    try:
        with open(data_file, 'r') as _file:
            return yaml.safe_load(_file.read())
    except (yaml.YAMLError, IOError) as error:
        # Keep a space between the two sentences; the original literals were
        # concatenated as "YAML/JSON.Details:".
        error_msg = (
            "The file {} must be properly formatted YAML/JSON. "
            "Details: {}.").format(data_file, error)
        # Chain explicitly so the underlying I/O or parser error is recorded
        # as the cause of the RuntimeError.
        raise RuntimeError(error_msg) from error
class Spinner(object):
    """Animated spinner to indicate activity during processing

    Intended to be used as a context manager: entering starts a background
    thread that draws the spinner on stdout, leaving stops it.
    """
    # Flag polled by the worker thread; it keeps drawing while this is True.
    busy = False
    # Seconds each spinner frame stays on screen.
    delay = 0.1
    # Worker thread started by __enter__, if any.
    _thread = None

    @staticmethod
    def spinning_cursor():
        """Endlessly yield the four spinner frames in rotation."""
        while True:
            for cursor in '|/-\\':
                yield cursor

    def __init__(self, delay=None):
        """Create a spinner.

        :param delay: Optional frame delay in seconds; falsy or zero values
                      keep the class default.
        """
        self.spinner_generator = self.spinning_cursor()
        if delay and float(delay):
            # Store the converted number, so a numeric string passes
            # validation and is also usable by time.sleep().
            self.delay = float(delay)

    def spinner_task(self):
        """Draw spinner frames on stdout until ``busy`` is cleared."""
        while self.busy:
            sys.stdout.write(next(self.spinner_generator))
            sys.stdout.flush()
            time.sleep(self.delay)
            # Backspace over the frame so the next one overwrites it.
            sys.stdout.write('\b')
            sys.stdout.flush()

    def __enter__(self):
        self.busy = True
        self._thread = threading.Thread(target=self.spinner_task)
        self._thread.start()
        # Return the instance so `with Spinner() as spinner:` is usable.
        return self

    def __exit__(self, exception, value, tb):
        self.busy = False
        # Wait for the worker to finish its current frame instead of
        # sleeping, so its final backspace cannot race with later output.
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        # Never suppress an exception raised inside the with-block.
        return False