Source code for network.admin.test_routers

# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import testtools

from tempest.api.network import base
from tempest.common import identity
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators

CONF = config.CONF


class RoutersAdminTest(base.BaseAdminNetworkTest):
    """Test routers operation supported by admin"""

    # NOTE(salv-orlando): This class inherits from BaseAdminNetworkTest
    # as some router operations, such as enabling or disabling SNAT
    # require admin credentials by default

    def _cleanup_router(self, router):
        """Delete ``router`` through the inherited ``delete_router``."""
        self.delete_router(router)

    def _create_router(self, name=None, admin_state_up=False,
                       external_network_id=None, enable_snat=None):
        """Create a router and schedule its removal at test teardown.

        The arguments are forwarded positionally to the inherited
        ``create_router``; the created router is returned.
        """
        # associate a cleanup with created routers to avoid quota limits
        router = self.create_router(name, admin_state_up,
                                    external_network_id, enable_snat)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self._cleanup_router, router)
        return router

    @classmethod
    def skip_checks(cls):
        """Skip the whole class when the 'router' extension is disabled."""
        super(RoutersAdminTest, cls).skip_checks()
        if not utils.is_extension_enabled('router', 'network'):
            msg = "router extension not enabled."
            raise cls.skipException(msg)

    @decorators.idempotent_id('e54dd3a3-4352-4921-b09d-44369ae17397')
    def test_create_router_setting_project_id(self):
        """Test creating router from admin user setting project_id."""
        # The docstring has to be the first statement of the function;
        # placed after an assignment it is only a discarded expression.
        prefix = CONF.resource_name_prefix
        project_name = data_utils.rand_name(name='test_tenant_',
                                            prefix=prefix)
        description = data_utils.rand_name(name='desc_', prefix=prefix)
        project = identity.identity_utils(self.os_admin).create_project(
            name=project_name, description=description)
        project_id = project['id']
        self.addCleanup(identity.identity_utils(self.os_admin).delete_project,
                        project_id)

        name = data_utils.rand_name(name='router-', prefix=prefix)
        create_body = self.admin_routers_client.create_router(
            name=name, project_id=project_id)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.admin_routers_client.delete_router,
                        create_body['router']['id'])
        self.assertEqual(project_id, create_body['router']['project_id'])

    @decorators.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8')
    @utils.requires_ext(extension='ext-gw-mode', service='network')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_create_router_with_default_snat_value(self):
        """Create a router with default snat rule"""
        router = self._create_router(
            external_network_id=CONF.network.public_network_id)
        self._verify_router_gateway(
            router['id'],
            {'network_id': CONF.network.public_network_id,
             'enable_snat': True})

    @decorators.idempotent_id('ea74068d-09e9-4fd7-8995-9b6a1ace920f')
    @utils.requires_ext(extension='ext-gw-mode', service='network')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_create_router_with_snat_explicit(self):
        """Test creating router with specified enable_snat value"""
        name = data_utils.rand_name(
            'snat-router', prefix=CONF.resource_name_prefix)
        # Create one router per snat state, disabled first, then enabled
        for enable_snat in (False, True):
            external_gateway_info = {
                'network_id': CONF.network.public_network_id,
                'enable_snat': enable_snat}
            create_body = self.admin_routers_client.create_router(
                name=name, external_gateway_info=external_gateway_info)
            self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                            self.admin_routers_client.delete_router,
                            create_body['router']['id'])
            # Verify snat attributes after router creation
            self._verify_router_gateway(create_body['router']['id'],
                                        exp_ext_gw_info=external_gateway_info)

    def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
        """Check a router's external_gateway_info as seen by the admin.

        :param router_id: ID of the router to show.
        :param exp_ext_gw_info: expected key/value pairs; when None the
            router must report no external gateway info at all.
        """
        show_body = self.admin_routers_client.show_router(router_id)
        actual_ext_gw_info = show_body['router']['external_gateway_info']
        if exp_ext_gw_info is None:
            self.assertIsNone(actual_ext_gw_info)
            return
        # Verify only keys passed in exp_ext_gw_info
        for k, v in exp_ext_gw_info.items():
            self.assertEqual(v, actual_ext_gw_info[k])

    def _verify_gateway_port(self, router_id):
        """Check the router has exactly one gateway port on the public net.

        :param router_id: ID of the router owning the gateway port.
        """
        list_body = self.admin_ports_client.list_ports(
            network_id=CONF.network.public_network_id,
            device_id=router_id,
            device_owner="network:router_gateway")
        # expected value first, as in the other assertions of this class
        self.assertEqual(1, len(list_body['ports']))
        gw_port = list_body['ports'][0]
        fixed_ips = gw_port['fixed_ips']
        self.assertNotEmpty(fixed_ips)
        # Assert that all of the IPs from the router gateway port
        # are allocated from a valid public subnet.
        public_net_body = self.admin_networks_client.show_network(
            CONF.network.public_network_id)
        public_subnet_ids = public_net_body['network']['subnets']
        for fixed_ip in fixed_ips:
            subnet_id = fixed_ip['subnet_id']
            self.assertIn(subnet_id, public_subnet_ids)

    @decorators.idempotent_id('6cc285d8-46bf-4f36-9b1a-783e3008ba79')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_update_router_set_gateway(self):
        """Test updating router's gateway info"""
        router = self._create_router()
        self.routers_client.update_router(
            router['id'],
            external_gateway_info={
                'network_id': CONF.network.public_network_id})
        # Verify operation - router
        self._verify_router_gateway(
            router['id'],
            {'network_id': CONF.network.public_network_id})
        self._verify_gateway_port(router['id'])

    @decorators.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33')
    @utils.requires_ext(extension='ext-gw-mode', service='network')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_update_router_set_gateway_with_snat_explicit(self):
        """Test setting router's gateway with snat enabled"""
        router = self._create_router()
        self.admin_routers_client.update_router(
            router['id'],
            external_gateway_info={
                'network_id': CONF.network.public_network_id,
                'enable_snat': True})
        self._verify_router_gateway(
            router['id'],
            {'network_id': CONF.network.public_network_id,
             'enable_snat': True})
        self._verify_gateway_port(router['id'])

    @decorators.idempotent_id('96536bc7-8262-4fb2-9967-5c46940fa279')
    @utils.requires_ext(extension='ext-gw-mode', service='network')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_update_router_set_gateway_without_snat(self):
        """Test setting router's gateway with snat not enabled"""
        router = self._create_router()
        self.admin_routers_client.update_router(
            router['id'],
            external_gateway_info={
                'network_id': CONF.network.public_network_id,
                'enable_snat': False})
        self._verify_router_gateway(
            router['id'],
            {'network_id': CONF.network.public_network_id,
             'enable_snat': False})
        self._verify_gateway_port(router['id'])

    @decorators.idempotent_id('ad81b7ee-4f81-407b-a19c-17e623f763e8')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_update_router_unset_gateway(self):
        """Test unsetting router's gateway"""
        router = self._create_router(
            external_network_id=CONF.network.public_network_id)
        self.routers_client.update_router(router['id'],
                                          external_gateway_info={})
        self._verify_router_gateway(router['id'])
        # No gateway port expected
        list_body = self.admin_ports_client.list_ports(
            network_id=CONF.network.public_network_id,
            device_id=router['id'])
        self.assertFalse(list_body['ports'])

    @decorators.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374')
    @utils.requires_ext(extension='ext-gw-mode', service='network')
    @testtools.skipUnless(CONF.network.public_network_id,
                          'The public_network_id option must be specified.')
    def test_update_router_reset_gateway_without_snat(self):
        """Test updating router's gateway to be with snat not enabled"""
        router = self._create_router(
            external_network_id=CONF.network.public_network_id)
        self.admin_routers_client.update_router(
            router['id'],
            external_gateway_info={
                'network_id': CONF.network.public_network_id,
                'enable_snat': False})
        self._verify_router_gateway(
            router['id'],
            {'network_id': CONF.network.public_network_id,
             'enable_snat': False})
        self._verify_gateway_port(router['id'])

    @decorators.idempotent_id('cbe42f84-04c2-11e7-8adb-fa163e4fa634')
    @utils.requires_ext(extension='ext-gw-mode', service='network')
    def test_create_router_set_gateway_with_fixed_ip(self):
        """Test creating router setting gateway with fixed ip"""
        # At first create an external network and then use that
        # to create address and delete
        network_name = data_utils.rand_name(
            self.__class__.__name__, prefix=CONF.resource_name_prefix)
        network_1 = self.admin_networks_client.create_network(
            name=network_name, **{'router:external': True})['network']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.admin_networks_client.delete_network,
                        network_1['id'])
        subnet = self.create_subnet(
            network_1, client=self.admin_subnets_client, enable_dhcp=False)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.admin_subnets_client.delete_subnet, subnet['id'])
        port = self.admin_ports_client.create_port(
            name=data_utils.rand_name(
                self.__class__.__name__, prefix=CONF.resource_name_prefix),
            network_id=network_1['id'])['port']
        # Safety net: if the inline delete below fails, the port would
        # otherwise be leaked and keep the subnet from being removed.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.admin_ports_client.delete_port, port['id'])
        # The port only serves to learn an address from the subnet; it is
        # deleted right away (presumably to free that address for the
        # router gateway created below).
        self.admin_ports_client.delete_port(port_id=port['id'])
        fixed_ip = {
            'subnet_id': port['fixed_ips'][0]['subnet_id'],
            'ip_address': port['fixed_ips'][0]['ip_address']
        }
        external_gateway_info = {
            'network_id': network_1['id'],
            'external_fixed_ips': [fixed_ip]
        }

        # Create a router and set gateway to fixed_ip
        router = self.admin_routers_client.create_router(
            external_gateway_info=external_gateway_info)['router']
        # Same safety net for the router; a not-found error at teardown is
        # ignored when the inline delete below succeeded.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.admin_routers_client.delete_router,
                        router['id'])
        self.admin_routers_client.delete_router(router['id'])
        # Examine router's gateway is equal to fixed_ip
        # (checked against the body returned by the create call)
        gateway_ips = router['external_gateway_info']['external_fixed_ips']
        self.assertEqual(fixed_ip['ip_address'],
                         gateway_ips[0]['ip_address'])
class RoutersIpV6AdminTest(RoutersAdminTest):
    """Variant of RoutersAdminTest with ``_ip_version`` set to 6.

    Every test case is inherited unchanged from RoutersAdminTest; only the
    class attribute below differs.
    """

    _ip_version = 6