# -*- coding: utf-8 -*-
"""
    flask.ext.security.views
    ~~~~~~~~~~~~~~~~~~~~~~~~

    Flask-Security views module

    :copyright: (c) 2012 by Matt Wright.
    :license: MIT, see LICENSE for more details.
"""

from flask import current_app, redirect, request, render_template, jsonify, \
    after_this_request, Blueprint
from flask_login import current_user
from werkzeug.datastructures import MultiDict
from werkzeug.local import LocalProxy

from .confirmable import send_confirmation_instructions, \
    confirm_user, confirm_email_token_status
from .decorators import login_required, anonymous_user_required
from .passwordless import send_login_instructions, \
    login_token_status
from .recoverable import reset_password_token_status, \
    send_reset_password_instructions, update_password
from .changeable import change_user_password
from .registerable import register_user
from .utils import get_url, get_post_login_redirect, do_flash, \
    get_message, login_user, logout_user, url_for_security as url_for, \
    config_value


# Convenient references
_security = LocalProxy(lambda: current_app.extensions['security'])

_datastore = LocalProxy(lambda: _security.datastore)


def _render_json(form, include_auth_token=False):
    """Build the JSON response for a processed form.

    When ``form.errors`` is non-empty the payload carries those errors with a
    ``meta.code`` of 400. Otherwise it carries the id of ``form.user`` (as a
    string) with a ``meta.code`` of 200.

    :param form: the processed form; ``form.user`` must be set when the form
                 has no errors
    :param include_auth_token: if true, add the value of
                               ``form.user.get_auth_token()`` to the user
                               payload on success
    """
    if form.errors:
        code = 400
        response = dict(errors=form.errors)
    else:
        code = 200
        response = dict(user=dict(id=str(form.user.id)))
        if include_auth_token:
            token = form.user.get_auth_token()
            response['user']['authentication_token'] = token

    return jsonify(dict(meta=dict(code=code), response=response))


def _commit(response=None):
    """Commit the datastore and hand back ``response`` untouched.

    The pass-through return value lets this function be registered with
    ``after_this_request``.
    """
    _datastore.commit()
    return response


def _ctx(endpoint):
    """Return the template context produced for ``endpoint`` by the
    extension's ``_run_ctx_processor``."""
    return _security._run_ctx_processor(endpoint)


@anonymous_user_required
def login():
    """View function for login view"""

    form_class = _security.login_form

    if request.json:
        form = form_class(MultiDict(request.json))
    else:
        form = form_class()

    if form.validate_on_submit():
        login_user(form.user, remember=form.remember.data)
        after_this_request(_commit)

        # JSON clients fall through to the JSON response below instead of
        # being redirected.
        if not request.json:
            return redirect(get_post_login_redirect())

    # Carry the requested ``next`` target through a re-rendered form.
    form.next.data = get_url(request.args.get('next')) \
        or get_url(request.form.get('next')) or ''

    if request.json:
        return _render_json(form, True)

    return render_template(config_value('LOGIN_USER_TEMPLATE'),
                           login_user_form=form,
                           **_ctx('login'))


@login_required
def logout():
    """View function which handles a logout request."""

    logout_user()

    # NOTE(review): the ``next`` query argument is used as the redirect
    # target without any validation visible here -- confirm that an
    # arbitrary external URL is acceptable for this deployment.
    return redirect(request.args.get('next', None) or
                    get_url(_security.post_logout_view))


def register():
    """View function which handles a registration request."""

    if _security.confirmable or request.json:
        form_class = _security.confirm_register_form
    else:
        form_class = _security.register_form

    if request.json:
        form_data = MultiDict(request.json)
    else:
        form_data = request.form

    form = form_class(form_data)

    if form.validate_on_submit():
        user = register_user(**form.to_dict())
        # _render_json reads ``form.user`` on success.
        form.user = user

        if not _security.confirmable or _security.login_without_confirmation:
            after_this_request(_commit)
            login_user(user)

        if not request.json:
            post_register_url = get_url(_security.post_register_view)
            post_login_url = get_url(_security.post_login_view)
            return redirect(post_register_url or post_login_url)

    if request.json:
        return _render_json(form)

    return render_template(config_value('REGISTER_USER_TEMPLATE'),
                           register_user_form=form,
                           **_ctx('register'))


def send_login():
    """View function that sends login instructions for passwordless login"""

    form_class = _security.passwordless_login_form

    if request.json:
        form = form_class(MultiDict(request.json))
    else:
        form = form_class()

    if form.validate_on_submit():
        send_login_instructions(form.user)
        # Flash messages are only emitted for non-JSON requests.
        if request.json is None:
            do_flash(*get_message('LOGIN_EMAIL_SENT', email=form.user.email))

    if request.json:
        return _render_json(form)

    return render_template(config_value('SEND_LOGIN_TEMPLATE'),
                           send_login_form=form,
                           **_ctx('send_login'))


@anonymous_user_required
def token_login(token):
    """View function that handles passwordless login via a token"""

    expired, invalid, user = login_token_status(token)

    if invalid:
        do_flash(*get_message('INVALID_LOGIN_TOKEN'))
    if expired:
        # An expired token triggers a fresh set of login instructions.
        send_login_instructions(user)
        do_flash(*get_message('LOGIN_EXPIRED', email=user.email,
                              within=_security.login_within))
    if invalid or expired:
        return redirect(url_for('login'))

    login_user(user, True)
    after_this_request(_commit)
    do_flash(*get_message('PASSWORDLESS_LOGIN_SUCCESSFUL'))

    return redirect(get_post_login_redirect())


def send_confirmation():
    """View function which sends confirmation instructions."""

    form_class = _security.send_confirmation_form

    if request.json:
        form = form_class(MultiDict(request.json))
    else:
        form = form_class()

    if form.validate_on_submit():
        send_confirmation_instructions(form.user)
        # Flash messages are only emitted for non-JSON requests.
        if request.json is None:
            do_flash(*get_message('CONFIRMATION_REQUEST',
                                  email=form.user.email))

    if request.json:
        return _render_json(form)

    return render_template(config_value('SEND_CONFIRMATION_TEMPLATE'),
                           send_confirmation_form=form,
                           **_ctx('send_confirmation'))


@anonymous_user_required
def confirm_email(token):
    """View function which handles a email confirmation request."""

    expired, invalid, user = confirm_email_token_status(token)

    if not user or invalid:
        invalid = True
        do_flash(*get_message('INVALID_CONFIRMATION_TOKEN'))
    # Only resend when there is a user to send to; without one the request
    # has already been marked invalid above and is redirected below.
    if expired and user:
        send_confirmation_instructions(user)
        do_flash(*get_message('CONFIRMATION_EXPIRED', email=user.email,
                              within=_security.confirm_email_within))
    if invalid or expired:
        return redirect(get_url(_security.confirm_error_view) or
                        url_for('send_confirmation'))

    confirm_user(user)
    login_user(user, True)
    after_this_request(_commit)
    do_flash(*get_message('EMAIL_CONFIRMED'))

    return redirect(get_url(_security.post_confirm_view) or
                    get_url(_security.post_login_view))


def forgot_password():
    """View function that handles a forgotten password request."""

    form_class = _security.forgot_password_form

    if request.json:
        form = form_class(MultiDict(request.json))
    else:
        form = form_class()

    if form.validate_on_submit():
        send_reset_password_instructions(form.user)
        # Flash messages are only emitted for non-JSON requests.
        if request.json is None:
            do_flash(*get_message('PASSWORD_RESET_REQUEST',
                                  email=form.user.email))

    if request.json:
        return _render_json(form)

    return render_template(config_value('FORGOT_PASSWORD_TEMPLATE'),
                           forgot_password_form=form,
                           **_ctx('forgot_password'))


@anonymous_user_required
def reset_password(token):
    """View function that handles a reset password request."""

    expired, invalid, user = reset_password_token_status(token)

    if invalid:
        do_flash(*get_message('INVALID_RESET_PASSWORD_TOKEN'))
    if expired:
        do_flash(*get_message('PASSWORD_RESET_EXPIRED', email=user.email,
                              within=_security.reset_password_within))
    if invalid or expired:
        return redirect(url_for('forgot_password'))

    form = _security.reset_password_form()

    if form.validate_on_submit():
        after_this_request(_commit)
        update_password(user, form.password.data)
        do_flash(*get_message('PASSWORD_RESET'))
        login_user(user, True)
        return redirect(get_url(_security.post_reset_view) or
                        get_url(_security.post_login_view))

    return render_template(config_value('RESET_PASSWORD_TEMPLATE'),
                           reset_password_form=form,
                           reset_password_token=token,
                           **_ctx('reset_password'))


@login_required
def change_password():
    """View function which handles a change password request."""

    form_class = _security.change_password_form

    if request.json:
        form = form_class(MultiDict(request.json))
    else:
        form = form_class()

    if form.validate_on_submit():
        after_this_request(_commit)
        change_user_password(current_user, form.new_password.data)
        if request.json is None:
            do_flash(*get_message('PASSWORD_CHANGE'))
            return redirect(get_url(_security.post_change_view) or
                            get_url(_security.post_login_view))

    if request.json:
        # _render_json reads ``form.user`` when the form has no errors, and
        # nothing in this view attaches it otherwise.
        form.user = current_user
        return _render_json(form)

    # NOTE(review): every other view resolves its template through
    # config_value(); this path is hard-coded -- confirm whether a
    # configurable template name is intended here.
    return render_template('security/change_password.html',
                           change_password_form=form,
                           **_ctx('change_password'))


def create_blueprint(state, import_name):
    """Creates the security extension blueprint

    The logout and login routes are always registered; the remaining routes
    are registered only when the matching feature flag on ``state`` is set.

    :param state: object providing the blueprint name, URL prefix, subdomain,
                  per-view URLs and feature flags read below
    :param import_name: import name passed through to ``Blueprint``
    """

    bp = Blueprint(state.blueprint_name, import_name,
                   url_prefix=state.url_prefix,
                   subdomain=state.subdomain,
                   template_folder='templates')

    bp.route(state.logout_url, endpoint='logout')(logout)

    if state.passwordless:
        # In passwordless mode the login URL serves the "send me a login
        # link" form rather than the password form.
        bp.route(state.login_url,
                 methods=['GET', 'POST'],
                 endpoint='login')(send_login)
        # The token views take a ``token`` argument, so the rule must
        # capture it from the URL.
        bp.route(state.login_url + '/<token>',
                 endpoint='token_login')(token_login)
    else:
        bp.route(state.login_url,
                 methods=['GET', 'POST'],
                 endpoint='login')(login)

    if state.registerable:
        bp.route(state.register_url,
                 methods=['GET', 'POST'],
                 endpoint='register')(register)

    if state.recoverable:
        bp.route(state.reset_url,
                 methods=['GET', 'POST'],
                 endpoint='forgot_password')(forgot_password)
        bp.route(state.reset_url + '/<token>',
                 methods=['GET', 'POST'],
                 endpoint='reset_password')(reset_password)

    if state.changeable:
        bp.route(state.change_url,
                 methods=['GET', 'POST'],
                 endpoint='change_password')(change_password)

    if state.confirmable:
        bp.route(state.confirm_url,
                 methods=['GET', 'POST'],
                 endpoint='send_confirmation')(send_confirmation)
        bp.route(state.confirm_url + '/<token>',
                 methods=['GET', 'POST'],
                 endpoint='confirm_email')(confirm_email)

    return bp