废话不多说,直接上代码。
目录结构, 由于我也是刚开始学这个框架,只是了解了怎么注册蓝图,JWT的集成,数据库的集成,想了解更多,自行打开官方文档去详细阅读。fastapi官网文档链接
创建一个main.py文件, 我这个是添加了蓝图, 关键字:
from fastapi import FastAPI
from text import demo
from wx import test_client
from sql_conf import models, database
models.Base.metadata.create_all(bind=database.engine)
app = FastAPI()
app.include_router(demo.router, prefix="/api")
app.include_router(test_client.router, prefix="/wx")
if __name__ == '__main__':
import uvicorn
uvicorn.run(
app='main:app',
host="0.0.0.0",
port=8082,
reload=True,
debug=True
)
auth.py用来jwt的校验和生成
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 3:25 下午
import jwt
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBasic
from passlib.context import CryptContext
from datetime import datetime, timedelta
class AuthHandler():
security = HTTPBasic()
pwd_content = CryptContext(schemes=['bcrypt'], deprecated='auto')
secret = 'SECRET'
# 密码加密
def get_password_has(self, password):
return self.pwd_content.hash(password)
# 密码校验
def verify_password(self, plain_password, hashed_password):
return self.pwd_content.verify(plain_password, hashed_password)
# 生成token
def encode_token(self, user_id):
payload = {
'exp': datetime.utcnow() + timedelta(days=0, minutes=120),
'iat': datetime.utcnow(),
'sub': user_id
}
return str(jwt.encode(payload, self.secret, algorithm='HS256'))
# token解码
def decode_token(self, token):
try:
payload = jwt.decode(token, self.secret, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="签名已过期"
)
except jwt.InvalidTokenError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='签名无效!')
def auth_wrapper(self, oauth: HTTPAuthorizationCredentials = Security(security)):
return self.decode_token(oauth.credentials)
wx,test两个目录,都是单独的一个app。
wx目录下的test_client.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 4:00 下午
from fastapi import status, APIRouter, Depends, Path, Query
from fastapi.responses import JSONResponse
from sql_conf.database import SessionLocal
from typing import Union, Any
from sql_conf.models import User
from sql_conf.crud import db_create
from sqlalchemy.orm import Session
from sql_conf.db_session import get_db
from .data_model import UserItem, USerLgoin
from auth import AuthHandler
auth_middle = AuthHandler()
router = APIRouter(
tags=["wx"],
responses={404: {"description": "未找到接口!"}}
)
@router.get('/')
async def read_main(token_data: Union[str, Any] = Depends(auth_middle.decode_token),):
content = {"msg": "hello wx"}
return JSONResponse(status_code=status.HTTP_200_OK, content=content)
@router.post('/create_user')
async def create_user_info( Item: UserItem, db: Session = Depends(get_db)):
result = {'code': 0, "message": ""}
request_dict = Item.dict()
login = request_dict.get('login')
email = request_dict.get('email')
pwd = request_dict.get("hashed_password")
if not login:
return JSONResponse(status_code=status.HTTP_201_CREATED, content={"code": 201, "msg": '请输入账号!'})
has_pwd = auth_middle.get_password_has(pwd)
save_user_info = {
'email': email,
'login': login,
'hashed_password': has_pwd
}
comiit_user = db_create(db=db, payload=save_user_info)
if comiit_user:
result['code'] = 200
result['message'] = '注册成功!'
else:
result['code'] = 201
result['message'] = '注册失败!'
return JSONResponse(status_code=status.HTTP_200_OK, content=result)
@router.post('/login_user')
async def user_login(Data: USerLgoin, db: Session=Depends(get_db)):
result = {}
user_dict = Data.dict()
user_name = user_dict.get('username')
pwd = user_dict.get('password')
filter_data = db.query(User).filter_by(login=user_name).first()
version_pwd = auth_middle.verify_password(plain_password=pwd, hashed_password=filter_data.hashed_password)
print(version_pwd)
if version_pwd:
token = auth_middle.encode_token(user_id=filter_data.id)
print(token)
result['code'] = 200
result['msg'] = '登录成功!'
result['token'] = token
else:
result['code'] = 201
result['msg'] = '登录失败!'
return JSONResponse(status_code=status.HTTP_200_OK, content=result)
@router.get('/get_user/', summary="获取用户信息")
async def get_user_info(token_data: Union[str, Any] = Depends(auth_middle.decode_token), db: Session = Depends(get_db)):
result = {}
filter_user = db.query(User).filter_by(id=int(token_data)).first()
if filter_user:
result['code'] = 200
result['message'] = '用户信息'
result['userinfo'] = {
'login': filter_user.login,
'email': filter_user.email,
'pwd': filter_user.hashed_password
}
else:
result['code'] = 201
result['message'] = "未获取到用户信息!"
return JSONResponse(status_code=status.HTTP_200_OK, content=result)
data_model.py 用来创建数据模型
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 4:02 下午
from pydantic import BaseModel
class UserItem(BaseModel):
login: str
email: str
hashed_password: str
class USerLgoin(BaseModel):
username: str
password: str
text目录demo.py文件
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 3:48 下午
from typing import Optional
from fastapi import FastAPI, Body, status, APIRouter
from pydantic import BaseModel
from fastapi.responses import JSONResponse
router = APIRouter(
tags=["items"],
responses={404: {"description": "Not found"}}
)
class Item(BaseModel):
name: str
price: float
is_offer: Optional[bool] = None
@router.get('/')
def index():
content = {"hello": "world"}
return JSONResponse(status_code=status.HTTP_200_OK, content=content)
@router.get("/items/{item_id}")
def read_item(item_id: int, q: Optional[str] = None):
return {'itrm_id': item_id, "q": q}
@router.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
print(item_id)
print(item)
return {"item_name": item.name, "item_id": item_id}
sql_conf目录用于配置数据库
创建 crud.py, database.py, db_session.py, models.py
crud.py 数据库的增删改查
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 4:52 下午
from sql_conf import models
from sqlalchemy.orm import Session
def db_create(db: Session, payload):
try:
db_user = models.User(**payload)
db.add(db_user)
db.flush()
db.commit()
return db_user
except Exception as e:
print("错误: {}".format(e))
数据库的连接 databases.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 4:52 下午
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
db_conf = {
'user': 'user',
'password': '密码',
'host': 'ip',
'port': '3306',
'name': 'demo'
}
# 数据库访问地址
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://{user}:{password}@{host}:{port}/{name}".format(
user=db_conf['user'],
password=db_conf['password'],
host=db_conf['host'],
port=db_conf['port'],
name=db_conf['name']
)
# 启动引擎
engine = create_engine(
SQLALCHEMY_DATABASE_URL
)
# 创建会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 数据模型基类
Base = declarative_base()
models.py创建模型
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 4:52 下午
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
login = Column(String(length=60), index=True)
email = Column(String(length=255), unique=True, index=True, )
hashed_password = Column(String(length=255))
is_active = Column(Boolean, default=True)
db_session.py 数据库会话信息
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Eamil: 1922878025@qq.com
# @Author: Wyc
# @Time: 2:59 下午
from .database import SessionLocal
def get_db():
db = ""
try:
db = SessionLocal()
yield db
finally:
db.close()