diff options
Diffstat (limited to 'wqflask/flask_security/views.py')
-rw-r--r-- | wqflask/flask_security/views.py | 359 |
1 files changed, 359 insertions, 0 deletions
diff --git a/wqflask/flask_security/views.py b/wqflask/flask_security/views.py new file mode 100644 index 00000000..1b8488d8 --- /dev/null +++ b/wqflask/flask_security/views.py @@ -0,0 +1,359 @@ +# -*- coding: utf-8 -*- +""" + flask.ext.security.views + ~~~~~~~~~~~~~~~~~~~~~~~~ + + Flask-Security views module + + :copyright: (c) 2012 by Matt Wright. + :license: MIT, see LICENSE for more details. +""" + +from flask import current_app, redirect, request, render_template, jsonify, \ + after_this_request, Blueprint +from flask_login import current_user +from werkzeug.datastructures import MultiDict +from werkzeug.local import LocalProxy + +from .confirmable import send_confirmation_instructions, \ + confirm_user, confirm_email_token_status +from .decorators import login_required, anonymous_user_required +from .passwordless import send_login_instructions, \ + login_token_status +from .recoverable import reset_password_token_status, \ + send_reset_password_instructions, update_password +from .changeable import change_user_password +from .registerable import register_user +from .utils import get_url, get_post_login_redirect, do_flash, \ + get_message, login_user, logout_user, url_for_security as url_for, \ + config_value + + +# Convenient references +_security = LocalProxy(lambda: current_app.extensions['security']) + +_datastore = LocalProxy(lambda: _security.datastore) + + +def _render_json(form, include_auth_token=False): + has_errors = len(form.errors) > 0 + + if has_errors: + code = 400 + response = dict(errors=form.errors) + else: + code = 200 + response = dict(user=dict(id=str(form.user.id))) + if include_auth_token: + token = form.user.get_auth_token() + response['user']['authentication_token'] = token + + return jsonify(dict(meta=dict(code=code), response=response)) + + +def _commit(response=None): + _datastore.commit() + return response + + +def _ctx(endpoint): + return _security._run_ctx_processor(endpoint) + + +@anonymous_user_required +def login(): + """View function for login view""" + + form_class = _security.login_form + + if request.json: + form = form_class(MultiDict(request.json)) + else: + form = form_class() + + if form.validate_on_submit(): + login_user(form.user, remember=form.remember.data) + after_this_request(_commit) + + if not request.json: + return redirect(get_post_login_redirect()) + + form.next.data = get_url(request.args.get('next')) \ + or get_url(request.form.get('next')) or '' + + if request.json: + return _render_json(form, True) + + return render_template(config_value('LOGIN_USER_TEMPLATE'), + login_user_form=form, + **_ctx('login')) + + +@login_required +def logout(): + """View function which handles a logout request.""" + + logout_user() + + return redirect(request.args.get('next', None) or + get_url(_security.post_logout_view)) + + +def register(): + """View function which handles a registration request.""" + + if _security.confirmable or request.json: + form_class = _security.confirm_register_form + else: + form_class = _security.register_form + + if request.json: + form_data = MultiDict(request.json) + else: + form_data = request.form + + form = form_class(form_data) + + if form.validate_on_submit(): + user = register_user(**form.to_dict()) + form.user = user + + if not _security.confirmable or _security.login_without_confirmation: + after_this_request(_commit) + login_user(user) + + if not request.json: + post_register_url = get_url(_security.post_register_view) + post_login_url = get_url(_security.post_login_view) + return redirect(post_register_url or post_login_url) + + if request.json: + return _render_json(form) + + return render_template(config_value('REGISTER_USER_TEMPLATE'), + register_user_form=form, + **_ctx('register')) + + +def send_login(): + """View function that sends login instructions for passwordless login""" + + form_class = _security.passwordless_login_form + + if request.json: + form = form_class(MultiDict(request.json)) + else: + form = form_class() + + if form.validate_on_submit(): + send_login_instructions(form.user) + if request.json is None: + do_flash(*get_message('LOGIN_EMAIL_SENT', email=form.user.email)) + + if request.json: + return _render_json(form) + + return render_template(config_value('SEND_LOGIN_TEMPLATE'), + send_login_form=form, + **_ctx('send_login')) + + +@anonymous_user_required +def token_login(token): + """View function that handles passwordless login via a token""" + + expired, invalid, user = login_token_status(token) + + if invalid: + do_flash(*get_message('INVALID_LOGIN_TOKEN')) + if expired: + send_login_instructions(user) + do_flash(*get_message('LOGIN_EXPIRED', email=user.email, + within=_security.login_within)) + if invalid or expired: + return redirect(url_for('login')) + + login_user(user, True) + after_this_request(_commit) + do_flash(*get_message('PASSWORDLESS_LOGIN_SUCCESSFUL')) + + return redirect(get_post_login_redirect()) + + +def send_confirmation(): + """View function which sends confirmation instructions.""" + + form_class = _security.send_confirmation_form + + if request.json: + form = form_class(MultiDict(request.json)) + else: + form = form_class() + + if form.validate_on_submit(): + send_confirmation_instructions(form.user) + if request.json is None: + do_flash(*get_message('CONFIRMATION_REQUEST', email=form.user.email)) + + if request.json: + return _render_json(form) + + return render_template(config_value('SEND_CONFIRMATION_TEMPLATE'), + send_confirmation_form=form, + **_ctx('send_confirmation')) + + +@anonymous_user_required +def confirm_email(token): + """View function which handles a email confirmation request.""" + + expired, invalid, user = confirm_email_token_status(token) + + if not user or invalid: + invalid = True + do_flash(*get_message('INVALID_CONFIRMATION_TOKEN')) + if expired: + send_confirmation_instructions(user) + do_flash(*get_message('CONFIRMATION_EXPIRED', email=user.email, + within=_security.confirm_email_within)) + if invalid or expired: + return redirect(get_url(_security.confirm_error_view) or + url_for('send_confirmation')) + + confirm_user(user) + login_user(user, True) + after_this_request(_commit) + do_flash(*get_message('EMAIL_CONFIRMED')) + + return redirect(get_url(_security.post_confirm_view) or + get_url(_security.post_login_view)) + + +def forgot_password(): + """View function that handles a forgotten password request.""" + + form_class = _security.forgot_password_form + + if request.json: + form = form_class(MultiDict(request.json)) + else: + form = form_class() + + if form.validate_on_submit(): + send_reset_password_instructions(form.user) + if request.json is None: + do_flash(*get_message('PASSWORD_RESET_REQUEST', email=form.user.email)) + + if request.json: + return _render_json(form) + + return render_template(config_value('FORGOT_PASSWORD_TEMPLATE'), + forgot_password_form=form, + **_ctx('forgot_password')) + + +@anonymous_user_required +def reset_password(token): + """View function that handles a reset password request.""" + + expired, invalid, user = reset_password_token_status(token) + + if invalid: + do_flash(*get_message('INVALID_RESET_PASSWORD_TOKEN')) + if expired: + do_flash(*get_message('PASSWORD_RESET_EXPIRED', email=user.email, + within=_security.reset_password_within)) + if invalid or expired: + return redirect(url_for('forgot_password')) + + form = _security.reset_password_form() + + if form.validate_on_submit(): + after_this_request(_commit) + update_password(user, form.password.data) + do_flash(*get_message('PASSWORD_RESET')) + login_user(user, True) + return redirect(get_url(_security.post_reset_view) or + get_url(_security.post_login_view)) + + return render_template(config_value('RESET_PASSWORD_TEMPLATE'), + reset_password_form=form, + reset_password_token=token, + **_ctx('reset_password')) + + +@login_required +def change_password(): + """View function which handles a change password request.""" + + form_class = _security.change_password_form + + if request.json: + form = form_class(MultiDict(request.json)) + else: + form = form_class() + + if form.validate_on_submit(): + after_this_request(_commit) + change_user_password(current_user, form.new_password.data) + if request.json is None: + do_flash(*get_message('PASSWORD_CHANGE')) + return redirect(get_url(_security.post_change_view) or + get_url(_security.post_login_view)) + + if request.json: + return _render_json(form) + + return render_template('security/change_password.html', + change_password_form=form, + **_ctx('change_password')) + + +def create_blueprint(state, import_name): + """Creates the security extension blueprint""" + + bp = Blueprint(state.blueprint_name, import_name, + url_prefix=state.url_prefix, + subdomain=state.subdomain, + template_folder='templates') + + bp.route(state.logout_url, endpoint='logout')(logout) + + if state.passwordless: + bp.route(state.login_url, + methods=['GET', 'POST'], + endpoint='login')(send_login) + bp.route(state.login_url + '/<token>', + endpoint='token_login')(token_login) + else: + bp.route(state.login_url, + methods=['GET', 'POST'], + endpoint='login')(login) + + if state.registerable: + bp.route(state.register_url, + methods=['GET', 'POST'], + endpoint='register')(register) + + if state.recoverable: + bp.route(state.reset_url, + methods=['GET', 'POST'], + endpoint='forgot_password')(forgot_password) + bp.route(state.reset_url + '/<token>', + methods=['GET', 'POST'], + endpoint='reset_password')(reset_password) + + if state.changeable: + bp.route(state.change_url, + methods=['GET', 'POST'], + endpoint='change_password')(change_password) + + if state.confirmable: + bp.route(state.confirm_url, + methods=['GET', 'POST'], + endpoint='send_confirmation')(send_confirmation) + bp.route(state.confirm_url + '/<token>', + methods=['GET', 'POST'], + endpoint='confirm_email')(confirm_email) + + return bp |