Source code for keystone.identity.backends.sql_model

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import typing as ty

from oslo_utils import timeutils
import sqlalchemy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import orm
from sqlalchemy.orm import collections

from keystone.common import password_hashing
from keystone.common import resource_options
from keystone.common import sql
import keystone.conf
from keystone.identity.backends import resource_options as iro

CONF = keystone.conf.CONF


[docs] class User(sql.ModelBase, sql.ModelDictMixinWithExtras): __tablename__ = 'user' attributes = [ 'id', 'name', 'domain_id', 'password', 'enabled', 'default_project_id', 'password_expires_at', ] readonly_attributes = ['id', 'password_expires_at', 'password'] resource_options_registry = iro.USER_OPTIONS_REGISTRY id = sql.Column(sql.String(64), primary_key=True) domain_id = sql.Column(sql.String(64), nullable=False) _enabled = sql.Column('enabled', sql.Boolean) extra = sql.Column(sql.JsonBlob()) default_project_id = sql.Column(sql.String(64), index=True) _resource_option_mapper = orm.relationship( 'UserOption', single_parent=True, cascade='all,delete,delete-orphan', lazy='subquery', backref='user', collection_class=collections.attribute_mapped_collection('option_id'), ) local_user = orm.relationship( 'LocalUser', uselist=False, single_parent=True, lazy='joined', cascade='all,delete-orphan', backref='user', ) federated_users = orm.relationship( 'FederatedUser', single_parent=True, lazy='joined', cascade='all,delete-orphan', backref='user', ) nonlocal_user = orm.relationship( 'NonLocalUser', uselist=False, single_parent=True, lazy='joined', cascade='all,delete-orphan', backref='user', ) expiring_user_group_memberships = orm.relationship( 'ExpiringUserGroupMembership', cascade='all, delete-orphan', backref="user", ) created_at = sql.Column(sql.DateTime, nullable=True) last_active_at = sql.Column(sql.Date, nullable=True) # unique constraint needed here to support composite fk constraints __table_args__: ty.Any = (sql.UniqueConstraint('id', 'domain_id'), {}) # NOTE(stevemar): we use a hybrid property here because we leverage the # expression method, see `@name.expression` and `LocalUser.name` below. @hybrid_property def name(self): """Return the current user name.""" if self.local_user: return self.local_user.name elif self.nonlocal_user: return self.nonlocal_user.name elif self.federated_users: return self.federated_users[0].display_name else: return None @name.setter # type: ignore[no-redef] def name(self, value): if self.federated_users: self.federated_users[0].display_name = value elif self.local_user: self.local_user.name = value else: self.local_user = LocalUser() self.local_user.name = value @name.expression # type: ignore[no-redef] def name(cls): return LocalUser.name # password properties @property def password_ref(self): """Return the current password ref.""" if self.local_user and self.local_user.passwords: return self.local_user.passwords[-1] return None # NOTE(stevemar): we use a hybrid property here because we leverage the # expression method, see `@password.expression` and `Password.password` # below. @hybrid_property def password(self): """Return the current password.""" if self.password_ref: return self.password_ref.password_hash return None @property def password_created_at(self): """Return when password was created at.""" if self.password_ref: return self.password_ref.created_at return None @property def password_expires_at(self): """Return when password expires at.""" if self.password_ref: return self.password_ref.expires_at return None @property def password_is_expired(self): """Return whether password is expired or not.""" if self.password_expires_at and not self._password_expiry_exempt(): return timeutils.utcnow() >= self.password_expires_at return False @password.setter # type: ignore[no-redef] def password(self, value): now = timeutils.utcnow() if not self.local_user: self.local_user = LocalUser() # truncate extra passwords if self.local_user.passwords: unique_cnt = CONF.security_compliance.unique_last_password_count unique_cnt = unique_cnt + 1 if unique_cnt == 0 else unique_cnt self.local_user.passwords = self.local_user.passwords[-unique_cnt:] # set all previous passwords to be expired for ref in self.local_user.passwords: if not ref.expires_at or ref.expires_at > now: ref.expires_at = now new_password_ref = Password() hashed_passwd = None if value is not None: # NOTE(notmorgan): hash the passwords, never directly bind the # "value" in the unhashed form to hashed_passwd to ensure the # unhashed password cannot end up in the db. If an unhashed # password ends up in the DB, it cannot be used for auth, it is # however incorrect and could leak user credentials (due to users # doing insecure things such as sharing passwords across # different systems) to unauthorized parties. hashed_passwd = password_hashing.hash_password(value) new_password_ref.password_hash = hashed_passwd new_password_ref.created_at = now new_password_ref.expires_at = self._get_password_expires_at(now) self.local_user.passwords.append(new_password_ref) def _password_expiry_exempt(self): # Get the IGNORE_PASSWORD_EXPIRY_OPT value from the user's # option_mapper. return getattr( self.get_resource_option(iro.IGNORE_PASSWORD_EXPIRY_OPT.option_id), 'option_value', False, ) def _get_password_expires_at(self, created_at): expires_days = CONF.security_compliance.password_expires_days if not self._password_expiry_exempt(): if expires_days: expired_date = created_at + datetime.timedelta( days=expires_days ) return expired_date.replace(microsecond=0) return None @password.expression # type: ignore[no-redef] def password(cls): return Password.password_hash # NOTE(stevemar): we use a hybrid property here because we leverage the # expression method, see `@enabled.expression` and `User._enabled` below. @hybrid_property def enabled(self): """Return whether user is enabled or not.""" if self._enabled: max_days = ( CONF.security_compliance.disable_user_account_days_inactive ) inactivity_exempt = getattr( self.get_resource_option( iro.IGNORE_USER_INACTIVITY_OPT.option_id ), 'option_value', False, ) last_active = self.last_active_at if not last_active and self.created_at: last_active = self.created_at.date() if max_days and last_active: now = timeutils.utcnow().date() days_inactive = (now - last_active).days if days_inactive >= max_days and not inactivity_exempt: self._enabled = False return self._enabled @enabled.setter # type: ignore[no-redef] def enabled(self, value): if ( value and CONF.security_compliance.disable_user_account_days_inactive ): self.last_active_at = timeutils.utcnow().date() if value and self.local_user: self.local_user.failed_auth_count = 0 self.local_user.failed_auth_at = None self._enabled = value @enabled.expression # type: ignore[no-redef] def enabled(cls): return User._enabled
[docs] def get_resource_option(self, option_id): if option_id in self._resource_option_mapper.keys(): return self._resource_option_mapper[option_id] return None
[docs] def to_dict(self, include_extra_dict=False): d = super().to_dict(include_extra_dict=include_extra_dict) if 'default_project_id' in d and d['default_project_id'] is None: del d['default_project_id'] # NOTE(notmorgan): Eventually it may make sense to drop the empty # option dict creation to the superclass (if enough models use it) d['options'] = resource_options.ref_mapper_to_dict_options(self) return d
[docs] @classmethod def from_dict(cls, user_dict): """Override from_dict to remove password_expires_at attribute. Overriding this method to remove password_expires_at attribute to support update_user and unit tests where password_expires_at inadvertently gets added by calling to_dict followed by from_dict. :param user_dict: User entity dictionary :returns User: User object """ new_dict = user_dict.copy() resource_options = {} options = new_dict.pop('options', {}) password_expires_at_key = 'password_expires_at' # nosec if password_expires_at_key in user_dict: del new_dict[password_expires_at_key] for opt in cls.resource_options_registry.options: if opt.option_name in options: opt_value = options[opt.option_name] # NOTE(notmorgan): None is always a valid type if opt_value is not None: opt.validator(opt_value) resource_options[opt.option_id] = opt_value user_obj = super().from_dict(new_dict) setattr(user_obj, '_resource_options', resource_options) return user_obj
[docs] class LocalUser(sql.ModelBase, sql.ModelDictMixin): __tablename__ = 'local_user' attributes = ['id', 'user_id', 'domain_id', 'name'] id = sql.Column(sql.Integer, primary_key=True) user_id = sql.Column(sql.String(64), nullable=False) domain_id = sql.Column(sql.String(64), nullable=False) name = sql.Column(sql.String(255), nullable=False) passwords = orm.relationship( 'Password', single_parent=True, cascade='all,delete-orphan', lazy='joined', backref='local_user', order_by='Password.created_at_int', ) failed_auth_count = sql.Column(sql.Integer, nullable=True) failed_auth_at = sql.Column(sql.DateTime, nullable=True) __table_args__ = ( sql.UniqueConstraint('user_id'), sql.UniqueConstraint('domain_id', 'name'), sqlalchemy.ForeignKeyConstraint( ['user_id', 'domain_id'], ['user.id', 'user.domain_id'], onupdate='CASCADE', ondelete='CASCADE', ), )
[docs] class Password(sql.ModelBase, sql.ModelDictMixin): __tablename__ = 'password' attributes = [ 'id', 'local_user_id', 'password_hash', 'created_at', 'expires_at', ] id = sql.Column(sql.Integer, primary_key=True) local_user_id = sql.Column( sql.Integer, sql.ForeignKey('local_user.id', ondelete='CASCADE'), nullable=False, ) password_hash = sql.Column(sql.String(255), nullable=True) # TODO(lbragstad): Once Rocky opens for development, the _created_at and # _expires_at attributes/columns can be removed from the schema. The # migration ensures all passwords are converted from datetime objects to # big integers. The old datetime columns and their corresponding attributes # in the model are no longer required. # created_at default set here to safe guard in case it gets missed _created_at = sql.Column( 'created_at', sql.DateTime, nullable=False, default=timeutils.utcnow ) _expires_at = sql.Column('expires_at', sql.DateTime, nullable=True) # set the default to 0, a 0 indicates it is unset. created_at_int = sql.Column( sql.DateTimeInt(), nullable=False, default=0, server_default='0' ) expires_at_int = sql.Column(sql.DateTimeInt(), nullable=True) self_service = sql.Column( sql.Boolean, default=False, nullable=False, server_default='0' ) @hybrid_property def created_at(self): return self.created_at_int or self._created_at @created_at.setter # type: ignore[no-redef] def created_at(self, value): self._created_at = value self.created_at_int = value @hybrid_property def expires_at(self): return self.expires_at_int or self._expires_at @expires_at.setter # type: ignore[no-redef] def expires_at(self, value): self._expires_at = value self.expires_at_int = value
[docs] class FederatedUser(sql.ModelBase, sql.ModelDictMixin): __tablename__ = 'federated_user' attributes = [ 'id', 'user_id', 'idp_id', 'protocol_id', 'unique_id', 'display_name', ] id = sql.Column(sql.Integer, primary_key=True) user_id = sql.Column( sql.String(64), sql.ForeignKey('user.id', ondelete='CASCADE'), nullable=False, ) idp_id = sql.Column( sql.String(64), sql.ForeignKey('identity_provider.id', ondelete='CASCADE'), nullable=False, ) protocol_id = sql.Column(sql.String(64), nullable=False) unique_id = sql.Column(sql.String(255), nullable=False) display_name = sql.Column(sql.String(255), nullable=True) __table_args__ = ( sql.UniqueConstraint('idp_id', 'protocol_id', 'unique_id'), sqlalchemy.ForeignKeyConstraint( ['protocol_id', 'idp_id'], ['federation_protocol.id', 'federation_protocol.idp_id'], ondelete='CASCADE', ), )
[docs] class NonLocalUser(sql.ModelBase, sql.ModelDictMixin): """SQL data model for nonlocal users (LDAP and custom).""" __tablename__ = 'nonlocal_user' attributes = ['domain_id', 'name', 'user_id'] domain_id = sql.Column(sql.String(64), primary_key=True) name = sql.Column(sql.String(255), primary_key=True) user_id = sql.Column(sql.String(64), nullable=False) __table_args__ = ( sql.UniqueConstraint('user_id'), sqlalchemy.ForeignKeyConstraint( ['user_id', 'domain_id'], ['user.id', 'user.domain_id'], onupdate='CASCADE', ondelete='CASCADE', ), )
[docs] class Group(sql.ModelBase, sql.ModelDictMixinWithExtras): __tablename__ = 'group' attributes = ['id', 'name', 'domain_id', 'description'] id = sql.Column(sql.String(64), primary_key=True) name = sql.Column(sql.String(64), nullable=False) domain_id = sql.Column(sql.String(64), nullable=False) description = sql.Column(sql.Text()) extra = sql.Column(sql.JsonBlob()) expiring_user_group_memberships = orm.relationship( 'ExpiringUserGroupMembership', cascade='all, delete-orphan', backref="group", ) # Unique constraint across two columns to create the separation # rather than just only 'name' being unique __table_args__ = (sql.UniqueConstraint('domain_id', 'name'),)
[docs] class UserGroupMembership(sql.ModelBase, sql.ModelDictMixin): """Group membership join table.""" __tablename__ = 'user_group_membership' user_id = sql.Column( sql.String(64), sql.ForeignKey('user.id'), primary_key=True ) group_id = sql.Column( sql.String(64), sql.ForeignKey('group.id'), primary_key=True )
[docs] class ExpiringUserGroupMembership(sql.ModelBase, sql.ModelDictMixin): """Expiring group membership through federation mapping rules.""" __tablename__ = 'expiring_user_group_membership' user_id = sql.Column( sql.String(64), sql.ForeignKey('user.id'), primary_key=True ) group_id = sql.Column( sql.String(64), sql.ForeignKey('group.id'), primary_key=True ) idp_id = sql.Column( sql.String(64), sql.ForeignKey('identity_provider.id', ondelete='CASCADE'), primary_key=True, ) last_verified = sql.Column(sql.DateTime, nullable=False) @hybrid_property def expires(self): ttl = self.idp.authorization_ttl if not ttl: ttl = CONF.federation.default_authorization_ttl return self.last_verified + datetime.timedelta(minutes=ttl) @hybrid_property def expired(self): return self.expires <= timeutils.utcnow()
[docs] class UserOption(sql.ModelBase): __tablename__ = 'user_option' user_id = sql.Column( sql.String(64), sql.ForeignKey('user.id', ondelete='CASCADE'), nullable=False, primary_key=True, ) option_id = sql.Column(sql.String(4), nullable=False, primary_key=True) option_value = sql.Column(sql.JsonBlob, nullable=True) def __init__(self, option_id, option_value): self.option_id = option_id self.option_value = option_value