用户系统
2023年8月3日大约 2 分钟
用户系统
JWT 扩展
配置项
import datetime
import os
class Config(object):
SECRET_KEY = '1mQYKQ/EjZw3GEYju5Xg220DlVej+t+2Uc0rCtCx0kno0vbf5ZSsiPIViCxxmf0qpoupYJAHTjIm992KSj+9Ww==' # 密钥, cookie、session等利用该密钥加密
JWT_SECRET_KEY = SECRET_KEY # 生成jwt所需的密钥,如果没有设置,则按照 SECRET_KEY 加密
JWT_ACCESS_TOKEN_EXPIRES = datetime.timedelta(days=1) # 过期时间
# 传递jwt时的格式
JWT_HEADER_NAME = 'Authorization'
JWT_HEADER_TYPE = 'Bearer' # 可以修改为其他值,如,类似 django的 JWT
...
要访问受保护的jwt_required视图,您需要根据每个请求发送 JWT。默认情况下,这一点是用一个类似:
Authorization: Bearer <access_token>
ge_userinfo(){
this.$axios.get('userinfo', {
headers:{
'Authorization': 'Bearer ' + this.token
}
})
.then(resp=>{
console.log(resp.data)
})
}
token扩展
登录成功之后,需要 生成token,因此使用
Flask-JWT-Extended
扩展,生成token
...
from flask_jwt_extended import JWTManager
...
jwt = JWTManager()
def config_extensions(app):
...
jwt.init_app(app)
模型类
用户模型类
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import db
class User(db.Model):
__tablename__ = 'tb_user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(20), unique=True)
_password = db.Column(db.String(256))
# 设置访问密码的方法,并用装饰器@property设置为属性,调用时不用加括号
@property
def password(self):
return self._password
# 设置加密的方法,传入密码,对类属性进行操作
@password.setter
def password(self, value):
self._password = generate_password_hash(value)
# 设置验证密码的方法
def check_password(self, user_pad):
return check_password_hash(self._password, user_pad)
注册
from flask_restful import Resource
from flask_restful import reqparse
from sqlalchemy.exc import SQLAlchemyError
from app.extensions import db
from .models import User
class RegisterView(Resource):
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('username', type=str, location=['json', 'form'], required=True)
parser.add_argument('password', type=str, location=['json', 'form'], required=True)
args = parser.parse_args()
user = User(**args)
db.session.add(user)
try:
db.session.commit()
except SQLAlchemyError as e:
print(e)
return {'msg': '注册失败'}, 400
return {'msg': '注册成功'}, 201
登录
- 获取用户名和密码
- 根据用户名查询用户对象
- 查询到用户,校验密码
- 成功,生成token
- 失败, 登录失败
- 查询不到,登录失败
- 查询到用户,校验密码
from flask_jwt_extended import create_access_token
from flask_restful import Resource
from flask_restful import reqparse
from sqlalchemy.exc import SQLAlchemyError
from .models import User
class LoginView(Resource):
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('username', type=str, location=['json', 'form'], required=True)
parser.add_argument('password', type=str, location=['json', 'form'], required=True)
args = parser.parse_args()
username = args.get('username')
password = args.get('password')
user = User.query.filter_by(username=username).first()
if not user:
return {"msg": "Bad username or password"}, 401
if not user.check_password(password):
return {"msg": "Bad username or password"}, 401
access_token = create_access_token(identity={"username": username, 'id': user.id})
return {'token': access_token, 'username': username}, 200
权限校验
验证装饰器
from functools import wraps
from flask_jwt_extended import verify_jwt_in_request, get_jwt
def jwt_required(fn):
@wraps(fn)
def decorator(*args, **kwargs):
verify_jwt_in_request()
claims = get_jwt()
if claims.get("sub"):
return fn(*args, **kwargs)
else:
return {'msg': "认证失败"}, 401
return decorator
装饰器验证视图类
from flask_jwt_extended import get_jwt_identity
from flask_restful import Resource
from app.utils import jwt_required
class UserinfoView(Resource):
method_decorators = [jwt_required]
def get(self):
current_user = get_jwt_identity() # 解析token,得到载荷数据,格式为字典
return {
'id': current_user.get('id'),
'username': current_user.get('username')
}