Source code for network.admin.test_ports

# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from tempest.api.network import base
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators

CONF = config.CONF


[docs] class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest): """Test extended attributes of ports""" @classmethod def setup_clients(cls): super(PortsAdminExtendedAttrsTestJSON, cls).setup_clients() cls.hyper_client = cls.os_admin.hypervisor_client @classmethod def resource_setup(cls): super(PortsAdminExtendedAttrsTestJSON, cls).resource_setup() cls.network = cls.create_network() if CONF.service_available.nova: hyper_list = cls.hyper_client.list_hypervisors() cls.host_id = hyper_list['hypervisors'][0]['hypervisor_hostname']
[docs] @decorators.idempotent_id('8e8569c1-9ac7-44db-8bc1-f5fb2814f29b') @utils.services('compute') def test_create_port_binding_ext_attr(self): """Test creating port with extended attribute""" post_body = {"network_id": self.network['id'], "binding:host_id": self.host_id, "name": data_utils.rand_name( self.__class__.__name__, prefix=CONF.resource_name_prefix)} body = self.admin_ports_client.create_port(**post_body) port = body['port'] self.addCleanup(self.admin_ports_client.wait_for_resource_deletion, port['id']) self.addCleanup( test_utils.call_and_ignore_notfound_exc, self.admin_ports_client.delete_port, port['id']) host_id = port['binding:host_id'] self.assertIsNotNone(host_id) self.assertEqual(self.host_id, host_id)
[docs] @decorators.idempotent_id('6f6c412c-711f-444d-8502-0ac30fbf5dd5') @utils.services('compute') def test_update_port_binding_ext_attr(self): """Test updating port's extended attribute""" post_body = {"network_id": self.network['id'], "name": data_utils.rand_name( self.__class__.__name__, prefix=CONF.resource_name_prefix)} body = self.admin_ports_client.create_port(**post_body) port = body['port'] self.addCleanup(self.admin_ports_client.wait_for_resource_deletion, port['id']) self.addCleanup( test_utils.call_and_ignore_notfound_exc, self.admin_ports_client.delete_port, port['id']) update_body = {"binding:host_id": self.host_id} body = self.admin_ports_client.update_port(port['id'], **update_body) updated_port = body['port'] host_id = updated_port['binding:host_id'] self.assertIsNotNone(host_id) self.assertEqual(self.host_id, host_id)
[docs] @decorators.idempotent_id('1c82a44a-6c6e-48ff-89e1-abe7eaf8f9f8') @utils.services('compute') def test_list_ports_binding_ext_attr(self): """Test updating and listing port's extended attribute""" # Create a new port post_body = {"network_id": self.network['id'], "name": data_utils.rand_name( self.__class__.__name__, prefix=CONF.resource_name_prefix)} body = self.admin_ports_client.create_port(**post_body) port = body['port'] self.addCleanup(self.admin_ports_client.wait_for_resource_deletion, port['id']) self.addCleanup( test_utils.call_and_ignore_notfound_exc, self.admin_ports_client.delete_port, port['id']) # Update the port's binding attributes so that is now 'bound' # to a host update_body = {"binding:host_id": self.host_id} self.admin_ports_client.update_port(port['id'], **update_body) # List all ports, ensure new port is part of list and its binding # attributes are set and accurate body = self.admin_ports_client.list_ports() ports_list = body['ports'] pids_list = [p['id'] for p in ports_list] self.assertIn(port['id'], pids_list) listed_port = [p for p in ports_list if p['id'] == port['id']] self.assertEqual(1, len(listed_port), 'Multiple ports listed with id %s in ports listing: ' '%s' % (port['id'], ports_list)) self.assertEqual(self.host_id, listed_port[0]['binding:host_id'])
[docs] @decorators.idempotent_id('b54ac0ff-35fc-4c79-9ca3-c7dbd4ea4f13') def test_show_port_binding_ext_attr(self): """Test showing port's extended attribute""" body = self.admin_ports_client.create_port( name=data_utils.rand_name( self.__class__.__name__, prefix=CONF.resource_name_prefix), network_id=self.network['id']) port = body['port'] self.addCleanup(self.admin_ports_client.wait_for_resource_deletion, port['id']) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.admin_ports_client.delete_port, port['id']) body = self.admin_ports_client.show_port(port['id']) show_port = body['port'] self.assertEqual(port['binding:host_id'], show_port['binding:host_id']) self.assertEqual(port['binding:vif_type'], show_port['binding:vif_type']) self.assertEqual(port['binding:vif_details'], show_port['binding:vif_details'])
[docs] class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON): _ip_version = 6