# Source code for scenario.test_security_groups_basic_ops

# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
from oslo_log import log
import testtools

from tempest.common import compute
from tempest.common import utils
from tempest.common.utils import net_info
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.scenario import manager

CONF = config.CONF
LOG = log.getLogger(__name__)


class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
    """The test suite for security groups

    This test suite assumes that Nova has been configured to
    boot VM's with Neutron-managed networking, and attempts to
    verify cross tenant connectivity as follows

    ssh:
        in order to overcome "ip namespace", each tenant has an "access point"
        VM with floating-ip open to incoming ssh connection allowing network
        commands (ping/ssh) to be executed from within the
        tenant-network-namespace
        Tempest host performs key-based authentication to the ssh server via
        floating IP address

    connectivity test is done by pinging destination server via source server
    ssh connection.
    success - ping returns
    failure - ping_timeout reached

    multi-node:
        Multi-Node mode is enabled when CONF.compute.min_compute_nodes > 1
        and the "DifferentHostFilter" scheduler filter is enabled.
        Tests connectivity between servers on different compute nodes.
        When enabled, test will boot each new server to different
        compute nodes.

    setup:
        for primary tenant:
            1. create a network&subnet
            2. create a router (if public router isn't configured)
            3. connect tenant network to public network via router
            4. create an access point:
                a. a security group open to incoming ssh connection
                b. a VM with a floating ip
            5. create a general empty security group (same as "default", but
               without rules allowing in-tenant traffic)

    tests:
        1. _verify_network_details
        2. _verify_mac_addr: for each access point verify that
           (subnet, fix_ip, mac address) are as defined in the port list
        3. _test_in_tenant_block: test that in-tenant traffic is disabled
           without rules allowing it
        4. _test_in_tenant_allow: test that in-tenant traffic is enabled
           once an appropriate rule has been created
        5. _test_cross_tenant_block: test that cross-tenant traffic is disabled
           without a rule allowing it on destination tenant
        6. _test_cross_tenant_allow:
            * test that cross-tenant traffic is enabled once an appropriate
              rule has been created on destination tenant.
            * test that reverse traffic is still blocked
            * test that reverse traffic is enabled once an appropriate rule
              has been created on source tenant
        7. test_port_update_new_security_group:
            * test that traffic is blocked with default security group
            * test that traffic is enabled after updating port with new
              security group having appropriate rule
        8. test_multiple_security_groups: test multiple security groups can be
           associated with the vm

    assumptions:
        1. alt_tenant/user existed and is different from primary_tenant/user
        2. Public network is defined and reachable from the Tempest host
        3. Public router can either be:
            * defined, in which case all tenants networks can connect directly
              to it, and cross tenant check will be done on the private IP of
              the destination tenant
            or
            * not defined (empty string), in which case each tenant will have
              its own router connected to the public network
    """

    credentials = ['primary', 'alt', 'admin']

    class TenantProperties(object):
        """helper class to save tenant details

            manager (clients)
            credentials
            network
            subnet
            router
            security groups
            servers
            access point
            keypair
        """

        def __init__(self, clients):
            # Credentials from manager are filled with both names and IDs
            self.manager = clients
            self.creds = self.manager.credentials
            self.network = None
            self.subnet = None
            self.router = None
            self.security_groups = {}
            self.servers = list()
            self.access_point = None
            # Filled in by _create_tenant_keypairs(); initialised here so the
            # attribute always exists on a freshly built instance.
            self.keypair = None

        def set_network(self, network, subnet, router):
            """Record the network, subnet and router used by this tenant."""
            self.network = network
            self.subnet = subnet
            self.router = router

    @classmethod
    def skip_checks(cls):
        """Skip the whole class when the deployment cannot run these tests."""
        super(TestSecurityGroupsBasicOps, cls).skip_checks()
        if CONF.network.port_vnic_type in ['direct', 'macvtap']:
            msg = ('Not currently supported when using vnic_type'
                   ' direct or macvtap')
            raise cls.skipException(msg)
        if not (CONF.network.project_networks_reachable or
                CONF.network.public_network_id):
            msg = ('Either project_networks_reachable must be "true", or '
                   'public_network_id must be defined.')
            raise cls.skipException(msg)
        if not utils.is_extension_enabled('security-group', 'network'):
            msg = "security-group extension not enabled."
            raise cls.skipException(msg)
        if CONF.network.shared_physical_network:
            msg = ('Deployment uses a shared physical network, security '
                   'groups not supported')
            raise cls.skipException(msg)
        if not CONF.network_feature_enabled.floating_ips:
            raise cls.skipException("Floating ips are not available")

    @classmethod
    def setup_credentials(cls):
        # Create no network resources for these tests.
        cls.set_network_resources()
        super(TestSecurityGroupsBasicOps, cls).setup_credentials()

    @classmethod
    def resource_setup(cls):
        """Initialise the class-level state shared by the tests."""
        super(TestSecurityGroupsBasicOps, cls).resource_setup()

        cls.multi_node = (
            CONF.compute.min_compute_nodes > 1 and
            compute.is_scheduler_filter_enabled("DifferentHostFilter"))
        if cls.multi_node:
            LOG.info("Working in Multi Node mode")
        else:
            LOG.info("Working in Single Node mode")

        # Maps server id -> floating ip created for that server.
        cls.floating_ips = {}
        # Maps project id -> TenantProperties.
        cls.tenants = {}
        cls.primary_tenant = cls.TenantProperties(cls.os_primary)
        cls.alt_tenant = cls.TenantProperties(cls.os_alt)
        for tenant in [cls.primary_tenant, cls.alt_tenant]:
            cls.tenants[tenant.creds.project_id] = tenant

        # Without a public router, cross tenant access goes via floating ip.
        cls.floating_ip_access = not CONF.network.public_router_id

    def setUp(self):
        """Set up a single tenant with an accessible server.

        If multi-host is enabled, save created server uuids.
        """
        self.servers = []

        super(TestSecurityGroupsBasicOps, self).setUp()
        self._deploy_tenant(self.primary_tenant)
        self._verify_network_details(self.primary_tenant)
        self._verify_mac_addr(self.primary_tenant)

    def _create_tenant_keypairs(self, tenant):
        """Create a keypair with the tenant's client and store it on it."""
        keypair = self.create_keypair(tenant.manager.keypairs_client)
        tenant.keypair = keypair

    def _create_tenant_security_groups(self, tenant):
        """Create the tenant's 'access' and 'default' security groups.

        The 'access' group gets an ingress rule for tcp port 22; the
        'default' group is left without rules.
        """
        access_sg = self.create_empty_security_group(
            namestart='secgroup_access-',
            project_id=tenant.creds.project_id,
            client=tenant.manager.security_groups_client
        )

        # don't use default secgroup since it allows in-project traffic
        def_sg = self.create_empty_security_group(
            namestart='secgroup_general-',
            project_id=tenant.creds.project_id,
            client=tenant.manager.security_groups_client
        )
        tenant.security_groups.update(access=access_sg, default=def_sg)
        ssh_rule = dict(
            protocol='tcp',
            port_range_min=22,
            port_range_max=22,
            direction='ingress',
        )
        sec_group_rules_client = tenant.manager.security_group_rules_client
        self.create_security_group_rule(
            secgroup=access_sg,
            sec_group_rules_client=sec_group_rules_client,
            **ssh_rule)

    def _verify_network_details(self, tenant):
        # Checks that we see the newly created network/subnet/router via
        # checking the result of list_[networks,routers,subnets]
        # Check that (router, subnet) couple exist in port_list
        seen_nets = self.os_admin.networks_client.list_networks()
        seen_names = [n['name'] for n in seen_nets['networks']]
        seen_ids = [n['id'] for n in seen_nets['networks']]

        self.assertIn(tenant.network['name'], seen_names)
        self.assertIn(tenant.network['id'], seen_ids)

        seen_subnets = [
            (n['id'], n['cidr'], n['network_id']) for n in
            self.os_admin.subnets_client.list_subnets()['subnets']
        ]
        mysubnet = (tenant.subnet['id'], tenant.subnet['cidr'],
                    tenant.network['id'])
        self.assertIn(mysubnet, seen_subnets)

        seen_routers = self.os_admin.routers_client.list_routers()
        seen_router_ids = [n['id'] for n in seen_routers['routers']]
        seen_router_names = [n['name'] for n in seen_routers['routers']]

        self.assertIn(tenant.router['name'], seen_router_names)
        self.assertIn(tenant.router['id'], seen_router_ids)

        myport = (tenant.router['id'], tenant.subnet['id'])
        router_ports = [
            (i['device_id'], f['subnet_id'])
            for i in self.os_admin.ports_client.list_ports(
                device_id=tenant.router['id'])['ports']
            if net_info.is_router_interface_port(i)
            for f in i['fixed_ips']
        ]

        self.assertIn(myport, router_ports)

    def _create_server(self, name, tenant, security_groups, **kwargs):
        """Creates a server and assigns it to security group.

        If multi-host is enabled, Ensures servers are created on different
        compute nodes, by storing created servers' ids and uses different_host
        as scheduler_hints on creation.
        Validates servers are created as requested, using admin client.
        """
        security_groups_names = [{'name': s['name']} for s in security_groups]
        if self.multi_node:
            kwargs["scheduler_hints"] = {'different_host': self.servers}
        server = self.create_server(
            name=name,
            networks=[{'uuid': tenant.network["id"]}],
            key_name=tenant.keypair['name'],
            security_groups=security_groups_names,
            clients=tenant.manager,
            **kwargs)
        if 'security_groups' in server:
            self.assertEqual(
                sorted([s['name'] for s in security_groups]),
                sorted([s['name'] for s in server['security_groups']]))

        # Verify servers are on different compute nodes
        if self.multi_node:
            new_host = self.get_host_for_server(server["id"])
            host_list = [self.get_host_for_server(s) for s in self.servers]
            self.assertNotIn(new_host, host_list,
                             message="Failed to boot servers on different "
                                     "Compute nodes.")

            # self.servers is only consulted in multi-node mode (scheduler
            # hint and host check above).
            self.servers.append(server["id"])

        return server

    def _create_tenant_servers(self, tenant, num=1):
        """Boot ``num`` servers in the tenant's 'default' security group."""
        for i in range(num):
            name = 'server-{tenant}-gen-{num}'.format(
                tenant=tenant.creds.tenant_name,
                num=i
            )
            name = data_utils.rand_name(
                prefix=CONF.resource_name_prefix, name=name)
            server = self._create_server(name, tenant,
                                         [tenant.security_groups['default']])
            tenant.servers.append(server)

    def _set_access_point(self, tenant):
        # creates a server in a secgroup with rule allowing external ssh
        # in order to access project internal network
        # workaround ip namespace
        secgroups = tenant.security_groups.values()
        name = 'server-{tenant}-access_point'.format(
            tenant=tenant.creds.tenant_name)
        name = data_utils.rand_name(
            prefix=CONF.resource_name_prefix, name=name)
        server = self._create_server(name, tenant,
                                     security_groups=secgroups)
        tenant.access_point = server
        self._assign_floating_ips(tenant, server)

    def _assign_floating_ips(self, tenant, server):
        """Create a floating ip for ``server`` and remember it by server id."""
        public_network_id = CONF.network.public_network_id
        floating_ip = self.create_floating_ip(
            server, public_network_id,
            client=tenant.manager.floating_ips_client)
        self.floating_ips.setdefault(server['id'], floating_ip)

    def _create_tenant_network(self, tenant, port_security_enabled=True):
        """Create network, subnet and router and store them on the tenant."""
        network, subnet, router = self.setup_network_subnet_with_router(
            networks_client=tenant.manager.networks_client,
            routers_client=tenant.manager.routers_client,
            subnets_client=tenant.manager.subnets_client,
            port_security_enabled=port_security_enabled)
        tenant.set_network(network, subnet, router)

    def _deploy_tenant(self, tenant_or_id):
        """creates:

            keypair
            network
            subnet
            router (if public not defined)
            access security group
            access-point server

        ``tenant_or_id`` is either a TenantProperties instance or a key of
        ``self.tenants``.
        """
        if isinstance(tenant_or_id, self.TenantProperties):
            tenant = tenant_or_id
        else:
            tenant = self.tenants[tenant_or_id]
        self._create_tenant_keypairs(tenant)
        self._create_tenant_network(tenant)
        self._create_tenant_security_groups(tenant)
        self._set_access_point(tenant)

    def _get_server_ip(self, server, floating=False):
        """returns the ip (floating/internal) of a server

        The internal ip is None when the server has no address on its
        tenant's network.
        """
        if floating:
            server_ip = self.floating_ips[server['id']]['floating_ip_address']
        else:
            server_ip = None
            network_name = self.tenants[server['tenant_id']].network['name']
            if network_name in server['addresses']:
                server_ip = server['addresses'][network_name][0]['addr']
        return server_ip

    def _connect_to_access_point(self, tenant):
        """create ssh connection to tenant access point"""
        access_point_ip = \
            self.floating_ips[tenant.access_point['id']]['floating_ip_address']
        private_key = tenant.keypair['private_key']
        access_point_ssh = self.get_remote_client(
            access_point_ip, private_key=private_key,
            server=tenant.access_point)
        return access_point_ssh

    def _test_in_tenant_block(self, tenant):
        """Check the access point cannot reach the tenant's servers."""
        access_point_ssh = self._connect_to_access_point(tenant)
        for server in tenant.servers:
            self.check_remote_connectivity(source=access_point_ssh,
                                           dest=self._get_server_ip(server),
                                           should_succeed=False)

    def _test_in_tenant_allow(self, tenant):
        """Allow in-group icmp and check the servers become reachable."""
        ruleset = dict(
            protocol='icmp',
            remote_group_id=tenant.security_groups['default']['id'],
            direction='ingress'
        )
        # Pass the tenant's own rules client, like the other helpers that
        # take a tenant argument do.
        sec_group_rules_client = tenant.manager.security_group_rules_client
        self.create_security_group_rule(
            secgroup=tenant.security_groups['default'],
            sec_group_rules_client=sec_group_rules_client,
            security_groups_client=tenant.manager.security_groups_client,
            **ruleset
        )
        access_point_ssh = self._connect_to_access_point(tenant)
        for server in tenant.servers:
            self.check_remote_connectivity(source=access_point_ssh,
                                           dest=self._get_server_ip(server))

    def _test_cross_tenant_block(self, source_tenant, dest_tenant, ruleset):
        # if public router isn't defined, then dest_tenant access is via
        # floating-ip
        protocol = ruleset['protocol']
        access_point_ssh = self._connect_to_access_point(source_tenant)
        ip = self._get_server_ip(dest_tenant.access_point,
                                 floating=self.floating_ip_access)
        self.check_remote_connectivity(source=access_point_ssh, dest=ip,
                                       should_succeed=False,
                                       protocol=protocol)

    def _test_cross_tenant_allow(self, source_tenant, dest_tenant, ruleset):
        """check for each direction:

        creating rule for tenant incoming traffic enables only 1way traffic
        """
        protocol = ruleset['protocol']
        sec_group_rules_client = (
            dest_tenant.manager.security_group_rules_client)
        self.create_security_group_rule(
            secgroup=dest_tenant.security_groups['default'],
            sec_group_rules_client=sec_group_rules_client,
            **ruleset
        )
        access_point_ssh = self._connect_to_access_point(source_tenant)
        ip = self._get_server_ip(dest_tenant.access_point,
                                 floating=self.floating_ip_access)
        self.check_remote_connectivity(access_point_ssh, ip,
                                       protocol=protocol)

        # test that reverse traffic is still blocked
        self._test_cross_tenant_block(dest_tenant, source_tenant, ruleset)

        # allow reverse traffic and check
        sec_group_rules_client = (
            source_tenant.manager.security_group_rules_client)
        self.create_security_group_rule(
            secgroup=source_tenant.security_groups['default'],
            sec_group_rules_client=sec_group_rules_client,
            **ruleset
        )

        access_point_ssh_2 = self._connect_to_access_point(dest_tenant)
        ip = self._get_server_ip(source_tenant.access_point,
                                 floating=self.floating_ip_access)
        self.check_remote_connectivity(access_point_ssh_2, ip,
                                       protocol=protocol)

    def _verify_mac_addr(self, tenant):
        """Verify that VM has the same ip, mac as listed in port"""

        access_point_ssh = self._connect_to_access_point(tenant)
        mac_addr = access_point_ssh.get_mac_address()
        mac_addr = mac_addr.strip().lower()
        # Get the fixed_ips and mac_address fields of all ports. Select
        # only those two columns to reduce the size of the response.
        port_list = self.os_admin.ports_client.list_ports(
            fields=['fixed_ips', 'mac_address'])['ports']
        port_detail_list = [
            (port['fixed_ips'][0]['subnet_id'],
             port['fixed_ips'][0]['ip_address'],
             port['mac_address'].lower())
            for port in port_list if port['fixed_ips']
        ]
        server_ip = self._get_server_ip(tenant.access_point)
        subnet_id = tenant.subnet['id']
        self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)

    def _log_console_output_for_all_tenants(self):
        """Log console output of every tenant's servers and access point."""
        for tenant in self.tenants.values():
            client = tenant.manager.servers_client
            self.log_console_output(servers=tenant.servers, client=client)
            if tenant.access_point is not None:
                self.log_console_output(
                    servers=[tenant.access_point], client=client)

    def _create_protocol_ruleset(self, protocol, port=80):
        """Build ingress rule kwargs for ``protocol``.

        icmp rules carry no port range; any other protocol is limited to the
        single ``port``.
        """
        if protocol == 'icmp':
            ruleset = dict(protocol='icmp',
                           direction='ingress')
        else:
            ruleset = dict(protocol=protocol,
                           port_range_min=port,
                           port_range_max=port,
                           direction='ingress')
        return ruleset

    @decorators.attr(type='multinode')
    @decorators.idempotent_id('e79f879e-debb-440c-a7e4-efeda05b6848')
    @utils.services('compute', 'network')
    def test_cross_tenant_traffic(self):
        """Check cross tenant traffic is blocked until a rule allows it.

        Deploys the alt tenant, then runs the block/allow checks between the
        primary and alt access points for CONF.scenario.protocol.
        """
        if not self.credentials_provider.is_multi_tenant():
            raise self.skipException("No secondary tenant defined")
        try:
            # deploy new project
            self._deploy_tenant(self.alt_tenant)
            self._verify_network_details(self.alt_tenant)
            self._verify_mac_addr(self.alt_tenant)

            # cross tenant check
            source_tenant = self.primary_tenant
            dest_tenant = self.alt_tenant

            protocol = CONF.scenario.protocol
            LOG.debug("Testing cross tenant traffic for %s protocol",
                      protocol)
            if protocol in ['udp', 'tcp']:
                for tenant in [source_tenant, dest_tenant]:
                    access_point = self._connect_to_access_point(tenant)
                    access_point.nc_listen_host(protocol=protocol)

            ruleset = self._create_protocol_ruleset(protocol)
            self._test_cross_tenant_block(source_tenant, dest_tenant,
                                          ruleset)
            self._test_cross_tenant_allow(source_tenant, dest_tenant,
                                          ruleset)
        except Exception:
            self._log_console_output_for_all_tenants()
            raise

    @decorators.attr(type='multinode')
    @decorators.idempotent_id('63163892-bbf6-4249-aa12-d5ea1f8f421b')
    @utils.services('compute', 'network')
    def test_in_tenant_traffic(self):
        """Check in-tenant traffic is blocked until a rule allows it."""
        try:
            self._create_tenant_servers(self.primary_tenant, num=1)

            # in-tenant check
            self._test_in_tenant_block(self.primary_tenant)
            self._test_in_tenant_allow(self.primary_tenant)
        except Exception:
            self._log_console_output_for_all_tenants()
            raise

    @decorators.idempotent_id('f4d556d7-1526-42ad-bafb-6bebf48568f6')
    @decorators.attr(type=['slow', 'multinode'])
    @utils.services('compute', 'network')
    def test_port_update_new_security_group(self):
        """Verifies the traffic after updating the vm port

        With new security group having appropriate rule.
        """
        new_tenant = self.primary_tenant

        # Create empty security group and add icmp rule in it
        new_sg = self.create_empty_security_group(
            namestart='secgroup_new-',
            project_id=new_tenant.creds.project_id,
            client=new_tenant.manager.security_groups_client)
        icmp_rule = dict(
            protocol='icmp',
            direction='ingress',
        )
        sec_group_rules_client = new_tenant.manager.security_group_rules_client
        self.create_security_group_rule(
            secgroup=new_sg,
            sec_group_rules_client=sec_group_rules_client,
            **icmp_rule)
        # NOTE: new_sg is deliberately not stored in
        # new_tenant.security_groups. That dict lives on a class-level
        # object shared by every test, and _set_access_point() boots the
        # access point with all of its values, so a leftover entry would
        # leak this test's group into later tests.

        # Create server with default security group
        name = 'server-{tenant}-gen-1'.format(
            tenant=new_tenant.creds.tenant_name
        )
        name = data_utils.rand_name(
            prefix=CONF.resource_name_prefix, name=name)
        server = self._create_server(name, new_tenant,
                                     [new_tenant.security_groups['default']])

        # Check connectivity failure with default security group
        try:
            access_point_ssh = self._connect_to_access_point(new_tenant)
            self.check_remote_connectivity(source=access_point_ssh,
                                           dest=self._get_server_ip(server),
                                           should_succeed=False)
            server_id = server['id']
            port_id = self.os_admin.ports_client.list_ports(
                device_id=server_id)['ports'][0]['id']

            # update port with new security group and check connectivity
            self.ports_client.update_port(port_id,
                                          security_groups=[new_sg['id']])
            self.check_remote_connectivity(
                source=access_point_ssh,
                dest=self._get_server_ip(server))
        except Exception:
            self._log_console_output_for_all_tenants()
            raise

    @decorators.idempotent_id('d2f77418-fcc4-439d-b935-72eca704e293')
    @decorators.attr(type=['slow', 'multinode'])
    @utils.services('compute', 'network')
    def test_multiple_security_groups(self):
        """Verify multiple security groups and checks that rules

        provided in the both the groups is applied onto VM
        """
        tenant = self.primary_tenant
        ip = self._get_server_ip(tenant.access_point,
                                 floating=self.floating_ip_access)
        ssh_login = CONF.validation.image_ssh_user
        private_key = tenant.keypair['private_key']
        self.check_vm_connectivity(ip,
                                   should_connect=False)
        ruleset = dict(
            protocol='icmp',
            direction='ingress'
        )
        self.create_security_group_rule(
            secgroup=tenant.security_groups['default'],
            **ruleset
        )
        # NOTE: Vm now has 2 security groups one with ssh rule(
        # already added in setUp() method),and other with icmp rule
        # (added in the above step).The check_vm_connectivity tests
        # -that vm ping test is successful
        # -ssh to vm is successful
        self.check_vm_connectivity(ip,
                                   username=ssh_login,
                                   private_key=private_key,
                                   should_connect=True)

    @decorators.attr(type=['slow', 'multinode'])
    @utils.requires_ext(service='network', extension='port-security')
    @decorators.idempotent_id('7c811dcc-263b-49a3-92d2-1b4d8405f50c')
    @utils.services('compute', 'network')
    def test_port_security_disable_security_group(self):
        """Verify traffic is allowed once port security is disabled.

        With port security enabled and no security groups the server must be
        unreachable; after disabling port security it must be reachable.
        """
        new_tenant = self.primary_tenant

        # Create server
        name = 'server-{tenant}-gen-1'.format(
            tenant=new_tenant.creds.tenant_name
        )
        name = data_utils.rand_name(
            prefix=CONF.resource_name_prefix, name=name)
        server = self._create_server(name, new_tenant,
                                     [new_tenant.security_groups['default']])

        access_point_ssh = self._connect_to_access_point(new_tenant)
        server_id = server['id']
        port_id = self.os_admin.ports_client.list_ports(
            device_id=server_id)['ports'][0]['id']

        # Flip the port's port security and check connectivity
        try:
            self.ports_client.update_port(port_id,
                                          port_security_enabled=True,
                                          security_groups=[])
            self.check_remote_connectivity(source=access_point_ssh,
                                           dest=self._get_server_ip(server),
                                           should_succeed=False)

            self.ports_client.update_port(port_id,
                                          port_security_enabled=False,
                                          security_groups=[])
            self.check_remote_connectivity(
                source=access_point_ssh,
                dest=self._get_server_ip(server))
        except Exception:
            self._log_console_output_for_all_tenants()
            raise

    @decorators.attr(type=['slow', 'multinode'])
    @utils.requires_ext(service='network', extension='port-security')
    @decorators.idempotent_id('13ccf253-e5ad-424b-9c4a-97b88a026699')
    # TODO(mriedem): We shouldn't actually need to check this since neutron
    # disables the port_security extension by default, but the problem is nova
    # assumes port_security_enabled=True if it's not set on the network
    # resource, which will mean nova may attempt to apply a security group on
    # a port on that network which would fail. This is really a bug in nova.
    @testtools.skipUnless(
        CONF.network_feature_enabled.port_security,
        'Port security must be enabled.')
    @utils.services('compute', 'network')
    def test_boot_into_disabled_port_security_network_without_secgroup(self):
        """Boot without security groups on a port-security-disabled network.

        Checks that the server gets exactly one port and that the port has no
        security groups.
        """
        tenant = self.primary_tenant
        self._create_tenant_network(tenant, port_security_enabled=False)
        self.assertFalse(tenant.network['port_security_enabled'])
        name = data_utils.rand_name(
            prefix=CONF.resource_name_prefix, name='server-smoke')
        sec_groups = []
        server = self._create_server(name, tenant, sec_groups)
        server_id = server['id']
        ports = self.os_admin.ports_client.list_ports(
            device_id=server_id)['ports']
        self.assertEqual(1, len(ports))
        for port in ports:
            self.assertEmpty(port['security_groups'],
                             "Neutron shouldn't even use it's default sec "
                             "group.")