# -*- coding: utf-8 -*-
"""
flask.ext.security.datastore
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains an user datastore classes.
:copyright: (c) 2012 by Matt Wright.
:license: MIT, see LICENSE for more details.
"""
class Datastore(object):
def __init__(self, db):
self.db = db
def commit(self):
pass
def put(self, model):
raise NotImplementedError
def delete(self, model):
raise NotImplementedError
class SQLAlchemyDatastore(Datastore):
def commit(self):
self.db.session.commit()
def put(self, model):
self.db.session.add(model)
return model
def delete(self, model):
self.db.session.delete(model)
class MongoEngineDatastore(Datastore):
def put(self, model):
model.save()
return model
def delete(self, model):
model.delete()
class PeeweeDatastore(Datastore):
def put(self, model):
model.save()
return model
def delete(self, model):
model.delete_instance()
class UserDatastore(object):
"""Abstracted user datastore.
:param user_model: A user model class definition
:param role_model: A role model class definition
"""
def __init__(self, user_model, role_model):
self.user_model = user_model
self.role_model = role_model
def _prepare_role_modify_args(self, user, role):
if isinstance(user, basestring):
user = self.find_user(email=user)
if isinstance(role, basestring):
role = self.find_role(role)
return user, role
def _prepare_create_user_args(self, **kwargs):
kwargs.setdefault('active', True)
roles = kwargs.get('roles', [])
for i, role in enumerate(roles):
rn = role.name if isinstance(role, self.role_model) else role
# see if the role exists
roles[i] = self.find_role(rn)
kwargs['roles'] = roles
return kwargs
def find_user(self, *args, **kwargs):
"""Returns a user matching the provided parameters."""
raise NotImplementedError
def find_role(self, *args, **kwargs):
"""Returns a role matching the provided name."""
raise NotImplementedError
def add_role_to_user(self, user, role):
"""Adds a role tp a user
:param user: The user to manipulate
:param role: The role to add to the user
"""
rv = False
user, role = self._prepare_role_modify_args(user, role)
if role not in user.roles:
rv = True
user.roles.append(role)
return rv
def remove_role_from_user(self, user, role):
"""Removes a role from a user
:param user: The user to manipulate
:param role: The role to remove from the user
"""
rv = False
user, role = self._prepare_role_modify_args(user, role)
if role in user.roles:
rv = True
user.roles.remove(role)
return rv
def toggle_active(self, user):
"""Toggles a user's active status. Always returns True."""
user.active = not user.active
return True
def deactivate_user(self, user):
"""Deactivates a specified user. Returns `True` if a change was made.
:param user: The user to deactivate
"""
if user.active:
user.active = False
return True
return False
def activate_user(self, user):
"""Activates a specified user. Returns `True` if a change was made.
:param user: The user to activate
"""
if not user.active:
user.active = True
return True
return False
def create_role(self, **kwargs):
"""Creates and returns a new role from the given parameters."""
role = self.role_model(**kwargs)
return self.put(role)
def find_or_create_role(self, name, **kwargs):
"""Returns a role matching the given name or creates it with any
additionally provided parameters
"""
kwargs["name"] = name
return self.find_role(name) or self.create_role(**kwargs)
def create_user(self, **kwargs):
"""Creates and returns a new user from the given parameters."""
user = self.user_model(**self._prepare_create_user_args(**kwargs))
print "in abstraced create_user, user is:", user
return self.put(user)
def delete_user(self, user):
"""Delete the specified user
:param user: The user to delete
"""
self.delete(user)
class SQLAlchemyUserDatastore(SQLAlchemyDatastore, UserDatastore):
"""A SQLAlchemy datastore implementation for Flask-Security that assumes the
use of the Flask-SQLAlchemy extension.
"""
def __init__(self, db, user_model, role_model):
SQLAlchemyDatastore.__init__(self, db)
UserDatastore.__init__(self, user_model, role_model)
def find_user(self, **kwargs):
return self.user_model.query.filter_by(**kwargs).first()
def find_role(self, role):
return self.role_model.query.filter_by(name=role).first()
class MongoEngineUserDatastore(MongoEngineDatastore, UserDatastore):
"""A MongoEngine datastore implementation for Flask-Security that assumes
the use of the Flask-MongoEngine extension.
"""
def __init__(self, db, user_model, role_model):
MongoEngineDatastore.__init__(self, db)
UserDatastore.__init__(self, user_model, role_model)
def find_user(self, **kwargs):
return self.user_model.objects(**kwargs).first()
def find_role(self, role):
return self.role_model.objects(name=role).first()
class PeeweeUserDatastore(PeeweeDatastore, UserDatastore):
"""A PeeweeD datastore implementation for Flask-Security that assumes
the use of the Flask-Peewee extension.
:param user_model: A user model class definition
:param role_model: A role model class definition
:param role_link: A model implementing the many-to-many user-role relation
"""
def __init__(self, db, user_model, role_model, role_link):
PeeweeDatastore.__init__(self, db)
UserDatastore.__init__(self, user_model, role_model)
self.UserRole = role_link
def find_user(self, **kwargs):
try:
return self.user_model.filter(**kwargs).get()
except self.user_model.DoesNotExist:
return None
def find_role(self, role):
try:
return self.role_model.filter(name=role).get()
except self.role_model.DoesNotExist:
return None
def create_user(self, **kwargs):
"""Creates and returns a new user from the given parameters."""
roles = kwargs.pop('roles', [])
user = self.user_model(**self._prepare_create_user_args(**kwargs))
user = self.put(user)
for role in roles:
self.add_role_to_user(user, role)
return user
def add_role_to_user(self, user, role):
"""Adds a role tp a user
:param user: The user to manipulate
:param role: The role to add to the user
"""
user, role = self._prepare_role_modify_args(user, role)
if self.UserRole.select().where(self.UserRole.user==user, self.UserRole.role==role).count():
return False
else:
self.UserRole.create(user=user, role=role)
return True
def remove_role_from_user(self, user, role):
"""Removes a role from a user
:param user: The user to manipulate
:param role: The role to remove from the user
"""
user, role = self._prepare_role_modify_args(user, role)
if self.UserRole.select().where(self.UserRole.user==user, self.UserRole.role==role).count():
self.UserRole.delete().where(self.UserRole.user==user, self.UserRole.role==role)
return True
else:
return False