Source code for openstack_dashboard.test.helpers

# Copyright 2012 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2012 Nebula, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from functools import wraps
import importlib
import logging
import os
import traceback
from unittest import mock
from urllib import parse

from django.conf import settings
from django.contrib.messages.storage import cookie as cookie_storage
from django.contrib.messages.storage import default_storage
from django.core.handlers import wsgi
from django.test.client import RequestFactory
from django.test import tag
from django.test import testcases
from django import urls

from openstack_auth import user
from openstack_auth import utils
from requests.packages.urllib3.connection import HTTPConnection

from horizon import base
from horizon import conf
from horizon import exceptions
from horizon.test import helpers as horizon_helpers
from openstack_dashboard import api
from openstack_dashboard import context_processors
from openstack_dashboard.test.test_data import utils as test_utils


LOG = logging.getLogger(__name__)

# Makes output of failing tests much easier to read.
wsgi.WSGIRequest.__repr__ = lambda self: "<class 'django.http.HttpRequest'>"

# Shortcuts to avoid importing horizon_helpers and for backward compatibility.
update_settings = horizon_helpers.update_settings
IsA = horizon_helpers.IsA
IsHttpRequest = horizon_helpers.IsHttpRequest


def create_mocks(target_methods, stop_mock=True):
    """decorator to simplify setting up multiple mocks at once

    :param target_methods: a dict to define methods to be patched using mock.

    A key of "target_methods" is a target object whose attribute(s) are
    patched.

    A value of "target_methods" is a list of methods to be patched
    using mock. Each element of the list can be a string or a tuple
    consisting of two strings.

    A string specifies a method name of "target" object to be mocked.
    The decorator create a mock object for the method and the started mock
    can be accessed via 'mock_<method-name>' of the test class.
    For example, in case of::

        @create_mocks({api.nova: ['server_list',
                                  'flavor_list']})
        def test_example(self):
            ...
            self.mock_server_list.return_value = ...
            self.mock_flavar_list.side_effect = ...

    you can access the mocked method via "self.mock_server_list"
    inside a test class.

    The tuple version is useful when there are multiple methods with
    a same name are mocked in a single test.
    The format of the tuple is::

        ("<method-name-to-be-mocked>", "<attr-name>")

    The decorator create a mock object for "<method-name-to-be-mocked>"
    and the started mock can be accessed via 'mock_<attr-name>' of
    the test class.

    Example::

        @create_mocks({
            api.nova: [
                'usage_get',
                ('tenant_absolute_limits', 'nova_tenant_absolute_limits'),
            ],
            api.cinder: [
                ('tenant_absolute_limits', 'cinder_tenant_absolute_limits'),
            ],
        })
        def test_example(self):
            ...
            self.mock_usage_get.return_value = ...
            self.mock_nova_tenant_absolute_limits.return_value = ...
            self.mock_cinder_tenant_absolute_limits.return_value = ...
            ...

    :param stop_mock: If True (default), mocks started in this decorator will
        be stopped. Set this to False only if you cannot stop mocks when exiting
        this decorator. The default value, True, should work for most cases.
    """
    def wrapper(function):
        @wraps(function)
        def wrapped(inst, *args, **kwargs):
            patchers = []
            for target, methods in target_methods.items():
                for method in methods:
                    if isinstance(method, str):
                        method_mocked = method
                        attr_name = method
                    else:
                        method_mocked = method[0]
                        attr_name = method[1]
                    m = mock.patch.object(target, method_mocked)
                    patchers.append(m)
                    setattr(inst, 'mock_%s' % attr_name, m.start())
            retval = function(inst, *args, **kwargs)
            if stop_mock:
                for m in patchers:
                    m.stop()
            return retval
        return wrapped
    return wrapper


def _apply_panel_mocks(patchers=None):
    """Global mocks on panels that get called on all views."""
    if patchers is None:
        patchers = {}
    mocked_methods = settings.TEST_GLOBAL_MOCKS_ON_PANELS
    for name, mock_config in mocked_methods.items():
        method = mock_config['method']
        mock_params = {}
        for param in ['return_value', 'side_effect']:
            if param in mock_config:
                mock_params[param] = mock_config[param]
        patcher = mock.patch(method, **mock_params)
        patcher.start()
        patchers[name] = patcher
    return patchers


class RequestFactoryWithMessages(RequestFactory):
    def get(self, *args, **kwargs):
        req = super().get(*args, **kwargs)
        req.user = utils.get_user(req)
        req.session = []
        req._messages = default_storage(req)
        return req

    def post(self, *args, **kwargs):
        req = super().post(*args, **kwargs)
        req.user = utils.get_user(req)
        req.session = []
        req._messages = default_storage(req)
        return req


[docs] class TestCase(horizon_helpers.TestCase): """Specialized base test case class for Horizon. It gives access to numerous additional features: * A full suite of test data through various attached objects and managers (e.g. ``self.servers``, ``self.user``, etc.). See the docs for :class:`~openstack_dashboard.test.test_data.utils.TestData` for more information. * A set of request context data via ``self.context``. * A ``RequestFactory`` class which supports Django's ``contrib.messages`` framework via ``self.factory``. * A ready-to-go request object via ``self.request``. * The ability to override specific time data controls for easier testing. * Several handy additional assertion methods. """ # To force test failures when unmocked API calls are attempted, provide # boolean variable to store failures missing_mocks = False def fake_conn_request(self): # print a stacktrace to illustrate where the unmocked API call # is being made from traceback.print_stack() # forcing a test failure for missing mock self.missing_mocks = True
[docs] def setUp(self): self._real_conn_request = HTTPConnection.connect HTTPConnection.connect = self.fake_conn_request self._real_context_processor = context_processors.openstack context_processors.openstack = lambda request: self.context self.patchers = _apply_panel_mocks() super().setUp()
def _setup_test_data(self): super()._setup_test_data() test_utils.load_test_data(self) self.context = { 'authorized_tenants': self.tenants.list(), 'JS_CATALOG': context_processors.get_js_catalog(settings), } def _setup_factory(self): # For some magical reason we need a copy of this here. self.factory = RequestFactoryWithMessages() def _setup_user(self, **kwargs): self._real_get_user = utils.get_user tenants = self.context['authorized_tenants'] base_kwargs = { 'id': self.user.id, 'token': self.token, 'username': self.user.name, 'domain_id': self.domain.id, 'user_domain_name': self.domain.name, 'tenant_id': self.tenant.id, 'service_catalog': self.service_catalog, 'authorized_tenants': tenants } base_kwargs.update(kwargs) self.setActiveUser(**base_kwargs) def _setup_request(self): super()._setup_request() self.request.session['token'] = self.token.id
[docs] def tearDown(self): HTTPConnection.connect = self._real_conn_request context_processors.openstack = self._real_context_processor utils.get_user = self._real_get_user mock.patch.stopall() super().tearDown() # cause a test failure if an unmocked API call was attempted if self.missing_mocks: raise AssertionError("An unmocked API call was made.")
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None, service_catalog=None, tenant_name=None, roles=None, authorized_tenants=None, enabled=True, domain_id=None, user_domain_name=None): def get_user(request): ret = user.User( id=id, token=token, user=username, domain_id=domain_id, user_domain_name=user_domain_name, tenant_id=tenant_id, tenant_name=tenant_name, service_catalog=service_catalog, roles=roles, enabled=enabled, authorized_tenants=authorized_tenants, endpoint=settings.OPENSTACK_KEYSTONE_URL, ) ret._is_system_user = False return ret utils.get_user = get_user
[docs] def assertRedirectsNoFollow(self, response, expected_url): """Check for redirect. Asserts that the given response issued a 302 redirect without processing the view which is redirected to. """ if response.has_header('location'): loc = response['location'] else: loc = '' loc = parse.unquote(loc) expected_url = parse.unquote(expected_url) self.assertEqual(loc, expected_url) self.assertEqual(response.status_code, 302)
[docs] def assertNoFormErrors(self, response, context_name="form"): """Checks for no form errors. Asserts that the response either does not contain a form in its context, or that if it does, that form has no errors. """ context = getattr(response, "context", {}) if not context or context_name not in context: return True errors = response.context[context_name]._errors assert len(errors) == 0, \ "Unexpected errors were found on the form: %s" % errors
[docs] def assertFormErrors(self, response, count=0, message=None, context_name="form"): """Check for form errors. Asserts that the response does contain a form in its context, and that form has errors, if count were given, it must match the exact numbers of errors """ context = getattr(response, "context", {}) assert (context and context_name in context), \ "The response did not contain a form." errors = response.context[context_name]._errors if count: assert len(errors) == count, \ "%d errors were found on the form, %d expected" % \ (len(errors), count) if message: text = testcases.assert_and_parse_html( self, message, None, '"message" contains invalid HTML:') content = testcases.assert_and_parse_html( self, str(errors), None, '"_errors" in the response context is not valid HTML:') match_count = content.count(text) self.assertGreaterEqual(match_count, 1) else: assert len(errors) > 0, "No errors were found on the form"
[docs] def assertStatusCode(self, response, expected_code): """Validates an expected status code. Matches camel case of other assert functions """ if response.status_code == expected_code: return self.fail('status code %r != %r: %s' % (response.status_code, expected_code, response.content))
def assertItemsCollectionEqual(self, response, items_list): self.assertEqual(response.json, {"items": items_list}) def getAndAssertTableRowAction(self, response, table_name, action_name, row_id): table = response.context[table_name + '_table'] rows = list(filter(lambda x: x.id == row_id, table.data)) self.assertEqual(1, len(rows), "Did not find a row matching id '%s'" % row_id) row_actions = table.get_row_actions(rows[0]) actions = list(filter(lambda x: x.name == action_name, row_actions)) msg_args = (action_name, table_name, row_id) self.assertGreater( len(actions), 0, "No action named '%s' found in '%s' table for id '%s'" % msg_args) self.assertEqual( 1, len(actions), "Multiple actions named '%s' found in '%s' table for id '%s'" % msg_args) return actions[0] def getAndAssertTableAction(self, response, table_name, action_name): table = response.context[table_name + '_table'] table_actions = table.get_table_actions() actions = list(filter(lambda x: x.name == action_name, table_actions)) msg_args = (action_name, table_name) self.assertGreater( len(actions), 0, "No action named '%s' found in '%s' table" % msg_args) self.assertEqual( 1, len(actions), "More than one action named '%s' found in '%s' table" % msg_args) return actions[0] @staticmethod def mock_rest_request(**args): mock_args = { 'user.is_authenticated': True, 'policy.check.return_value': True, 'body': '' } mock_args.update(args) return mock.Mock(**mock_args) def assert_mock_multiple_calls_with_same_arguments( self, mocked_method, count, expected_call): self.assertEqual(count, mocked_method.call_count) mocked_method.assert_has_calls([expected_call] * count)
[docs] def assertNoWorkflowErrors(self, response, context_name="workflow"): """Checks for no workflow errors. Asserts that the response either does not contain a workflow in its context, or that if it does, that workflow has no errors. """ context = getattr(response, "context", {}) if not context or context_name not in context: return True errors = [step.action._errors for step in response.context[context_name].steps] self.assertEqual( 0, len(errors), "Unexpected errors were found on the workflow: %s" % errors)
[docs] def assertWorkflowErrors(self, response, count=0, message=None, context_name="workflow"): """Check for workflow errors. Asserts that the response does contain a workflow in its context, and that workflow has errors, if count were given, it must match the exact numbers of errors """ context = getattr(response, "context", {}) self.assertIn(context_name, context, msg="The response did not contain a workflow.") errors = {} for step in response.context[context_name].steps: errors.update(step.action._errors) if count: self.assertEqual( count, len(errors), "%d errors were found on the workflow, %d expected" % (len(errors), count)) if message and message not in str(errors): self.fail("Expected message not found, instead found: %s" % ["%s: %s" % (key, [e for e in field_errors]) for (key, field_errors) in errors.items()]) else: self.assertGreater( len(errors), 0, "No errors were found on the workflow")
def assertCookieMessage(self, response, expected_msg, detail_msg=None): data = response.cookies["messages"] storage = cookie_storage.CookieStorage(None) messages = [m.message for m in storage._decode(data.value)] if detail_msg is not None: _expected = exceptions._append_detail(expected_msg, detail_msg) else: _expected = expected_msg self.assertIn(_expected, messages)
[docs] class BaseAdminViewTests(TestCase): """Sets an active user with the "admin" role. For testing admin-only views and functionality. """ def setActiveUser(self, *args, **kwargs): if "roles" not in kwargs: kwargs['roles'] = [self.roles.admin._info] super().setActiveUser(*args, **kwargs) def setSessionValues(self, **kwargs): settings.SESSION_ENGINE = 'django.contrib.sessions.backends.file' engine = importlib.import_module(settings.SESSION_ENGINE) store = engine.SessionStore() for key in kwargs: store[key] = kwargs[key] self.request.session[key] = kwargs[key] store.save() self.session = store self.client.cookies[settings.SESSION_COOKIE_NAME] = store.session_key
[docs] class APITestCase(TestCase):
[docs] def setUp(self): super().setUp() utils.patch_middleware_get_user()
class RestAPITestCase(TestCase): def setUp(self): super().setUp() mock.patch('horizon.utils.http.is_ajax', return_value=True).start() # APIMockTestCase was introduced to support mox to mock migration smoothly # but it turns we have still users of APITestCase. # We keep both for a while. # Looking at the usage of these classes, it seems better to drop this one. # TODO(amotoki): Clean up APIMockTestCase usage in horizon plugins. APIMockTestCase = APITestCase # Need this to test both Glance API V1 and V2 versions class ResetImageAPIVersionMixin(object): def setUp(self): super().setUp() api.glance.VERSIONS.clear_active_cache() def tearDown(self): api.glance.VERSIONS.clear_active_cache() super().tearDown() @horizon_helpers.pytest_mark('selenium') @tag('selenium') class SeleniumTestCase(horizon_helpers.SeleniumTestCase): def setUp(self): super().setUp() test_utils.load_test_data(self) self._real_get_user = utils.get_user self.setActiveUser(id=self.user.id, token=self.token, username=self.user.name, tenant_id=self.tenant.id, service_catalog=self.service_catalog, authorized_tenants=self.tenants.list()) self.patchers = _apply_panel_mocks() os.environ["HORIZON_TEST_RUN"] = "True" def tearDown(self): utils.get_user = self._real_get_user mock.patch.stopall() del os.environ["HORIZON_TEST_RUN"] def setActiveUser(self, id=None, token=None, username=None, tenant_id=None, service_catalog=None, tenant_name=None, roles=None, authorized_tenants=None, enabled=True): def get_user(request): return user.User(id=id, token=token, user=username, tenant_id=tenant_id, service_catalog=service_catalog, roles=roles, enabled=enabled, authorized_tenants=authorized_tenants, endpoint=settings.OPENSTACK_KEYSTONE_URL) utils.get_user = get_user class SeleniumAdminTestCase(SeleniumTestCase): """Version of AdminTestCase for Selenium. Sets an active user with the "admin" role for testing admin-only views and functionality. """ def setActiveUser(self, *args, **kwargs): if "roles" not in kwargs: kwargs['roles'] = [self.roles.admin._info] super().setActiveUser(*args, **kwargs) def my_custom_sort(flavor): sort_order = { 'm1.secret': 0, 'm1.tiny': 1, 'm1.massive': 2, 'm1.metadata': 3, } return sort_order[flavor.name] # TODO(amotoki): Investigate a way to run PluginTestCase with the main # unit tests. Currently we fail to find a way to clean up urlpatterns and # Site registry touched by setUp() cleanly. As a workaround, we run # PluginTestCase as a separate test process. Hopefully this workaround has gone # in future. For more detail, see bugs 1809983, 1866666 and # https://review.opendev.org/#/c/627640/. @horizon_helpers.pytest_mark('plugin_test') @tag('plugin-test') class PluginTestCase(TestCase): """Test case for testing plugin system of Horizon. For use with tests which deal with the pluggable dashboard and panel configuration, it takes care of backing up and restoring the Horizon configuration. """ def setUp(self): super().setUp() self.old_horizon_config = conf.HORIZON_CONFIG conf.HORIZON_CONFIG = conf.LazySettings() base.Horizon._urls() # Store our original dashboards self._discovered_dashboards = base.Horizon._registry.keys() # Gather up and store our original panels for each dashboard self._discovered_panels = {} for dash in self._discovered_dashboards: panels = base.Horizon._registry[dash]._registry.keys() self._discovered_panels[dash] = panels def tearDown(self): super().tearDown() conf.HORIZON_CONFIG = self.old_horizon_config # Destroy our singleton and re-create it. base.HorizonSite._instance = None del base.Horizon base.Horizon = base.HorizonSite() # Reload the convenience references to Horizon stored in __init__ importlib.reload(importlib.import_module("horizon")) # Re-register our original dashboards and panels. # This is necessary because autodiscovery only works on the first # import, and calling reload introduces innumerable additional # problems. Manual re-registration is the only good way for testing. for dash in self._discovered_dashboards: base.Horizon.register(dash) for panel in self._discovered_panels[dash]: dash.register(panel) self._reload_urls() def _reload_urls(self): """CLeans up URLs. Clears out the URL caches, reloads the root urls module, and re-triggers the autodiscovery mechanism for Horizon. Allows URLs to be re-calculated after registering new dashboards. Useful only for testing and should never be used on a live site. """ urls.clear_url_caches() importlib.reload(importlib.import_module(settings.ROOT_URLCONF)) base.Horizon._urls() def mock_obj_to_dict(r): return mock.Mock(**{'to_dict.return_value': r}) def mock_factory(r): """mocks all the attributes as well as the to_dict """ mocked = mock_obj_to_dict(r) mocked.configure_mock(**r) return mocked