Source code for keystone.api.users

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# This file handles all flask-restful resources for /v3/users

import base64
import os
import uuid

import flask
import http.client
from oslo_serialization import jsonutils
from werkzeug import exceptions

from keystone.api._shared import json_home_relations
from keystone.application_credential import schema as app_cred_schema
from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common import utils
from keystone.common import validation
import keystone.conf
from keystone import exception as ks_exception
from keystone.i18n import _
from keystone.identity import schema
from keystone import notifications
from keystone.server import flask as ks_flask


CRED_TYPE_EC2 = 'ec2'
CONF = keystone.conf.CONF
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs

ACCESS_TOKEN_ID_PARAMETER_RELATION = (
    json_home_relations.os_oauth1_parameter_rel_func(
        parameter_name='access_token_id')
)


def _convert_v3_to_ec2_credential(credential):
    # Prior to bug #1259584 fix, blob was stored unserialized
    # but it should be stored as a json string for compatibility
    # with the v3 credentials API.  Fall back to the old behavior
    # for backwards compatibility with existing DB contents
    try:
        blob = jsonutils.loads(credential['blob'])
    except TypeError:
        blob = credential['blob']
    return {'user_id': credential.get('user_id'),
            'tenant_id': credential.get('project_id'),
            'access': blob.get('access'),
            'secret': blob.get('secret'),
            'trust_id': blob.get('trust_id')}


def _format_token_entity(entity):

    formatted_entity = entity.copy()
    access_token_id = formatted_entity['id']
    user_id = formatted_entity.get('authorizing_user_id', '')
    if 'role_ids' in entity:
        formatted_entity.pop('role_ids')
    if 'access_secret' in entity:
        formatted_entity.pop('access_secret')

    url = ('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(access_token_id)s'
           '/roles' % {'user_id': user_id,
                       'access_token_id': access_token_id})

    formatted_entity.setdefault('links', {})
    formatted_entity['links']['roles'] = (ks_flask.base_url(url))

    return formatted_entity


def _check_unrestricted_application_credential(token):
    if 'application_credential' in token.methods:
        if not token.application_credential['unrestricted']:
            action = _("Using method 'application_credential' is not "
                       "allowed for managing additional application "
                       "credentials.")
            raise ks_exception.ForbiddenAction(action=action)


def _build_user_target_enforcement():
    target = {}
    try:
        target['user'] = PROVIDERS.identity_api.get_user(
            flask.request.view_args.get('user_id')
        )
        if flask.request.view_args.get('group_id'):
            target['group'] = PROVIDERS.identity_api.get_group(
                flask.request.view_args.get('group_id')
            )
    except ks_exception.NotFound:  # nosec
        # Defer existence in the event the user doesn't exist, we'll
        # check this later anyway.
        pass

    return target


def _build_enforcer_target_data_owner_and_user_id_match():
    ref = {}
    if flask.request.view_args:
        credential_id = flask.request.view_args.get('credential_id')
        if credential_id is not None:
            hashed_id = utils.hash_access_key(credential_id)
            ref['credential'] = PROVIDERS.credential_api.get_credential(
                hashed_id)
    return ref


def _update_request_user_id_attribute():
    # This method handles a special case in policy enforcement. The application
    # credential API is underneath the user path (e.g.,
    # /v3/users/{user_id}/application_credentials/{application_credential_id}).
    # The RBAC enforcer thinks the user to evaluate for application credential
    # ownership comes from the path, but it should come from the actual
    # application credential reference. By ensuring we pull the user ID from
    # the application credential, we close a loop hole where users could
    # effectively bypass authorization to view or delete any application
    # credential in the system, assuming the attacker knows the application
    # credential ID of another user. So long as the attacker matches the user
    # ID in the request path to the user in the token of the request, they can
    # pass the `rule:owner` policy check. This method protects against that by
    # ensuring we use the application credential user ID and not something
    # determined from the client.
    try:
        app_cred = (
            PROVIDERS.application_credential_api.get_application_credential(
                flask.request.view_args.get('application_credential_id')
            )
        )
        flask.request.view_args['user_id'] = app_cred['user_id']

        # This target isn't really used in the default policy for application
        # credentials, but we return it since we're using this method as a hook
        # to update the flask request variables, which are used later in the
        # keystone RBAC enforcer to populate the policy_dict, which ultimately
        # turns into target attributes.
        return {'user_id': app_cred['user_id']}
    except ks_exception.NotFound:  # nosec
        # Defer existance in the event the application credential doesn't
        # exist, we'll check this later anyway.
        pass


def _format_role_entity(role_id):
    role = PROVIDERS.role_api.get_role(role_id)
    formatted_entity = role.copy()
    if 'description' in role:
        formatted_entity.pop('description')
    if 'enabled' in role:
        formatted_entity.pop('enabled')
    return formatted_entity


[docs]class UserResource(ks_flask.ResourceBase): collection_key = 'users' member_key = 'user' get_member_from_driver = PROVIDERS.deferred_provider_lookup( api='identity_api', method='get_user')
[docs] def get(self, user_id=None): """Get a user resource or list users. GET/HEAD /v3/users GET/HEAD /v3/users/{user_id} """ if user_id is not None: return self._get_user(user_id) return self._list_users()
def _get_user(self, user_id): """Get a user resource. GET/HEAD /v3/users/{user_id} """ ENFORCER.enforce_call( action='identity:get_user', build_target=_build_user_target_enforcement ) ref = PROVIDERS.identity_api.get_user(user_id) return self.wrap_member(ref) def _list_users(self): """List users. GET/HEAD /v3/users """ filters = ('domain_id', 'enabled', 'idp_id', 'name', 'protocol_id', 'unique_id', 'password_expires_at') target = None if self.oslo_context.domain_id: target = {'domain_id': self.oslo_context.domain_id} hints = self.build_driver_hints(filters) ENFORCER.enforce_call( action='identity:list_users', filters=filters, target_attr=target ) domain = self._get_domain_id_for_list_request() if domain is None and self.oslo_context.domain_id: domain = self.oslo_context.domain_id refs = PROVIDERS.identity_api.list_users( domain_scope=domain, hints=hints) # If the user making the request used a domain-scoped token, let's make # sure we filter out users that are not in that domain. Otherwise, we'd # be exposing users in other domains. This if statement is needed in # case _get_domain_id_for_list_request() short-circuits due to # configuration and protects against information from other domains # leaking to people who shouldn't see it. if self.oslo_context.domain_id: domain_id = self.oslo_context.domain_id users = [user for user in refs if user['domain_id'] == domain_id] else: users = refs return self.wrap_collection(users, hints=hints)
[docs] def post(self): """Create a user. POST /v3/users """ user_data = self.request_body_json.get('user', {}) target = {'user': user_data} ENFORCER.enforce_call( action='identity:create_user', target_attr=target ) validation.lazy_validate(schema.user_create, user_data) user_data = self._normalize_dict(user_data) user_data = self._normalize_domain_id(user_data) ref = PROVIDERS.identity_api.create_user( user_data, initiator=self.audit_initiator) return self.wrap_member(ref), http.client.CREATED
[docs] def patch(self, user_id): """Update a user. PATCH /v3/users/{user_id} """ ENFORCER.enforce_call( action='identity:update_user', build_target=_build_user_target_enforcement ) PROVIDERS.identity_api.get_user(user_id) user_data = self.request_body_json.get('user', {}) validation.lazy_validate(schema.user_update, user_data) self._require_matching_id(user_data) ref = PROVIDERS.identity_api.update_user( user_id, user_data, initiator=self.audit_initiator) return self.wrap_member(ref)
[docs] def delete(self, user_id): """Delete a user. DELETE /v3/users/{user_id} """ ENFORCER.enforce_call( action='identity:delete_user', build_target=_build_user_target_enforcement ) PROVIDERS.identity_api.delete_user(user_id) return None, http.client.NO_CONTENT
[docs]class UserChangePasswordResource(ks_flask.ResourceBase):
[docs] @ks_flask.unenforced_api def get(self, user_id): # Special case, GET is not allowed. raise exceptions.MethodNotAllowed(valid_methods=['POST'])
[docs] @ks_flask.unenforced_api def post(self, user_id): user_data = self.request_body_json.get('user', {}) validation.lazy_validate(schema.password_change, user_data) try: PROVIDERS.identity_api.change_password( user_id=user_id, original_password=user_data['original_password'], new_password=user_data['password'], initiator=self.audit_initiator) except AssertionError as e: raise ks_exception.Unauthorized( _('Error when changing user password: %s') % e ) return None, http.client.NO_CONTENT
[docs]class UserProjectsResource(ks_flask.ResourceBase): collection_key = 'projects' member_key = 'project' get_member_from_driver = PROVIDERS.deferred_provider_lookup( api='resource_api', method='get_project')
[docs] def get(self, user_id): filters = ('domain_id', 'enabled', 'name') ENFORCER.enforce_call(action='identity:list_user_projects', filters=filters, build_target=_build_user_target_enforcement) hints = self.build_driver_hints(filters) refs = PROVIDERS.assignment_api.list_projects_for_user(user_id) return self.wrap_collection(refs, hints=hints)
[docs]class UserGroupsResource(ks_flask.ResourceBase): collection_key = 'groups' member_key = 'group' get_member_from_driver = PROVIDERS.deferred_provider_lookup( api='identity_api', method='get_group')
[docs] def get(self, user_id): """Get groups for a user. GET/HEAD /v3/users/{user_id}/groups """ filters = ('name',) hints = self.build_driver_hints(filters) ENFORCER.enforce_call(action='identity:list_groups_for_user', build_target=_build_user_target_enforcement, filters=filters) refs = PROVIDERS.identity_api.list_groups_for_user(user_id=user_id, hints=hints) if (self.oslo_context.domain_id): filtered_refs = [] for ref in refs: if ref['domain_id'] == self.oslo_context.domain_id: filtered_refs.append(ref) refs = filtered_refs return self.wrap_collection(refs, hints=hints)
class _UserOSEC2CredBaseResource(ks_flask.ResourceBase): collection_key = 'credentials' member_key = 'credential' @classmethod def _add_self_referential_link(cls, ref, collection_name=None): # NOTE(morgan): This should be refactored to have an EC2 Cred API with # a sane prefix instead of overloading the "_add_self_referential_link" # method. This was chosen as it more closely mirrors the pre-flask # code (for transition). path = '/users/%(user_id)s/credentials/OS-EC2/%(credential_id)s' url = ks_flask.base_url(path) % { 'user_id': ref['user_id'], 'credential_id': ref['access']} ref.setdefault('links', {}) ref['links']['self'] = url
[docs]class UserOSEC2CredentialsResourceListCreate(_UserOSEC2CredBaseResource):
[docs] def get(self, user_id): """List EC2 Credentials for user. GET/HEAD /v3/users/{user_id}/credentials/OS-EC2 """ ENFORCER.enforce_call(action='identity:ec2_list_credentials') PROVIDERS.identity_api.get_user(user_id) credential_refs = PROVIDERS.credential_api.list_credentials_for_user( user_id, type=CRED_TYPE_EC2) collection_refs = [ _convert_v3_to_ec2_credential(cred) for cred in credential_refs ] return self.wrap_collection(collection_refs)
[docs] def post(self, user_id): """Create EC2 Credential for user. POST /v3/users/{user_id}/credentials/OS-EC2 """ target = {} target['credential'] = {'user_id': user_id} ENFORCER.enforce_call(action='identity:ec2_create_credential', target_attr=target) PROVIDERS.identity_api.get_user(user_id) tenant_id = self.request_body_json.get('tenant_id') PROVIDERS.resource_api.get_project(tenant_id) blob = dict( access=uuid.uuid4().hex, secret=uuid.uuid4().hex, trust_id=self.oslo_context.trust_id ) credential_id = utils.hash_access_key(blob['access']) cred_data = dict( user_id=user_id, project_id=tenant_id, blob=jsonutils.dumps(blob), id=credential_id, type=CRED_TYPE_EC2 ) PROVIDERS.credential_api.create_credential(credential_id, cred_data) ref = _convert_v3_to_ec2_credential(cred_data) return self.wrap_member(ref), http.client.CREATED
[docs]class UserOSEC2CredentialsResourceGetDelete(_UserOSEC2CredBaseResource): @staticmethod def _get_cred_data(credential_id): cred = PROVIDERS.credential_api.get_credential(credential_id) if not cred or cred['type'] != CRED_TYPE_EC2: raise ks_exception.Unauthorized( message=_('EC2 access key not found.')) return _convert_v3_to_ec2_credential(cred)
[docs] def get(self, user_id, credential_id): """Get a specific EC2 credential. GET/HEAD /users/{user_id}/credentials/OS-EC2/{credential_id} """ func = _build_enforcer_target_data_owner_and_user_id_match ENFORCER.enforce_call( action='identity:ec2_get_credential', build_target=func) PROVIDERS.identity_api.get_user(user_id) ec2_cred_id = utils.hash_access_key(credential_id) cred_data = self._get_cred_data(ec2_cred_id) return self.wrap_member(cred_data)
[docs] def delete(self, user_id, credential_id): """Delete a specific EC2 credential. DELETE /users/{user_id}/credentials/OS-EC2/{credential_id} """ func = _build_enforcer_target_data_owner_and_user_id_match ENFORCER.enforce_call(action='identity:ec2_delete_credential', build_target=func) PROVIDERS.identity_api.get_user(user_id) ec2_cred_id = utils.hash_access_key(credential_id) self._get_cred_data(ec2_cred_id) PROVIDERS.credential_api.delete_credential(ec2_cred_id) return None, http.client.NO_CONTENT
class _OAuth1ResourceBase(ks_flask.ResourceBase): collection_key = 'access_tokens' member_key = 'access_token' @classmethod def _add_self_referential_link(cls, ref, collection_name=None): # NOTE(morgan): This should be refactored to have an OAuth1 API with # a sane prefix instead of overloading the "_add_self_referential_link" # method. This was chosen as it more closely mirrors the pre-flask # code (for transition). ref.setdefault('links', {}) path = '/users/%(user_id)s/OS-OAUTH1/access_tokens' % { 'user_id': ref.get('authorizing_user_id', '') } ref['links']['self'] = ks_flask.base_url(path) + '/' + ref['id']
[docs]class OAuth1ListAccessTokensResource(_OAuth1ResourceBase):
[docs] def get(self, user_id): """List OAuth1 Access Tokens for user. GET /v3/users/{user_id}/OS-OAUTH1/access_tokens """ ENFORCER.enforce_call(action='identity:list_access_tokens') if self.oslo_context.is_delegated_auth: raise ks_exception.Forbidden( _('Cannot list request tokens with a token ' 'issued via delegation.')) refs = PROVIDERS.oauth_api.list_access_tokens(user_id) formatted_refs = ([_format_token_entity(x) for x in refs]) return self.wrap_collection(formatted_refs)
[docs]class OAuth1AccessTokenCRUDResource(_OAuth1ResourceBase):
[docs] def get(self, user_id, access_token_id): """Get specific access token. GET/HEAD /v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id} """ ENFORCER.enforce_call(action='identity:get_access_token') access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) if access_token['authorizing_user_id'] != user_id: raise ks_exception.NotFound() access_token = _format_token_entity(access_token) return self.wrap_member(access_token)
[docs] def delete(self, user_id, access_token_id): """Delete specific access token. DELETE /v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id} """ ENFORCER.enforce_call( action='identity:ec2_delete_credential', build_target=_build_enforcer_target_data_owner_and_user_id_match) access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) reason = ( 'Invalidating the token cache because an access token for ' 'consumer %(consumer_id)s has been deleted. Authorization for ' 'users with OAuth tokens will be recalculated and enforced ' 'accordingly the next time they authenticate or validate a ' 'token.' % {'consumer_id': access_token['consumer_id']} ) notifications.invalidate_token_cache_notification(reason) PROVIDERS.oauth_api.delete_access_token( user_id, access_token_id, initiator=self.audit_initiator) return None, http.client.NO_CONTENT
[docs]class OAuth1AccessTokenRoleListResource(ks_flask.ResourceBase): collection_key = 'roles' member_key = 'role'
[docs] def get(self, user_id, access_token_id): """List roles for a user access token. GET/HEAD /v3/users/{user_id}/OS-OAUTH1/access_tokens/ {access_token_id}/roles """ ENFORCER.enforce_call(action='identity:list_access_token_roles') access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) if access_token['authorizing_user_id'] != user_id: raise ks_exception.NotFound() authed_role_ids = access_token['role_ids'] authed_role_ids = jsonutils.loads(authed_role_ids) refs = ([_format_role_entity(x) for x in authed_role_ids]) return self.wrap_collection(refs)
[docs]class OAuth1AccessTokenRoleResource(ks_flask.ResourceBase): collection_key = 'roles' member_key = 'role'
[docs] def get(self, user_id, access_token_id, role_id): """Get role for access token. GET/HEAD /v3/users/{user_id}/OS-OAUTH1/access_tokens/ {access_token_id}/roles/{role_id} """ ENFORCER.enforce_call(action='identity:get_access_token_role') access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) if access_token['authorizing_user_id'] != user_id: raise ks_exception.Unauthorized(_('User IDs do not match')) authed_role_ids = access_token['role_ids'] authed_role_ids = jsonutils.loads(authed_role_ids) for authed_role_id in authed_role_ids: if authed_role_id == role_id: role = _format_role_entity(role_id) return self.wrap_member(role) raise ks_exception.RoleNotFound(role_id=role_id)
[docs]class UserAppCredListCreateResource(ks_flask.ResourceBase): collection_key = 'application_credentials' member_key = 'application_credential' _public_parameters = frozenset([ 'id', 'name', 'description', 'expires_at', 'project_id', 'roles', # secret is only exposed after create, it is not stored 'secret', 'links', 'unrestricted', 'access_rules' ]) @staticmethod def _generate_secret(): length = 64 secret = os.urandom(length) secret = base64.urlsafe_b64encode(secret) secret = secret.rstrip(b'=') secret = secret.decode('utf-8') return secret @staticmethod def _normalize_role_list(app_cred_roles): roles = [] for role in app_cred_roles: if role.get('id'): roles.append(role) else: roles.append(PROVIDERS.role_api.get_unique_role_by_name( role['name'])) return roles def _get_roles(self, app_cred_data, token): if app_cred_data.get('roles'): roles = self._normalize_role_list(app_cred_data['roles']) # NOTE(cmurphy): The user is not allowed to add a role that is not # in their token. This is to prevent trustees or application # credential users from escallating their privileges to include # additional roles that the trustor or application credential # creator has assigned on the project. token_roles = [r['id'] for r in token.roles] for role in roles: if role['id'] not in token_roles: detail = _('Cannot create an application credential with ' 'unassigned role') raise ks_exception.ApplicationCredentialValidationError( detail=detail) else: roles = token.roles return roles
[docs] def get(self, user_id): """List application credentials for user. GET/HEAD /v3/users/{user_id}/application_credentials """ filters = ('name',) ENFORCER.enforce_call(action='identity:list_application_credentials', filters=filters) app_cred_api = PROVIDERS.application_credential_api hints = self.build_driver_hints(filters) refs = app_cred_api.list_application_credentials(user_id, hints=hints) return self.wrap_collection(refs, hints=hints)
[docs] def post(self, user_id): """Create application credential. POST /v3/users/{user_id}/application_credentials """ ENFORCER.enforce_call(action='identity:create_application_credential') app_cred_data = self.request_body_json.get( 'application_credential', {}) validation.lazy_validate(app_cred_schema.application_credential_create, app_cred_data) token = self.auth_context['token'] _check_unrestricted_application_credential(token) if self.oslo_context.user_id != user_id: action = _('Cannot create an application credential for another ' 'user.') raise ks_exception.ForbiddenAction(action=action) project_id = self.oslo_context.project_id app_cred_data = self._assign_unique_id(app_cred_data) if not app_cred_data.get('secret'): app_cred_data['secret'] = self._generate_secret() app_cred_data['user_id'] = user_id app_cred_data['project_id'] = project_id app_cred_data['roles'] = self._get_roles(app_cred_data, token) if app_cred_data.get('expires_at'): app_cred_data['expires_at'] = utils.parse_expiration_date( app_cred_data['expires_at']) if app_cred_data.get('access_rules'): for access_rule in app_cred_data['access_rules']: # If user provides an access rule by ID, it will be looked up # by ID. If user provides an access rule that is identical to # an existing one, the ID generated here will be ignored and # the pre-existing access rule will be used. if 'id' not in access_rule: # Generate directly, rather than using _assign_unique_id, # so that there is no deep copy made access_rule['id'] = uuid.uuid4().hex app_cred_data = self._normalize_dict(app_cred_data) app_cred_api = PROVIDERS.application_credential_api try: ref = app_cred_api.create_application_credential( app_cred_data, initiator=self.audit_initiator) except ks_exception.RoleAssignmentNotFound as e: # Raise a Bad Request, not a Not Found, in accordance with the # API-SIG recommendations: # https://specs.openstack.org/openstack/api-wg/guidelines/http.html#failure-code-clarifications raise ks_exception.ApplicationCredentialValidationError( detail=str(e)) return self.wrap_member(ref), http.client.CREATED
[docs]class UserAppCredGetDeleteResource(ks_flask.ResourceBase): collection_key = 'application_credentials' member_key = 'application_credential'
[docs] def get(self, user_id, application_credential_id): """Get application credential resource. GET/HEAD /v3/users/{user_id}/application_credentials/ {application_credential_id} """ target = _update_request_user_id_attribute() ENFORCER.enforce_call( action='identity:get_application_credential', target_attr=target, ) ref = PROVIDERS.application_credential_api.get_application_credential( application_credential_id) return self.wrap_member(ref)
[docs] def delete(self, user_id, application_credential_id): """Delete application credential resource. DELETE /v3/users/{user_id}/application_credentials/ {application_credential_id} """ target = _update_request_user_id_attribute() ENFORCER.enforce_call( action='identity:delete_application_credential', target_attr=target ) token = self.auth_context['token'] _check_unrestricted_application_credential(token) PROVIDERS.application_credential_api.delete_application_credential( application_credential_id, initiator=self.audit_initiator) return None, http.client.NO_CONTENT
[docs]class UserAccessRuleListResource(ks_flask.ResourceBase): collection_key = 'access_rules' member_key = 'access_rule'
[docs] def get(self, user_id): """List access rules for user. GET/HEAD /v3/users/{user_id}/access_rules """ filters = ('service', 'path', 'method',) ENFORCER.enforce_call(action='identity:list_access_rules', filters=filters, build_target=_build_user_target_enforcement) app_cred_api = PROVIDERS.application_credential_api hints = self.build_driver_hints(filters) refs = app_cred_api.list_access_rules_for_user(user_id, hints=hints) hints = self.build_driver_hints(filters) return self.wrap_collection(refs, hints=hints)
[docs]class UserAccessRuleGetDeleteResource(ks_flask.ResourceBase): collection_key = 'access_rules' member_key = 'access_rule'
[docs] def get(self, user_id, access_rule_id): """Get access rule resource. GET/HEAD /v3/users/{user_id}/access_rules/{access_rule_id} """ ENFORCER.enforce_call( action='identity:get_access_rule', build_target=_build_user_target_enforcement ) ref = PROVIDERS.application_credential_api.get_access_rule( access_rule_id) return self.wrap_member(ref)
[docs] def delete(self, user_id, access_rule_id): """Delete access rule resource. DELETE /v3/users/{user_id}/access_rules/{access_rule_id} """ ENFORCER.enforce_call( action='identity:delete_access_rule', build_target=_build_user_target_enforcement ) PROVIDERS.application_credential_api.delete_access_rule( access_rule_id, initiator=self.audit_initiator) return None, http.client.NO_CONTENT
[docs]class UserAPI(ks_flask.APIBase): _name = 'users' _import_name = __name__ resources = [UserResource] resource_mapping = [ ks_flask.construct_resource_map( resource=UserChangePasswordResource, url='/users/<string:user_id>/password', resource_kwargs={}, rel='user_change_password', path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserGroupsResource, url='/users/<string:user_id>/groups', resource_kwargs={}, rel='user_groups', path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserProjectsResource, url='/users/<string:user_id>/projects', resource_kwargs={}, rel='user_projects', path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserOSEC2CredentialsResourceListCreate, url='/users/<string:user_id>/credentials/OS-EC2', resource_kwargs={}, rel='user_credentials', resource_relation_func=( json_home_relations.os_ec2_resource_rel_func), path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserOSEC2CredentialsResourceGetDelete, url=('/users/<string:user_id>/credentials/OS-EC2/' '<string:credential_id>'), resource_kwargs={}, rel='user_credential', resource_relation_func=( json_home_relations.os_ec2_resource_rel_func), path_vars={ 'credential_id': json_home.build_v3_parameter_relation( 'credential_id'), 'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=OAuth1ListAccessTokensResource, url='/users/<string:user_id>/OS-OAUTH1/access_tokens', resource_kwargs={}, rel='user_access_tokens', resource_relation_func=( json_home_relations.os_oauth1_resource_rel_func), path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=OAuth1AccessTokenCRUDResource, url=('/users/<string:user_id>/OS-OAUTH1/' 'access_tokens/<string:access_token_id>'), resource_kwargs={}, rel='user_access_token', resource_relation_func=( json_home_relations.os_oauth1_resource_rel_func), path_vars={ 'access_token_id': ACCESS_TOKEN_ID_PARAMETER_RELATION, 'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=OAuth1AccessTokenRoleListResource, url=('/users/<string:user_id>/OS-OAUTH1/access_tokens/' '<string:access_token_id>/roles'), resource_kwargs={}, rel='user_access_token_roles', resource_relation_func=( json_home_relations.os_oauth1_resource_rel_func), path_vars={'access_token_id': ACCESS_TOKEN_ID_PARAMETER_RELATION, 'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=OAuth1AccessTokenRoleResource, url=('/users/<string:user_id>/OS-OAUTH1/access_tokens/' '<string:access_token_id>/roles/<string:role_id>'), resource_kwargs={}, rel='user_access_token_role', resource_relation_func=( json_home_relations.os_oauth1_resource_rel_func), path_vars={'access_token_id': ACCESS_TOKEN_ID_PARAMETER_RELATION, 'role_id': json_home.Parameters.ROLE_ID, 'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserAppCredListCreateResource, url='/users/<string:user_id>/application_credentials', resource_kwargs={}, rel='application_credentials', path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserAppCredGetDeleteResource, url=('/users/<string:user_id>/application_credentials/' '<string:application_credential_id>'), resource_kwargs={}, rel='application_credential', path_vars={ 'user_id': json_home.Parameters.USER_ID, 'application_credential_id': json_home.Parameters.APPLICATION_CRED_ID} ), ks_flask.construct_resource_map( resource=UserAccessRuleListResource, url='/users/<string:user_id>/access_rules', resource_kwargs={}, rel='access_rules', path_vars={'user_id': json_home.Parameters.USER_ID} ), ks_flask.construct_resource_map( resource=UserAccessRuleGetDeleteResource, url=('/users/<string:user_id>/access_rules/' '<string:access_rule_id>'), resource_kwargs={}, rel='access_rule', path_vars={ 'user_id': json_home.Parameters.USER_ID, 'access_rule_id': json_home.Parameters.ACCESS_RULE_ID} ) ]
APIs = (UserAPI,)