# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import uuid
from keystoneclient.common import cms
from oslo_serialization import jsonutils
import six
from six.moves import http_client
from testtools import matchers
from keystone.common import extension as keystone_extension
import keystone.conf
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import rest
from keystone.tests.unit.schema import v2
CONF = keystone.conf.CONF
class CoreApiTests(object):
    """Core v2.0 API test cases, written as a mixin.

    The concrete test case this is mixed into has to supply everything used
    through ``self`` that is not defined here: the request helpers
    (``public_request``, ``admin_request``, ``get_scoped_token``), the
    ``assertValid*Response`` validators, the fixture attributes
    (``user_foo``, ``tenant_bar``, ...) and overrides for the ``_get_*`` and
    ``assertNoRoles`` hooks, which raise ``NotImplementedError`` here.
    """

    def assertValidError(self, error):
        """Assert that an error body has a code, a title and a message."""
        for key in ('code', 'title', 'message'):
            self.assertIsNotNone(error.get(key))

    def assertValidVersion(self, version):
        """Assert that a version entity has an id, status and updated."""
        self.assertIsNotNone(version)
        for key in ('id', 'status', 'updated'):
            self.assertIsNotNone(version.get(key))

    def assertValidExtension(self, extension):
        """Assert that an extension has name, namespace, alias, updated."""
        self.assertIsNotNone(extension)
        for key in ('name', 'namespace', 'alias', 'updated'):
            self.assertIsNotNone(extension.get(key))

    def assertValidExtensionLink(self, link):
        """Assert that an extension link has a rel, a type and an href."""
        for key in ('rel', 'type', 'href'):
            self.assertIsNotNone(link.get(key))

    def assertValidTenant(self, tenant):
        """Assert a tenant has id and name but no domain_id/parent_id."""
        self.assertIsNotNone(tenant.get('id'))
        self.assertIsNotNone(tenant.get('name'))
        self.assertNotIn('domain_id', tenant)
        self.assertNotIn('parent_id', tenant)

    def assertValidUser(self, user):
        """Assert that a user entity has an id and a name."""
        self.assertIsNotNone(user.get('id'))
        self.assertIsNotNone(user.get('name'))

    def assertValidRole(self, tenant):
        """Assert that a role entity has an id and a name.

        :param tenant: the role entity to check; the parameter name is kept
                       as ``tenant`` so existing keyword callers keep working
        """
        self.assertIsNotNone(tenant.get('id'))
        self.assertIsNotNone(tenant.get('name'))

    def test_public_not_found(self):
        """A random path on the public API returns a valid 404 error."""
        r = self.public_request(
            path='/%s' % uuid.uuid4().hex,
            expected_status=http_client.NOT_FOUND)
        self.assertValidErrorResponse(r)

    def test_admin_not_found(self):
        """A random path on the admin API returns a valid 404 error."""
        r = self.admin_request(
            path='/%s' % uuid.uuid4().hex,
            expected_status=http_client.NOT_FOUND)
        self.assertValidErrorResponse(r)

    def test_public_multiple_choice(self):
        """The public API root answers 300 with a multiple-choice body."""
        r = self.public_request(path='/', expected_status=300)
        self.assertValidMultipleChoiceResponse(r)

    def test_admin_multiple_choice(self):
        """The admin API root answers 300 with a multiple-choice body."""
        r = self.admin_request(path='/', expected_status=300)
        self.assertValidMultipleChoiceResponse(r)

    def test_public_version(self):
        """``/v2.0/`` on the public API returns a valid version body."""
        r = self.public_request(path='/v2.0/')
        self.assertValidVersionResponse(r)

    def test_admin_version(self):
        """``/v2.0/`` on the admin API returns a valid version body."""
        r = self.admin_request(path='/v2.0/')
        self.assertValidVersionResponse(r)

    def test_public_extensions(self):
        """The public extension list validates against PUBLIC_EXTENSIONS."""
        r = self.public_request(path='/v2.0/extensions')
        self.assertValidExtensionListResponse(
            r, keystone_extension.PUBLIC_EXTENSIONS)

    def test_admin_extensions(self):
        """The admin extension list validates against ADMIN_EXTENSIONS."""
        r = self.admin_request(path='/v2.0/extensions')
        self.assertValidExtensionListResponse(
            r, keystone_extension.ADMIN_EXTENSIONS)

    def test_admin_extensions_returns_not_found(self):
        """An unknown extension alias on the admin API returns 404."""
        self.admin_request(path='/v2.0/extensions/invalid-extension',
                           expected_status=http_client.NOT_FOUND)

    def test_public_osksadm_extension_returns_not_found(self):
        """OS-KSADM is not served by the public API (404)."""
        self.public_request(path='/v2.0/extensions/OS-KSADM',
                            expected_status=http_client.NOT_FOUND)

    def test_admin_osksadm_extension(self):
        """OS-KSADM on the admin API returns a valid extension body."""
        r = self.admin_request(path='/v2.0/extensions/OS-KSADM')
        self.assertValidExtensionResponse(
            r, keystone_extension.ADMIN_EXTENSIONS)

    def test_authenticate(self):
        """Password auth scoped to a tenant returns a token with catalog."""
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password'],
                    },
                    'tenantId': self.tenant_bar['id'],
                },
            },
            expected_status=http_client.OK)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_authenticate_unscoped(self):
        """Password auth without a tenant returns a valid token response."""
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password'],
                    },
                },
            },
            expected_status=http_client.OK)
        self.assertValidAuthenticationResponse(r)

    def test_get_tenants_for_token(self):
        """``/v2.0/tenants`` on the public API lists tenants for a token."""
        r = self.public_request(path='/v2.0/tenants',
                                token=self.get_scoped_token())
        self.assertValidTenantListResponse(r)

    def test_validate_token(self):
        """A scoped token can be validated through the admin API."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tokens/%(token_id)s' % {'token_id': token},
            token=token)
        self.assertValidAuthenticationResponse(r)

    def test_invalid_token_returns_not_found(self):
        """Validating a token ID that does not exist returns 404."""
        token = self.get_scoped_token()
        self.admin_request(
            path='/v2.0/tokens/%(token_id)s' % {'token_id': 'invalid'},
            token=token,
            expected_status=http_client.NOT_FOUND)

    def test_validate_token_service_role(self):
        """A token scoped to the service tenant can be validated."""
        self.md_foobar = self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        token = self.get_scoped_token(
            tenant_id=default_fixtures.SERVICE_TENANT_ID)
        r = self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token)
        self.assertValidAuthenticationResponse(r)

    def test_remove_role_revokes_token(self):
        """Removing the role a token was issued for makes it unusable."""
        self.md_foobar = self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        token = self.get_scoped_token(
            tenant_id=default_fixtures.SERVICE_TENANT_ID)
        r = self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token)
        self.assertValidAuthenticationResponse(r)

        self.assignment_api.remove_role_from_user_and_project(
            self.user_foo['id'],
            self.tenant_service['id'],
            self.role_service['id'])

        # Only the status code matters here; the response is not inspected.
        self.admin_request(
            path='/v2.0/tokens/%s' % token,
            token=token,
            expected_status=http_client.UNAUTHORIZED)

    def test_validate_token_belongs_to(self):
        """Validation with ``belongsTo`` returns the service catalog."""
        token = self.get_scoped_token()
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (token,
                                                  self.tenant_bar['id']))
        r = self.admin_request(path=path, token=token)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_validate_token_no_belongs_to_still_returns_catalog(self):
        """Validation without ``belongsTo`` still returns the catalog."""
        token = self.get_scoped_token()
        path = ('/v2.0/tokens/%s' % token)
        r = self.admin_request(path=path, token=token)
        self.assertValidAuthenticationResponse(r, require_service_catalog=True)

    def test_validate_token_head(self):
        """Validate a token using HEAD instead of GET.

        There's no response to validate here, but this is included for the
        sake of completely covering the core API.
        """
        token = self.get_scoped_token()
        self.admin_request(
            method='HEAD',
            path='/v2.0/tokens/%(token_id)s' % {'token_id': token},
            token=token,
            expected_status=http_client.OK)

    def test_endpoints(self):
        """The endpoints of a token can be listed through the admin API."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tokens/%(token_id)s/endpoints' % {'token_id': token},
            token=token)
        self.assertValidEndpointListResponse(r)

    def test_get_tenant(self):
        """A tenant can be fetched by ID through the admin API."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants/%(tenant_id)s' % {
                'tenant_id': self.tenant_bar['id'],
            },
            token=token)
        self.assertValidTenantResponse(r)

    def test_get_tenant_by_name(self):
        """A tenant can be fetched with the ``name`` query parameter."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants?name=%(tenant_name)s' % {
                'tenant_name': self.tenant_bar['name'],
            },
            token=token)
        self.assertValidTenantResponse(r)

    def test_get_user_roles_with_tenant(self):
        """A user's roles on a tenant can be listed."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
                'tenant_id': self.tenant_bar['id'],
                'user_id': self.user_foo['id'],
            },
            token=token)
        self.assertValidRoleListResponse(r)

    def test_get_user_roles_without_tenant(self):
        """Listing a user's roles without a tenant returns 501."""
        token = self.get_scoped_token()
        self.admin_request(
            path='/v2.0/users/%(user_id)s/roles' % {
                'user_id': self.user_foo['id'],
            },
            token=token, expected_status=http_client.NOT_IMPLEMENTED)

    def test_get_user(self):
        """A user can be fetched by ID through the admin API."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/users/%(user_id)s' % {
                'user_id': self.user_foo['id'],
            },
            token=token)
        self.assertValidUserResponse(r)

    def test_get_user_by_name(self):
        """A user can be fetched with the ``name`` query parameter."""
        token = self.get_scoped_token()
        r = self.admin_request(
            path='/v2.0/users?name=%(user_name)s' % {
                'user_name': self.user_foo['name'],
            },
            token=token)
        self.assertValidUserResponse(r)

    def test_create_update_user_invalid_enabled_type(self):
        """Non-boolean ``enabled`` values are rejected on create and update."""
        # Enforce usage of boolean for 'enabled' field
        token = self.get_scoped_token()

        # Test CREATE request. In JSON, 0|1 are not booleans.
        for invalid_enabled in ("False", 0):
            r = self.admin_request(
                method='POST',
                path='/v2.0/users',
                body={
                    'user': {
                        'name': uuid.uuid4().hex,
                        'password': uuid.uuid4().hex,
                        'enabled': invalid_enabled,
                    },
                },
                token=token,
                expected_status=http_client.BAD_REQUEST)
            self.assertValidErrorResponse(r)

        # Test UPDATE request. In JSON, 0|1 are not booleans.
        path = '/v2.0/users/%(user_id)s' % {'user_id': self.user_foo['id']}
        for invalid_enabled in ("False", 1):
            r = self.admin_request(
                method='PUT',
                path=path,
                body={
                    'user': {
                        'enabled': invalid_enabled,
                    },
                },
                token=token,
                expected_status=http_client.BAD_REQUEST)
            self.assertValidErrorResponse(r)

    def test_create_update_user_valid_enabled_type(self):
        """A real boolean ``enabled`` value is accepted on user creation."""
        # Enforce usage of boolean for 'enabled' field
        # NOTE(review): despite the name, only the CREATE request is
        # exercised here; there is no UPDATE counterpart.
        token = self.get_scoped_token()

        # Test CREATE request
        self.admin_request(method='POST',
                           path='/v2.0/users',
                           body={
                               'user': {
                                   'name': uuid.uuid4().hex,
                                   'password': uuid.uuid4().hex,
                                   'enabled': False,
                               },
                           },
                           token=token,
                           expected_status=http_client.OK)

    def test_error_response(self):
        """Trigger assertValidErrorResponse by convention."""
        self.public_request(path='/v2.0/tenants',
                            expected_status=http_client.UNAUTHORIZED)

    def test_invalid_parameter_error_response(self):
        """A body with an unexpected top-level key is rejected with 400."""
        token = self.get_scoped_token()
        bad_body = {
            'OS-KSADM:service%s' % uuid.uuid4().hex: {
                'name': uuid.uuid4().hex,
                'type': uuid.uuid4().hex,
            },
        }
        # The same malformed body is sent to two different resources.
        for path in ('/v2.0/OS-KSADM/services', '/v2.0/users'):
            res = self.admin_request(method='POST',
                                     path=path,
                                     body=bad_body,
                                     token=token,
                                     expected_status=http_client.BAD_REQUEST)
            self.assertValidErrorResponse(res)

    def _get_user_id(self, r):
        """Helper method to return user ID from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def _get_role_id(self, r):
        """Helper method to return a role ID from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def _get_role_name(self, r):
        """Helper method to return role NAME from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def _get_project_id(self, r):
        """Helper method to return project ID from a response.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def assertNoRoles(self, r):
        """Helper method to assert No Roles.

        This needs to be overridden by child classes
        based on their content type.
        """
        raise NotImplementedError()

    def test_update_user_tenant(self):
        """Changing a user's tenantId moves the member role with it."""
        token = self.get_scoped_token()
        roles_path = '/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles'

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        user_id = self._get_user_id(r.result)

        # Check if member_role is in tenant_bar
        r = self.admin_request(
            path=roles_path % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id,
            },
            token=token,
            expected_status=http_client.OK)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # Create a new tenant
        r = self.admin_request(
            method='POST',
            path='/v2.0/tenants',
            body={
                'tenant': {
                    'name': 'test_update_user',
                    'description': 'A description ...',
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        project_id = self._get_project_id(r.result)

        # Update user's tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {'user_id': user_id},
            body={
                'user': {
                    'tenantId': project_id,
                },
            },
            token=token,
            expected_status=http_client.OK)

        # 'member_role' should be in new_tenant. Compare against the
        # configured role name, as the earlier check does, rather than a
        # hard-coded literal.
        r = self.admin_request(
            path=roles_path % {
                'project_id': project_id,
                'user_id': user_id,
            },
            token=token,
            expected_status=http_client.OK)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # 'member_role' should not be in tenant_bar any more
        r = self.admin_request(
            path=roles_path % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id,
            },
            token=token,
            expected_status=http_client.OK)
        self.assertNoRoles(r.result)

    def test_update_user_with_invalid_tenant(self):
        """Moving a user to an unknown tenant returns 404."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': 'test_invalid_tenant',
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        user_id = self._get_user_id(r.result)

        # Update user with an invalid tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {'user_id': user_id},
            body={
                'user': {
                    'tenantId': 'abcde12345heha',
                },
            },
            token=token,
            expected_status=http_client.NOT_FOUND)

    def test_update_user_with_invalid_tenant_no_prev_tenant(self):
        """Same as above, for a user created without any tenant."""
        token = self.get_scoped_token()

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': 'test_invalid_tenant',
                    'password': uuid.uuid4().hex,
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        user_id = self._get_user_id(r.result)

        # Update user with an invalid tenant
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {'user_id': user_id},
            body={
                'user': {
                    'tenantId': 'abcde12345heha',
                },
            },
            token=token,
            expected_status=http_client.NOT_FOUND)

    def test_update_user_with_old_tenant(self):
        """Re-sending the current tenantId keeps the member role in place."""
        token = self.get_scoped_token()
        roles_path = '/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles'

        # Create a new user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    'tenantId': self.tenant_bar['id'],
                    'enabled': True,
                },
            },
            token=token,
            expected_status=http_client.OK)
        user_id = self._get_user_id(r.result)

        # Check if member_role is in tenant_bar
        r = self.admin_request(
            path=roles_path % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id,
            },
            token=token,
            expected_status=http_client.OK)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

        # Update user's tenant with old tenant id
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {'user_id': user_id},
            body={
                'user': {
                    'tenantId': self.tenant_bar['id'],
                },
            },
            token=token,
            expected_status=http_client.OK)

        # 'member_role' should still be in tenant_bar. Compare against the
        # configured role name, as the earlier check does, rather than a
        # hard-coded literal.
        r = self.admin_request(
            path=roles_path % {
                'project_id': self.tenant_bar['id'],
                'user_id': user_id,
            },
            token=token,
            expected_status=http_client.OK)
        self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))

    def test_authenticating_a_user_with_no_password(self):
        """A user created without a password cannot authenticate."""
        token = self.get_scoped_token()

        username = uuid.uuid4().hex

        # create the user
        self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': username,
                    'enabled': True,
                },
            },
            token=token)

        # fail to authenticate
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': username,
                        'password': 'password',
                    },
                },
            },
            expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)
 
 
class LegacyV2UsernameTests(object):
    """Tests to show the broken username behavior in V2.

    The V2 API is documented to use `username` instead of `name`. The
    implementation, however, forced the use of `name` and left `username`
    to fall into the `extra` field.

    These tests ensure this behavior works so fixes to `username`/`name`
    will be backward compatible.

    This is a mixin: ``get_scoped_token``, ``admin_request``,
    ``assertValidUserResponse``, ``get_user_from_response`` and
    ``get_user_attribute_from_response`` are used through ``self`` and have
    to be supplied by the concrete test case.
    """

    def create_user(self, **user_attrs):
        """Create a user and return the response object.

        The request body always carries a random ``name`` and
        ``enabled: True``; ``user_attrs`` is merged on top of it, so it can
        add to or override those defaults.

        :param user_attrs: attributes added to the request body (optional)
        :returns: whatever ``admin_request`` returns for the POST
        """
        token = self.get_scoped_token()
        body = {
            'user': {
                'name': uuid.uuid4().hex,
                'enabled': True,
            },
        }
        body['user'].update(user_attrs)

        return self.admin_request(
            method='POST',
            path='/v2.0/users',
            token=token,
            body=body,
            expected_status=http_client.OK)

    def test_update_returns_new_username_when_adding_username(self):
        """The response for updating a user will contain the extra fields.

        This is specifically testing for updating a username when a value
        was not previously set.
        """
        token = self.get_scoped_token()

        r = self.create_user()
        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % id_,
            token=token,
            body={
                'user': {
                    'name': name,
                    'username': 'new_username',
                    'enabled': enabled,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual('new_username', user.get('username'))

    def test_update_returns_new_username_when_updating_username(self):
        """The response for updating a user will contain the extra fields.

        This tests updating a username that was previously set.
        """
        token = self.get_scoped_token()

        r = self.create_user(username='original_username')
        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % id_,
            token=token,
            body={
                'user': {
                    'name': name,
                    'username': 'new_username',
                    'enabled': enabled,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual('new_username', user.get('username'))

    def test_username_is_always_returned_create(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        r = self.create_user()

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_is_always_returned_get(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        token = self.get_scoped_token()

        r = self.create_user()
        id_ = self.get_user_attribute_from_response(r, 'id')
        r = self.admin_request(path='/v2.0/users/%s' % id_, token=token)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_is_always_returned_get_by_name(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        token = self.get_scoped_token()

        r = self.create_user()
        name = self.get_user_attribute_from_response(r, 'name')
        r = self.admin_request(path='/v2.0/users?name=%s' % name, token=token)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_is_always_returned_update_no_username_provided(self):
        """Username is set as the value of name if no username is provided.

        This matches the v2.0 spec where we really should be using username
        and not name.
        """
        token = self.get_scoped_token()

        r = self.create_user()
        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % id_,
            token=token,
            body={
                'user': {
                    'name': name,
                    'enabled': enabled,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_updated_username_is_returned(self):
        """An update that resends name and enabled returns a username.

        The user is updated with its existing ``name`` and ``enabled``
        values and the response must report ``username`` equal to ``name``.

        NOTE(review): the request and assertions are identical to
        ``test_username_is_always_returned_update_no_username_provided``;
        no new username is actually sent. Confirm whether this test was
        meant to cover a different scenario.
        """
        token = self.get_scoped_token()

        r = self.create_user()
        id_ = self.get_user_attribute_from_response(r, 'id')
        name = self.get_user_attribute_from_response(r, 'name')
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % id_,
            token=token,
            body={
                'user': {
                    'name': name,
                    'enabled': enabled,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_can_be_used_instead_of_name_create(self):
        """A user can be created with only ``username`` and no ``name``."""
        token = self.get_scoped_token()

        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            token=token,
            body={
                'user': {
                    'username': uuid.uuid4().hex,
                    'enabled': True,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(user.get('name'), user.get('username'))

    def test_username_can_be_used_instead_of_name_update(self):
        """Updating with only ``username`` also changes ``name``."""
        token = self.get_scoped_token()

        r = self.create_user()
        id_ = self.get_user_attribute_from_response(r, 'id')
        new_username = uuid.uuid4().hex
        enabled = self.get_user_attribute_from_response(r, 'enabled')
        r = self.admin_request(
            method='PUT',
            path='/v2.0/users/%s' % id_,
            token=token,
            body={
                'user': {
                    'username': new_username,
                    'enabled': enabled,
                },
            },
            expected_status=http_client.OK)

        self.assertValidUserResponse(r)
        user = self.get_user_from_response(r)
        self.assertEqual(new_username, user.get('name'))
        self.assertEqual(user.get('name'), user.get('username'))
  
class RestfulTestCase(rest.RestfulTestCase):
    """REST test case whose ``user_foo`` is an admin on ``tenant_bar``."""

    def setUp(self):
        """Run the base setUp, then grant the admin role to ``user_foo``."""
        super(RestfulTestCase, self).setUp()

        # TODO(termie): add an admin user to the fixtures and use that user
        # override the fixtures, for now
        self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_bar['id'],
            self.role_admin['id'])
  
[docs]class V2TestCase(object):
[docs]    def config_overrides(self):
        super(V2TestCase, self).config_overrides()
        self.config_fixture.config(
            group='catalog',
            driver='templated',
            template_file=unit.dirs.tests('default_catalog.templates'))
 
    def _get_user_id(self, r):
        return r['user']['id']
    def _get_role_name(self, r):
        return r['roles'][0]['name']
    def _get_role_id(self, r):
        return r['roles'][0]['id']
    def _get_project_id(self, r):
        return r['tenant']['id']
    def _get_token_id(self, r):
        return r.result['access']['token']['id']
[docs]    def assertNoRoles(self, r):
        self.assertEqual([], r['roles'])
 
[docs]    def assertValidErrorResponse(self, r):
        self.assertIsNotNone(r.result.get('error'))
        self.assertValidError(r.result['error'])
        self.assertEqual(r.result['error']['code'], r.status_code)
 
[docs]    def assertValidExtension(self, extension, expected):
        super(V2TestCase, self).assertValidExtension(extension)
        descriptions = [ext['description'] for ext in expected.values()]
        description = extension.get('description')
        self.assertIsNotNone(description)
        self.assertIn(description, descriptions)
        self.assertIsNotNone(extension.get('links'))
        self.assertNotEmpty(extension.get('links'))
        for link in extension.get('links'):
            self.assertValidExtensionLink(link)
 
[docs]    def assertValidExtensionListResponse(self, r, expected):
        self.assertIsNotNone(r.result.get('extensions'))
        self.assertIsNotNone(r.result['extensions'].get('values'))
        self.assertNotEmpty(r.result['extensions'].get('values'))
        for extension in r.result['extensions']['values']:
            self.assertValidExtension(extension, expected)
 
[docs]    def assertValidExtensionResponse(self, r, expected):
        self.assertValidExtension(r.result.get('extension'), expected)
 
[docs]    def assertValidUser(self, user):
        super(V2TestCase, self).assertValidUser(user)
        self.assertNotIn('default_project_id', user)
        if 'tenantId' in user:
            # NOTE(morganfainberg): tenantId should never be "None", it gets
            # filtered out of the object if it is there. This is suspenders
            # and a belt check to avoid unintended regressions.
            self.assertIsNotNone(user.get('tenantId'))
 
[docs]    def assertValidAuthenticationResponse(self, r,
                                          require_service_catalog=False):
        self.assertIsNotNone(r.result.get('access'))
        self.assertIsNotNone(r.result['access'].get('token'))
        self.assertIsNotNone(r.result['access'].get('user'))
        # validate token
        self.assertIsNotNone(r.result['access']['token'].get('id'))
        self.assertIsNotNone(r.result['access']['token'].get('expires'))
        tenant = r.result['access']['token'].get('tenant')
        if tenant is not None:
            # validate tenant
            self.assertIsNotNone(tenant.get('id'))
            self.assertIsNotNone(tenant.get('name'))
        # validate user
        self.assertIsNotNone(r.result['access']['user'].get('id'))
        self.assertIsNotNone(r.result['access']['user'].get('name'))
        if require_service_catalog:
            # roles are only provided with a service catalog
            roles = r.result['access']['user'].get('roles')
            self.assertNotEmpty(roles)
            for role in roles:
                self.assertIsNotNone(role.get('name'))
        serviceCatalog = r.result['access'].get('serviceCatalog')
        # validate service catalog
        if require_service_catalog:
            self.assertIsNotNone(serviceCatalog)
        if serviceCatalog is not None:
            self.assertIsInstance(serviceCatalog, list)
            if require_service_catalog:
                self.assertNotEmpty(serviceCatalog)
            for service in r.result['access']['serviceCatalog']:
                # validate service
                self.assertIsNotNone(service.get('name'))
                self.assertIsNotNone(service.get('type'))
                # services contain at least one endpoint
                self.assertIsNotNone(service.get('endpoints'))
                self.assertNotEmpty(service['endpoints'])
                for endpoint in service['endpoints']:
                    # validate service endpoint
                    self.assertIsNotNone(endpoint.get('publicURL'))
 
[docs]    def assertValidTenantListResponse(self, r):
        self.assertIsNotNone(r.result.get('tenants'))
        self.assertNotEmpty(r.result['tenants'])
        for tenant in r.result['tenants']:
            self.assertValidTenant(tenant)
            self.assertIsNotNone(tenant.get('enabled'))
            self.assertIn(tenant.get('enabled'), [True, False])
 
[docs]    def assertValidUserResponse(self, r):
        self.assertIsNotNone(r.result.get('user'))
        self.assertValidUser(r.result['user'])
 
[docs]    def assertValidTenantResponse(self, r):
        self.assertIsNotNone(r.result.get('tenant'))
        self.assertValidTenant(r.result['tenant'])
 
[docs]    def assertValidRoleListResponse(self, r):
        self.assertIsNotNone(r.result.get('roles'))
        self.assertNotEmpty(r.result['roles'])
        for role in r.result['roles']:
            self.assertValidRole(role)
 
[docs]    def assertValidVersion(self, version):
        super(V2TestCase, self).assertValidVersion(version)
        self.assertIsNotNone(version.get('links'))
        self.assertNotEmpty(version.get('links'))
        for link in version.get('links'):
            self.assertIsNotNone(link.get('rel'))
            self.assertIsNotNone(link.get('href'))
        self.assertIsNotNone(version.get('media-types'))
        self.assertNotEmpty(version.get('media-types'))
        for media in version.get('media-types'):
            self.assertIsNotNone(media.get('base'))
            self.assertIsNotNone(media.get('type'))
 
[docs]    def assertValidMultipleChoiceResponse(self, r):
        self.assertIsNotNone(r.result.get('versions'))
        self.assertIsNotNone(r.result['versions'].get('values'))
        self.assertNotEmpty(r.result['versions']['values'])
        for version in r.result['versions']['values']:
            self.assertValidVersion(version)
 
[docs]    def assertValidVersionResponse(self, r):
        self.assertValidVersion(r.result.get('version'))
 
[docs]    def assertValidEndpointListResponse(self, r):
        self.assertIsNotNone(r.result.get('endpoints'))
        self.assertNotEmpty(r.result['endpoints'])
        for endpoint in r.result['endpoints']:
            self.assertIsNotNone(endpoint.get('id'))
            self.assertIsNotNone(endpoint.get('name'))
            self.assertIsNotNone(endpoint.get('type'))
            self.assertIsNotNone(endpoint.get('publicURL'))
            self.assertIsNotNone(endpoint.get('internalURL'))
            self.assertIsNotNone(endpoint.get('adminURL'))
 
[docs]    def get_user_from_response(self, r):
        return r.result.get('user')
 
[docs]    def get_user_attribute_from_response(self, r, attribute_name):
        return r.result['user'][attribute_name]
 
[docs]    def test_service_crud_requires_auth(self):
        """Service CRUD should return unauthorized without an X-Auth-Token."""
        # values here don't matter because it will be unauthorized before
        # they're checked (bug 1006822).
        service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex
        service_body = {
            'OS-KSADM:service': {
                'name': uuid.uuid4().hex,
                'type': uuid.uuid4().hex,
            },
        }
        r = self.admin_request(method='GET',
                               path='/v2.0/OS-KSADM/services',
                               expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)
        r = self.admin_request(method='POST',
                               path='/v2.0/OS-KSADM/services',
                               body=service_body,
                               expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)
        r = self.admin_request(method='GET',
                               path=service_path,
                               expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)
        r = self.admin_request(method='DELETE',
                               path=service_path,
                               expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)
 
[docs]    def test_user_role_list_requires_auth(self):
        """User role list return unauthorized without an X-Auth-Token."""
        # values here don't matter because it will be unauthorized before
        # they're checked (bug 1006815).
        path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
            'tenant_id': uuid.uuid4().hex,
            'user_id': uuid.uuid4().hex,
        }
        r = self.admin_request(path=path,
                               expected_status=http_client.UNAUTHORIZED)
        self.assertValidErrorResponse(r)
 
[docs]    def test_fetch_revocation_list_nonadmin_fails(self):
        self.admin_request(
            method='GET',
            path='/v2.0/tokens/revoked',
            expected_status=http_client.UNAUTHORIZED)
 
[docs]    def test_fetch_revocation_list_admin_200(self):
        token = self.get_scoped_token()
        r = self.admin_request(
            method='GET',
            path='/v2.0/tokens/revoked',
            token=token,
            expected_status=http_client.OK)
        self.assertValidRevocationListResponse(r)
 
[docs]    def assertValidRevocationListResponse(self, response):
        self.assertIsNotNone(response.result['signed'])
 
    def _fetch_parse_revocation_list(self):
        """Delete a token, then fetch and decode the signed revocation list.

        Two scoped tokens are issued one second apart. The first one is used
        to delete the second and to fetch ``/v2.0/tokens/revoked``. The
        ``signed`` payload is verified with the configured signing
        certificate and CA certificates, then deserialized from JSON.

        :returns: ``(data, token)`` where ``data`` is the deserialized
                  revocation list and ``token`` is the ID of the deleted token
        """
        requesting_token = self.get_scoped_token()

        # TODO(morganfainberg): Because this is making a restful call to the
        # app a change to UTCNOW via mock.patch will not affect the returned
        # token. The only surefire way to ensure there is not a transient bug
        # based upon when the second token is issued is with a sleep. This
        # issue all stems from the limited resolution (no microseconds) on the
        # expiry time of tokens and the way revocation events utilizes token
        # expiry to revoke individual tokens. This is a stop-gap until all
        # associated issues with resolution on expiration and revocation events
        # are resolved.
        time.sleep(1)

        deleted_token = self.get_scoped_token()
        self.admin_request(
            method='DELETE',
            path='/v2.0/tokens/%s' % deleted_token,
            token=requesting_token)

        response = self.admin_request(
            method='GET',
            path='/v2.0/tokens/revoked',
            token=requesting_token,
            expected_status=http_client.OK)
        verified_json = cms.cms_verify(response.result['signed'],
                                       CONF.signing.certfile,
                                       CONF.signing.ca_certs)
        return (jsonutils.loads(verified_json), deleted_token)
[docs]    def test_fetch_revocation_list_md5(self):
        """Hash for tokens in revocation list and server config should match.
        If the server is configured for md5, then the revocation list has
        tokens hashed with MD5.
        """
        # The default hash algorithm is md5.
        hash_algorithm = 'md5'
        (data, token) = self._fetch_parse_revocation_list()
        token_hash = cms.cms_hash_token(token, mode=hash_algorithm)
        self.assertThat(token_hash, matchers.Equals(data['revoked'][0]['id']))
 
[docs]    def test_fetch_revocation_list_sha256(self):
        """Hash for tokens in revocation list and server config should match.
        If the server is configured for sha256, then the revocation list has
        tokens hashed with SHA256.
        """
        hash_algorithm = 'sha256'
        self.config_fixture.config(group='token',
                                   hash_algorithm=hash_algorithm)
        (data, token) = self._fetch_parse_revocation_list()
        token_hash = cms.cms_hash_token(token, mode=hash_algorithm)
        self.assertThat(token_hash, matchers.Equals(data['revoked'][0]['id']))
 
[docs]    def test_create_update_user_invalid_enabled_type(self):
        # Enforce usage of boolean for 'enabled' field
        token = self.get_scoped_token()
        # Test CREATE request
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': uuid.uuid4().hex,
                    'password': uuid.uuid4().hex,
                    # In JSON, "true|false" are not boolean
                    'enabled': "true",
                },
            },
            token=token,
            expected_status=http_client.BAD_REQUEST)
        self.assertValidErrorResponse(r)
        # Test UPDATE request
        r = self.admin_request(
            method='PUT',
            path='/v2.0/users/%(user_id)s' % {
                 'user_id': self.user_foo['id'],
            },
            body={
                'user': {
                    # In JSON, "true|false" are not boolean
                    'enabled': "true",
                },
            },
            token=token,
            expected_status=http_client.BAD_REQUEST)
        self.assertValidErrorResponse(r)
 
[docs]    def test_authenticating_a_user_with_an_OSKSADM_password(self):
        token = self.get_scoped_token()
        username = uuid.uuid4().hex
        password = uuid.uuid4().hex
        # create the user
        r = self.admin_request(
            method='POST',
            path='/v2.0/users',
            body={
                'user': {
                    'name': username,
                    'OS-KSADM:password': password,
                    'enabled': True,
                },
            },
            token=token)
        # successfully authenticate
        self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': username,
                        'password': password,
                    },
                },
            },
            expected_status=http_client.OK)
        # ensure password doesn't leak
        user_id = r.result['user']['id']
        r = self.admin_request(
            method='GET',
            path='/v2.0/users/%s' % user_id,
            token=token,
            expected_status=http_client.OK)
        self.assertNotIn('OS-KSADM:password', r.result['user'])
 
[docs]    def test_updating_a_user_with_an_OSKSADM_password(self):
        token = self.get_scoped_token()
        user_id = self.user_foo['id']
        password = uuid.uuid4().hex
        # update the user
        self.admin_request(
            method='PUT',
            path='/v2.0/users/%s/OS-KSADM/password' % user_id,
            body={
                'user': {
                   'password': password,
                },
            },
            token=token,
            expected_status=http_client.OK)
        # successfully authenticate
        self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': password,
                    },
                },
            },
            expected_status=http_client.OK)
 
[docs]    def test_enable_or_disable_user(self):
        token = self.get_scoped_token()
        user_id = self.user_badguy['id']
        self.assertFalse(self.user_badguy['enabled'])
        def _admin_request(body, status):
            resp = self.admin_request(
                method='PUT',
                path='/v2.0/users/%s/OS-KSADM/enabled' % user_id,
                token=token,
                body=body,
                expected_status=status)
            return resp
        # Enable the user.
        body = {'user': {'enabled': True}}
        resp = _admin_request(body, http_client.OK)
        self.assertTrue(resp.json['user']['enabled'])
        # Disable the user.
        body = {'user': {'enabled': False}}
        resp = _admin_request(body, http_client.OK)
        self.assertFalse(resp.json['user']['enabled'])
        # Attributes other than `enabled` should still work due to bug 1607751
        body = {
            'user': {
                'description': uuid.uuid4().hex,
                'name': uuid.uuid4().hex,
                'enabled': True
            }
        }
        _admin_request(body, http_client.OK)
        #  `enabled` is boolean, type other than boolean is not allowed.
        body = {'user': {'enabled': uuid.uuid4().hex}}
        _admin_request(body, http_client.BAD_REQUEST)
  
class V2TestCaseUUID(V2TestCase, RestfulTestCase, CoreApiTests,
                     LegacyV2UsernameTests):
    """Inherited v2 tests run with the ``uuid`` token provider configured."""

    def config_overrides(self):
        """Apply the inherited overrides, then select the uuid provider."""
        super(V2TestCaseUUID, self).config_overrides()
        token_provider = 'uuid'
        self.config_fixture.config(group='token', provider=token_provider)
  
class V2TestCaseFernet(V2TestCase, RestfulTestCase, CoreApiTests,
                       LegacyV2UsernameTests):
    """Inherited v2 tests run with the ``fernet`` token provider configured.

    The two revocation-list hash tests are skipped in this configuration.
    """

    def config_overrides(self):
        """Select the fernet provider and install a fernet key repository."""
        super(V2TestCaseFernet, self).config_overrides()
        self.config_fixture.config(group='token', provider='fernet')
        key_repository = ksfixtures.KeyRepository(
            self.config_fixture,
            'fernet_tokens',
            CONF.fernet_tokens.max_active_keys)
        self.useFixture(key_repository)

    def test_fetch_revocation_list_md5(self):
        """Skipped: see the skip reason passed to ``skipTest``."""
        reason = 'Revocation lists do not support Fernet'
        self.skipTest(reason)

    def test_fetch_revocation_list_sha256(self):
        """Skipped: see the skip reason passed to ``skipTest``."""
        reason = 'Revocation lists do not support Fernet'
        self.skipTest(reason)
  
class RevokeApiTestCase(V2TestCase, RestfulTestCase, CoreApiTests,
                        LegacyV2UsernameTests):
    """Inherited v2 tests run with ``pki`` tokens and ``revoke_by_id`` off.

    The three revocation-list tests are overridden to skip.
    """

    def config_overrides(self):
        """Select the pki provider and turn ``revoke_by_id`` off."""
        super(RevokeApiTestCase, self).config_overrides()
        self.config_fixture.config(group='token',
                                   provider='pki',
                                   revoke_by_id=False)

    def test_fetch_revocation_list_admin_200(self):
        """Skipped: see the reason passed to ``skip_test_overrides``."""
        reason = 'Revoke API disables revocation_list.'
        self.skip_test_overrides(reason)

    def test_fetch_revocation_list_md5(self):
        """Skipped: see the reason passed to ``skip_test_overrides``."""
        reason = 'Revoke API disables revocation_list.'
        self.skip_test_overrides(reason)

    def test_fetch_revocation_list_sha256(self):
        """Skipped: see the reason passed to ``skip_test_overrides``."""
        reason = 'Revoke API disables revocation_list.'
        self.skip_test_overrides(reason)
  
[docs]class TestFernetTokenProviderV2(RestfulTestCase):
[docs]    def setUp(self):
        super(TestFernetTokenProviderV2, self).setUp()
        # Add catalog data
        self.region = unit.new_region_ref()
        self.region_id = self.region['id']
        self.catalog_api.create_region(self.region)
        self.service = unit.new_service_ref()
        self.service_id = self.service['id']
        self.catalog_api.create_service(self.service_id, self.service)
        self.endpoint = unit.new_endpoint_ref(service_id=self.service_id,
                                              interface='public',
                                              region_id=self.region_id)
        self.endpoint_id = self.endpoint['id']
        self.catalog_api.create_endpoint(self.endpoint_id, self.endpoint)
 
[docs]    def assertValidUnscopedTokenResponse(self, r):
        v2.unscoped_validator.validate(r.json['access'])
 
[docs]    def assertValidScopedTokenResponse(self, r):
        v2.scoped_validator.validate(r.json['access'])
    # Used by RestfulTestCase 
    def _get_token_id(self, r):
        return r.result['access']['token']['id']
[docs]    def new_project_ref(self):
        return {'id': uuid.uuid4().hex,
                'name': uuid.uuid4().hex,
                'description': uuid.uuid4().hex,
                'domain_id': 'default',
                'enabled': True}
 
[docs]    def config_overrides(self):
        super(TestFernetTokenProviderV2, self).config_overrides()
        self.config_fixture.config(group='token', provider='fernet')
        self.useFixture(
            ksfixtures.KeyRepository(
                self.config_fixture,
                'fernet_tokens',
                CONF.fernet_tokens.max_active_keys
            )
        )
 
[docs]    def test_authenticate_unscoped_token(self):
        unscoped_token = self.get_unscoped_token()
        # Fernet token must be of length 255 per usability requirements
        self.assertLess(len(unscoped_token), 255)
 
[docs]    def test_validate_unscoped_token(self):
        # Grab an admin token to validate with
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
                                                         project_ref['id'],
                                                         self.role_admin['id'])
        admin_token = self.get_scoped_token(tenant_id=project_ref['id'])
        unscoped_token = self.get_unscoped_token()
        path = ('/v2.0/tokens/%s' % unscoped_token)
        resp = self.admin_request(
            method='GET',
            path=path,
            token=admin_token,
            expected_status=http_client.OK)
        self.assertValidUnscopedTokenResponse(resp)
 
[docs]    def test_authenticate_scoped_token(self):
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'], project_ref['id'], self.role_service['id'])
        token = self.get_scoped_token(tenant_id=project_ref['id'])
        # Fernet token must be of length 255 per usability requirements
        self.assertLess(len(token), 255)
 
[docs]    def test_validate_scoped_token(self):
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
                                                         project_ref['id'],
                                                         self.role_admin['id'])
        project2_ref = self.new_project_ref()
        self.resource_api.create_project(project2_ref['id'], project2_ref)
        self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'], project2_ref['id'], self.role_member['id'])
        admin_token = self.get_scoped_token(tenant_id=project_ref['id'])
        member_token = self.get_scoped_token(tenant_id=project2_ref['id'])
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (member_token,
                project2_ref['id']))
        # Validate token belongs to project
        resp = self.admin_request(
            method='GET',
            path=path,
            token=admin_token,
            expected_status=http_client.OK)
        self.assertValidScopedTokenResponse(resp)
 
[docs]    def test_token_authentication_and_validation(self):
        """Test token authentication for Fernet token provider.
        Verify that token authentication returns validate response code and
        valid token belongs to project.
        """
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        unscoped_token = self.get_unscoped_token()
        self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
                                                         project_ref['id'],
                                                         self.role_admin['id'])
        token_id = unscoped_token
        if six.PY2:
            token_id = token_id.encode('ascii')
        r = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'tenantName': project_ref['name'],
                    'token': {
                        'id': token_id,
                    }
                }
            },
            expected_status=http_client.OK)
        token_id = self._get_token_id(r)
        path = ('/v2.0/tokens/%s?belongsTo=%s' % (token_id, project_ref['id']))
        # Validate token belongs to project
        resp = self.admin_request(
            method='GET',
            path=path,
            token=self.get_admin_token(),
            expected_status=http_client.OK)
        self.assertValidScopedTokenResponse(resp)
 
[docs]    def test_rescoped_tokens_maintain_original_expiration(self):
        project_ref = self.new_project_ref()
        self.resource_api.create_project(project_ref['id'], project_ref)
        self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
                                                         project_ref['id'],
                                                         self.role_admin['id'])
        resp = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'tenantName': project_ref['name'],
                    'passwordCredentials': {
                        'username': self.user_foo['name'],
                        'password': self.user_foo['password']
                    }
                }
            },
            # NOTE(lbragstad): This test may need to be refactored if Keystone
            # decides to disallow rescoping using a scoped token.
            expected_status=http_client.OK)
        original_token = resp.result['access']['token']['id']
        original_expiration = resp.result['access']['token']['expires']
        resp = self.public_request(
            method='POST',
            path='/v2.0/tokens',
            body={
                'auth': {
                    'tenantName': project_ref['name'],
                    'token': {
                        'id': original_token,
                    }
                }
            },
            expected_status=http_client.OK)
        rescoped_token = resp.result['access']['token']['id']
        rescoped_expiration = resp.result['access']['token']['expires']
        self.assertNotEqual(original_token, rescoped_token)
        self.assertEqual(original_expiration, rescoped_expiration)
        self.assertValidScopedTokenResponse(resp)