# -*- coding: utf-8 -*- """ flask.ext.security.datastore ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ This module contains an user datastore classes. :copyright: (c) 2012 by Matt Wright. :license: MIT, see LICENSE for more details. """ class Datastore(object): def __init__(self, db): self.db = db def commit(self): pass def put(self, model): raise NotImplementedError def delete(self, model): raise NotImplementedError class SQLAlchemyDatastore(Datastore): def commit(self): self.db.session.commit() def put(self, model): self.db.session.add(model) return model def delete(self, model): self.db.session.delete(model) class MongoEngineDatastore(Datastore): def put(self, model): model.save() return model def delete(self, model): model.delete() class PeeweeDatastore(Datastore): def put(self, model): model.save() return model def delete(self, model): model.delete_instance() class UserDatastore(object): """Abstracted user datastore. :param user_model: A user model class definition :param role_model: A role model class definition """ def __init__(self, user_model, role_model): self.user_model = user_model self.role_model = role_model def _prepare_role_modify_args(self, user, role): if isinstance(user, basestring): user = self.find_user(email=user) if isinstance(role, basestring): role = self.find_role(role) return user, role def _prepare_create_user_args(self, **kwargs): kwargs.setdefault('active', True) roles = kwargs.get('roles', []) for i, role in enumerate(roles): rn = role.name if isinstance(role, self.role_model) else role # see if the role exists roles[i] = self.find_role(rn) kwargs['roles'] = roles return kwargs def find_user(self, *args, **kwargs): """Returns a user matching the provided parameters.""" raise NotImplementedError def find_role(self, *args, **kwargs): """Returns a role matching the provided name.""" raise NotImplementedError def add_role_to_user(self, user, role): """Adds a role tp a user :param user: The user to manipulate :param role: The role to add to the user """ rv = False user, role = self._prepare_role_modify_args(user, role) if role not in user.roles: rv = True user.roles.append(role) return rv def remove_role_from_user(self, user, role): """Removes a role from a user :param user: The user to manipulate :param role: The role to remove from the user """ rv = False user, role = self._prepare_role_modify_args(user, role) if role in user.roles: rv = True user.roles.remove(role) return rv def toggle_active(self, user): """Toggles a user's active status. Always returns True.""" user.active = not user.active return True def deactivate_user(self, user): """Deactivates a specified user. Returns `True` if a change was made. :param user: The user to deactivate """ if user.active: user.active = False return True return False def activate_user(self, user): """Activates a specified user. Returns `True` if a change was made. :param user: The user to activate """ if not user.active: user.active = True return True return False def create_role(self, **kwargs): """Creates and returns a new role from the given parameters.""" role = self.role_model(**kwargs) return self.put(role) def find_or_create_role(self, name, **kwargs): """Returns a role matching the given name or creates it with any additionally provided parameters """ kwargs["name"] = name return self.find_role(name) or self.create_role(**kwargs) def create_user(self, **kwargs): """Creates and returns a new user from the given parameters.""" user = self.user_model(**self._prepare_create_user_args(**kwargs)) print "in abstraced create_user, user is:", user return self.put(user) def delete_user(self, user): """Delete the specified user :param user: The user to delete """ self.delete(user) class SQLAlchemyUserDatastore(SQLAlchemyDatastore, UserDatastore): """A SQLAlchemy datastore implementation for Flask-Security that assumes the use of the Flask-SQLAlchemy extension. """ def __init__(self, db, user_model, role_model): SQLAlchemyDatastore.__init__(self, db) UserDatastore.__init__(self, user_model, role_model) def find_user(self, **kwargs): return self.user_model.query.filter_by(**kwargs).first() def find_role(self, role): return self.role_model.query.filter_by(name=role).first() class MongoEngineUserDatastore(MongoEngineDatastore, UserDatastore): """A MongoEngine datastore implementation for Flask-Security that assumes the use of the Flask-MongoEngine extension. """ def __init__(self, db, user_model, role_model): MongoEngineDatastore.__init__(self, db) UserDatastore.__init__(self, user_model, role_model) def find_user(self, **kwargs): return self.user_model.objects(**kwargs).first() def find_role(self, role): return self.role_model.objects(name=role).first() class PeeweeUserDatastore(PeeweeDatastore, UserDatastore): """A PeeweeD datastore implementation for Flask-Security that assumes the use of the Flask-Peewee extension. :param user_model: A user model class definition :param role_model: A role model class definition :param role_link: A model implementing the many-to-many user-role relation """ def __init__(self, db, user_model, role_model, role_link): PeeweeDatastore.__init__(self, db) UserDatastore.__init__(self, user_model, role_model) self.UserRole = role_link def find_user(self, **kwargs): try: return self.user_model.filter(**kwargs).get() except self.user_model.DoesNotExist: return None def find_role(self, role): try: return self.role_model.filter(name=role).get() except self.role_model.DoesNotExist: return None def create_user(self, **kwargs): """Creates and returns a new user from the given parameters.""" roles = kwargs.pop('roles', []) user = self.user_model(**self._prepare_create_user_args(**kwargs)) user = self.put(user) for role in roles: self.add_role_to_user(user, role) return user def add_role_to_user(self, user, role): """Adds a role tp a user :param user: The user to manipulate :param role: The role to add to the user """ user, role = self._prepare_role_modify_args(user, role) if self.UserRole.select().where(self.UserRole.user==user, self.UserRole.role==role).count(): return False else: self.UserRole.create(user=user, role=role) return True def remove_role_from_user(self, user, role): """Removes a role from a user :param user: The user to manipulate :param role: The role to remove from the user """ user, role = self._prepare_role_modify_args(user, role) if self.UserRole.select().where(self.UserRole.user==user, self.UserRole.role==role).count(): self.UserRole.delete().where(self.UserRole.user==user, self.UserRole.role==role) return True else: return False