跳至主要內容

用户系统

Mr.Liu大约 2 分钟

用户系统

JWT 扩展

配置项

import datetime
import os


class Config(object):
    SECRET_KEY = '1mQYKQ/EjZw3GEYju5Xg220DlVej+t+2Uc0rCtCx0kno0vbf5ZSsiPIViCxxmf0qpoupYJAHTjIm992KSj+9Ww=='  # 密钥, cookie、session等利用该密钥加密
    
    JWT_SECRET_KEY = SECRET_KEY  # 生成jwt所需的密钥,如果没有设置,则按照 SECRET_KEY 加密
    
    JWT_ACCESS_TOKEN_EXPIRES = datetime.timedelta(days=1)  # 过期时间
    
    # 传递jwt时的格式
    JWT_HEADER_NAME = 'Authorization'
    JWT_HEADER_TYPE = 'Bearer' # 可以修改为其他值,如,类似 django的 JWT
    ...

要访问受保护的jwt_required视图,您需要根据每个请求发送 JWT。默认情况下,这一点是用一个类似:Authorization: Bearer <access_token>

ge_userinfo(){
    this.$axios.get('userinfo', {
        headers:{
            'Authorization': 'Bearer ' + this.token
        }
    })
    .then(resp=>{
        console.log(resp.data)
    })
}

token扩展

登录成功之后,需要 生成token,因此使用 Flask-JWT-Extended扩展,生成token

...
from flask_jwt_extended import JWTManager

...
jwt = JWTManager()


def config_extensions(app):
    ...
    jwt.init_app(app)

模型类

用户模型类

from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import db


class User(db.Model):
    __tablename__ = 'tb_user'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(20), unique=True)
    _password = db.Column(db.String(256))

    # 设置访问密码的方法,并用装饰器@property设置为属性,调用时不用加括号
    @property
    def password(self):
        return self._password

    # 设置加密的方法,传入密码,对类属性进行操作
    @password.setter
    def password(self, value):
        self._password = generate_password_hash(value)

    # 设置验证密码的方法
    def check_password(self, user_pad):
        return check_password_hash(self._password, user_pad)

注册

from flask_restful import Resource
from flask_restful import reqparse
from sqlalchemy.exc import SQLAlchemyError
from app.extensions import db
from .models import User


class RegisterView(Resource):
    def post(self):
        parser = reqparse.RequestParser()
        parser.add_argument('username', type=str, location=['json', 'form'], required=True)
        parser.add_argument('password', type=str, location=['json', 'form'], required=True)

        args = parser.parse_args()

        user = User(**args)
        db.session.add(user)
        try:
            db.session.commit()
        except SQLAlchemyError as e:
            print(e)
            return {'msg': '注册失败'}, 400
        return {'msg': '注册成功'}, 201

登录

  • 获取用户名和密码
  • 根据用户名查询用户对象
    • 查询到用户,校验密码
      • 成功,生成token
      • 失败, 登录失败
    • 查询不到,登录失败
from flask_jwt_extended import create_access_token
from flask_restful import Resource
from flask_restful import reqparse
from sqlalchemy.exc import SQLAlchemyError
from .models import User


class LoginView(Resource):

    def post(self):
        parser = reqparse.RequestParser()
        parser.add_argument('username', type=str, location=['json', 'form'], required=True)
        parser.add_argument('password', type=str, location=['json', 'form'], required=True)

        args = parser.parse_args()
        username = args.get('username')
        password = args.get('password')

        user = User.query.filter_by(username=username).first()
        if not user:
            return {"msg": "Bad username or password"}, 401

        if not user.check_password(password):
            return {"msg": "Bad username or password"}, 401

        access_token = create_access_token(identity={"username": username, 'id': user.id})
        return {'token': access_token, 'username': username}, 200

权限校验

验证装饰器

from functools import wraps
from flask_jwt_extended import verify_jwt_in_request, get_jwt


def jwt_required(fn):
    @wraps(fn)
    def decorator(*args, **kwargs):
        verify_jwt_in_request()
        claims = get_jwt()
        if claims.get("sub"):
            return fn(*args, **kwargs)
        else:
            return {'msg': "认证失败"}, 401

    return decorator

装饰器验证视图类

from flask_jwt_extended import get_jwt_identity
from flask_restful import Resource
from app.utils import jwt_required

class UserinfoView(Resource):
    method_decorators = [jwt_required]

    def get(self):
        current_user = get_jwt_identity() # 解析token,得到载荷数据,格式为字典
        
        return {
            'id': current_user.get('id'),
            'username': current_user.get('username')
        }