首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >FastAPI后台开发基础(16): OAuth2 以及 JWT Token 的使用

FastAPI后台开发基础(16): OAuth2 以及 JWT Token 的使用

原创
作者头像
密码学人CipherHUB
发布2024-11-19 14:40:36
发布2024-11-19 14:40:36
2.2K1
举报
文章被收录于专栏:编码视界编码视界

基于OAuth2 获取账密表单

在这个 FastAPI 应用中,我们使用了 OAuth2PasswordRequestForm 来处理 OAuth 2.0 密码授权流程。这种授权方式允许客户端通过发送用户名和密码直接从授权服务器获取访问令牌,通常用于信任客户端的情况,如用户的设备。

使用 OAuth2PasswordRequestForm 的优点:

  1. 简化客户端实现:客户端不需要单独处理如何发送用户名和密码的细节,只需填充表单即可。
  2. 标准化的请求处理:使用标准的表单方式提交数据,使得请求的处理更加统一和标准化。
  3. 安全性:虽然传输用户名和密码,但通常会结合 HTTPS 使用,确保数据传输的安全性。

功能实现:

/items/ 路径的 POST 请求中,我们通过 OAuth2PasswordRequestForm 依赖来接收表单数据。这个表单包括以下字段:

  • username: 用户名
  • password: 密码
  • scopes: 请求的权限范围,可选
  • client_id: 客户端标识
  • client_secret: 客户端密钥
  • grant_type: 授权类型,这里是 "password",表示密码授权流程

请求示例:

使用 curl 发送请求的示例:

代码语言:bash
复制
curl -X 'POST' \
  'http://127.0.0.1:18081/items/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=password&username=user_test&password=user_password&scope=test_scope&client_id=test_client_id&client_secret=test_client_secret'

这个请求通过 application/x-www-form-urlencoded 类型的内容,将用户的认证信息发送到服务器。服务器接收这些信息后,可以进行相应的处理,如验证用户身份、生成令牌等。

服务器端处理:

在服务器端,get_items 函数接收这些数据,然后可以进行进一步的处理,如:

  • 验证用户名和密码的正确性
  • 检查客户端 ID 和密钥的有效性
  • 根据请求的权限范围生成相应的访问令牌

最后,函数返回这些信息的字典形式,主要用于演示和调试。在实际应用中,你可能会返回一个生成的访问令牌或相关的错误信息。

代码语言:python
复制
from __future__ import annotations

from typing import Annotated

import uvicorn
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()


@app.post("/items/")
async def get_items(oauth2_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """
    curl -X 'POST' \
    'http://127.0.0.1:18081/items/' \
    -H 'accept: application/json' \
    -H 'Content-Type: application/x-www-form-urlencoded' \
    -d 'grant_type=password&username=user_test&password=user_password&scope=test_scope&client_id=test_client_id&client_secret=test_client_secret'

    {
        "grant_type": "password",
        "username": "user_test",
        "password": "user_password",
        "scopes": [
          "test_scope"
        ],
        "client_id": "test_client_id",
        "client_secret": "test_client_secret"
    }
    """
    print('username:', oauth2_data.username,
          '\npassword:', oauth2_data.password,
          '\nclient_id:', oauth2_data.client_id,
          '\nclient_secret:', oauth2_data.client_secret,
          '\nscopes:', oauth2_data.scopes,
          '\ngrant_type:', oauth2_data.grant_type)
    return dict(oauth2_data.__dict__.items())


if __name__ == '__main__':
    uvicorn.run(app, host = '127.0.0.1', port = 18081)
发送账密表单数据
发送账密表单数据
服务端的响应数据
服务端的响应数据
服务端处理日志
服务端处理日志

JWT基于账密表单生成Token

总体流程

处理流程
处理流程

测试代码

代码语言:python
复制
from __future__ import annotations

import datetime

import jwt
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()

SECRET_KEY = "your-secret-key"
ALGORITHMS = ["HS256", "HS384", "HS512"]

user_cache = {}  # 临时缓存


def check_token(token: str):
    """
    检查 token 是否有效
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms = ALGORITHMS)
        print('check_token payload:', payload)
        if payload['username'] == user_cache[token]['username'] and payload['password'] == user_cache[token]['password']:
            return True
        else:
            return False
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False


@app.post('/generate_my_token')
def generate_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    """
    curl -X 'POST' \
    'http://127.0.0.1:18081/generate_my_token' \
    -H 'accept: application/json' \
    -H 'Content-Type: application/x-www-form-urlencoded' \
    -d 'grant_type=password&username=test1&password=pwd1&scope=&client_id=string&client_secret=string'

    {
        "access_token":"eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0MSJ9.9GbGTL3B9rQP6nOyJSbMnzr4n9qKnibbC_W0ANsEH9do_rNUfEVbigwWoF1w2ySn",
        "algorithm":"HS384"
    }

    一个 JWT 令牌主要由三个部分组成,它们之间用点(.)分隔
    这三个部分分别是:
        Header(头部)
        Payload(负载)
        Signature(签名)

    Header(头部)
    Header 是一个 JSON 对象,被编码为 Base64Url 字符串:
    echo "eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9"|base64 -d
        {"alg":"HS384","typ":"JWT"}

    Payload(负载)
    Payload 也是一个 JSON 对象,被编码为 Base64Url 字符串
    echo "eyJ1c2VybmFtZSI6InN0cmluZyIsInBhc3N3b3JkIjoic3RyaW5nIiwiY2xpZW50X2lkIjoic3RyaW5nIiwiY2xpZW50X3NlY3JldCI6InN0cmluZyIsInNjb3BlcyI6W10sImdyYW50X3R5cGUiOiJwYXNzd29yZCIsImFsZ29yaXRobSI6IkhTMjU2IiwidGltZXN0YW1wIjoiMjAyNC0xMS0xOVQxMjo0ODoxMi4wMTI4NTkifQ"|base64 -d
    {"username":"string","password":"string","client_id":"string","client_secret":"string","scopes":[],"grant_type":"password","algorithm":"HS256","timestamp":"2024-11-19T12:48:12.012859"}

    Signature(签名)
    签名用于验证消息在传递过程中没有被篡改,并且,对于使用私钥签名的令牌,它还可以验证 JWT 的发送方是谁
    """
    print('generate_token HEADER:', request.headers.items())

    # 这里应该有验证用户名和密码的逻辑
    if form_data.username == "" or form_data.password == "":
        raise HTTPException(status_code = 400, detail = "Incorrect username or password")
    # 根据用户名和密码选择加密算法
    v = 0
    for i in form_data.username + form_data.password:
        v += ord(i)
    enc_alg = ALGORITHMS[v % len(ALGORITHMS)]

    payload = {
        'username': form_data.username,
        'password': form_data.password,
        'client_id': form_data.client_id,
        'client_secret': form_data.client_secret,
        'scopes': form_data.scopes,
        'grant_type': form_data.grant_type,
        'algorithm': enc_alg,
        'timestamp': datetime.datetime.now().isoformat()
    }
    # 简单示例:生成一个 JWT
    token = jwt.encode(payload, SECRET_KEY, algorithm = enc_alg)
    user_cache[token] = {'username': form_data.username,
                         'password': form_data.password,
                         'login_user': token,
                         'client_id': form_data.client_id,
                         'client_secret': form_data.client_secret,
                         'scopes': form_data.scopes,
                         'grant_type': form_data.grant_type}
    print(check_token(token))
    return {"access_token": token, "algorithm": enc_alg}


# OAuth2PasswordBearer 会尝试从请求的授权头中提取 Bearer 令牌
# 如果没有找到令牌或令牌格式不正确,它将返回一个错误响应
# -H "Authorization: Bearer eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9.xxxxxx"
oauth2_pwd_bearer = OAuth2PasswordBearer(tokenUrl = 'generate_my_token')


async def get_current_user(token: str = Depends(oauth2_pwd_bearer)):
    print('get_current_user, login_user:', token)
    if token not in user_cache:
        raise HTTPException(status_code = 401, detail = "Invalid token")
    if not check_token(token):
        raise HTTPException(status_code = 401, detail = "Token check failed")
    return {'login-user': user_cache[token]}


@app.get('/items')
async def read_items(request: Request, login_user: str = Depends(get_current_user)):
    """
    curl -X GET http://127.0.0.1:18081/items \
     -H "Authorization: Bearer eyJhbGciOiJIUzM4NCIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0MSJ9.9GbGTL3B9rQP6nOyJSbMnzr4n9qKnibbC_W0ANsEH9do_rNUfEVbigwWoF1w2ySn"
    """
    print('read_items HEADER:', request.headers.items())
    return {'response': login_user}


if __name__ == '__main__':
    uvicorn.run(app, host = '127.0.0.1', port = 18081)

1. 生成令牌 (/generate_my_token)

  • 路径操作POST /generate_my_token
  • 功能:接收用户的用户名、密码、客户端 ID 和客户端密钥等信息,并生成一个 JWT 令牌。
  • 实现细节
    • 使用 OAuth2PasswordRequestForm 从请求中提取表单数据。
    • 验证用户名和密码是否提供(简单检查,实际应用中应连接到用户数据库进行验证)。
    • 根据用户名和密码的字符值选择一个加密算法(HS256, HS384, HS512 中的一个)。
    • 构建 JWT 的 payload,包括用户名、密码、客户端信息、授权类型、时间戳等。
    • 使用选定的算法和密钥生成 JWT。
    • 将生成的 JWT 存储在一个简单的缓存(字典)中,以便后续验证使用。
    • 返回生成的 JWT 和使用的算法。

2. 验证令牌

  • 函数check_token
  • 功能:验证给定的 JWT 是否有效。
  • 实现细节
    • 使用 jwt 库解码和验证 JWT。
    • 检查解码后的 payload 是否与缓存中的用户信息匹配。
    • 捕获并处理可能的异常,如过期的令牌或格式错误的令牌。

3. 获取当前用户信息

  • 依赖函数get_current_user
  • 功能:从请求中提取 JWT,并验证其有效性,然后返回与该令牌关联的用户信息。
  • 实现细节
    • 使用 OAuth2PasswordBearer 从请求的授权头中提取 JWT。
    • 调用 check_token 验证令牌。
    • 如果验证失败,抛出 HTTP 401 错误。
    • 如果验证成功,返回缓存中的用户信息。

4. 读取受保护的资源 (/items)

  • 路径操作GET /items
  • 功能:用户必须提供有效的 JWT 才能访问此端点。
  • 实现细节
    • 使用 Depends(get_current_user) 确保只有提供了有效 JWT 的用户才能访问。
    • 返回与登录用户关联的信息。
生成 jwt-token
生成 jwt-token
基于生成的 token 访问其他接口
基于生成的 token 访问其他接口
未生成 token 时访问接口被拦截
未生成 token 时访问接口被拦截

使用非对称密钥进行JWT Token的生成和校验

使用 ecc 密钥为例进行 token 的生成和校验

代码语言:python
复制
from __future__ import annotations

import datetime

import jwt
import uvicorn
from cryptography.hazmat.primitives import serialization
from fastapi import Depends, FastAPI, Request
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()

# 读取私钥
with open('oauth2_jwt_token_ecc_private.pem', 'rb') as f:
    private_key = serialization.load_pem_private_key(
        f.read(),
        password = None,
    )

with open('oauth2_jwt_token_ecc_public.pem', 'rb') as f:
    public_key = serialization.load_pem_public_key(
        f.read(),
    )


def check_token(token: str):
    """
    验证 token
    """
    print('internal check token:', token)
    try:
        payload = jwt.decode(token, public_key, algorithms = ['ES512'])
        print('check_token, payload:', payload)
        return True
    except jwt.exceptions.InvalidSignatureError:
        print('check_token, InvalidSignatureError')
        return False
    except jwt.exceptions.ExpiredSignatureError:
        print('check_token, ExpiredSignatureError')
        return False
    except jwt.exceptions.InvalidTokenError:
        print('check_token, InvalidTokenError')
        return False


@app.post('/generate_my_ecc_jwt_token')
async def generate_my_ecc_jwt_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    """
    支持的非对称算法:

        RS256 - RSA 使用 SHA-256

        RS384 - RSA 使用 SHA-384

        RS512 - RSA 使用 SHA-512

        ES256 - ECDSA 使用 P-256 和 SHA-256

        ES384 - ECDSA 使用 P-384 和 SHA-384

        ES512 - ECDSA 使用 P-521 和 SHA-512

        PS256 - RSA SSA-PSS 使用 SHA-256 和 MGF1

        PS384 - RSA SSA-PSS 使用 SHA-384 和 MGF1

        PS512 - RSA SSA-PSS 使用 SHA-512 和 MGF1

    curl -X 'POST' \
    'http://127.0.0.1:18081/generate_my_ecc_jwt_token' \
    -H 'accept: application/json' \
    -H 'Content-Type: application/x-www-form-urlencoded' \
    -d 'grant_type=password&username=string&password=string&scope=&client_id=string&client_secret=string'

     {
         "access_token": "eyJhbGciOiJFUzUxMiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InN0cmluZyIsInBhc3N3b3JkIjoic3RyaW5nIiwiY2xpZW50X2lkIjoic3RyaW5nIiwiY2xpZW50X3NlY3JldCI6InN0cmluZyIsInNjb3BlcyI6W10sImdyYW50X3R5cGUiOiJwYXNzd29yZCJ9.AH62PD4t1x3oE-tJXzy3KimxX6Rvz1uY8J8EtCx92tDg6bId98BJUP-dkL_46hB8JOq4VbwJO77xEh_YOiEHJs-tAXfPgIawtSXgtraY1yRVOoEAN4f-bqdhqOeloPKIE3xH5ly1AD5LvLH7qLUxLI6Geil16Y67QuDw6N7C0kJXZe6j"
     }
    """
    print('generate_my_ecc_jwt_token HEADER:', request.headers.items())

    # 生成 JWT
    payload = {
        'username': form_data.username,
        'password': form_data.password,
        'client_id': form_data.client_id,
        'client_secret': form_data.client_secret,
        'scopes': form_data.scopes,
        'grant_type': form_data.grant_type,
        'algorithm': 'ES512',
        'timestamp': datetime.datetime.now().isoformat()
    }
    token = jwt.encode(payload, private_key, algorithm = 'ES512')

    print(check_token(token))

    return {'access_token': token}


if __name__ == '__main__':
    uvicorn.run(app, host = '127.0.0.1', port = 18081)

公钥

代码语言:python
复制
-----BEGIN PUBLIC KEY-----
MIGbMBAGByqGSM49AgEGBSuBBAAjA4GGAAQAPcXcQJsSY3z+WaFTxkxNxVn6HRat
2ofsvGDbZ06wkCWc08BNXPLQ4COPAvBE1rT0wqS4MkMmgI0F2jfWUU0BEiwAr5h8
JqEOZFpxTrm2tG/TX8j7SJH2/LFxLF9riv8jWjJLqdiuBYKBrLF2cxSjpGyku3ef
WuQBuHs81LHOSjX7Wqk=
-----END PUBLIC KEY-----

私钥

代码语言:python
复制
-----BEGIN PRIVATE KEY-----
MIHuAgEAMBAGByqGSM49AgEGBSuBBAAjBIHWMIHTAgEBBEIBVp+PXoGods37Go4R
UIk6LKvBVisTB+OEKJVQb37omfHihXbKrj2AaWF34AH13/r0QNcRmhOfpb/0PjHp
eCjuK8yhgYkDgYYABAA9xdxAmxJjfP5ZoVPGTE3FWfodFq3ah+y8YNtnTrCQJZzT
wE1c8tDgI48C8ETWtPTCpLgyQyaAjQXaN9ZRTQESLACvmHwmoQ5kWnFOuba0b9Nf
yPtIkfb8sXEsX2uK/yNaMkup2K4FgoGssXZzFKOkbKS7d59a5AG4ezzUsc5KNfta
qQ==
-----END PRIVATE KEY-----

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于OAuth2 获取账密表单
    • 使用 OAuth2PasswordRequestForm 的优点:
    • 功能实现:
    • 请求示例:
    • 服务器端处理:
  • JWT基于账密表单生成Token
    • 总体流程
    • 测试代码
      • 1. 生成令牌 (/generate_my_token)
      • 2. 验证令牌
      • 3. 获取当前用户信息
      • 4. 读取受保护的资源 (/items)
  • 使用非对称密钥进行JWT Token的生成和校验
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档