FastAPI 结合 JWT
创始人
2024-09-26 00:46:44
0

文章目录

  • FastAPI 结合 JWT
  • 步骤
  • 安装
  • 步骤
    • 导入必要的模块
    • 设置配置和初始化应用
    • 创建数据模型
    • 实现辅助函数
      • 生成 JWT Token
      • 获取用户数据
      • 验证密码
      • 获取当前用户
    • 用户登录获取 Token
    • 受保护的路由示例
  • 所有代码
  • 测试
    • 获取 Token
    • 访问受保护的路由
      • token正确
      • token错误
  • 总结
  • 注意

FastAPI 结合 JWT

JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。

步骤

在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:

  1. **用户登录:**用户通过提交用户名和密码获取 JWT。
  2. **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  3. **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
  4. **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。

安装

pip install fastapi uvicorn pyjwt  

步骤

导入必要的模块

from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import jwt  

设置配置和初始化应用

SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥 ALGORITHM = "HS256"             # 加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间  app = FastAPI()  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  

创建数据模型

class Token(BaseModel):     access_token: str     token_type: str  class TokenData(BaseModel):     username: str | None = None  class User(BaseModel):     username: str     email: str | None = None     full_name: str | None = None     disabled: bool | None = None  class UserInDB(User):     hashed_password: str  

实现辅助函数

生成 JWT Token

def create_access_token(data: dict, expires_delta: timedelta | None = None):     to_encode = data.copy()     if expires_delta:         expire = datetime.now(timezone.utc) + expires_delta     else:         expire = datetime.now(timezone.utc) + timedelta(minutes=15)     to_encode.update({"exp": expire})     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)     return encoded_jwt  

获取用户数据

fake_users_db = {     "testuser": {         "username": "testuser",         "full_name": "Test User",         "email": "testuser@example.com",         "hashed_password": "fakehashedpassword",         "disabled": False,     } }  def get_user(db, username: str):     if username in db:         user_dict = db[username]         return UserInDB(**user_dict)  

验证密码

def verify_password(plain_password, hashed_password):     return plain_password == hashed_password  

获取当前用户

async def get_current_user(token: str = Depends(oauth2_scheme)):     credentials_exception = HTTPException(         status_code=status.HTTP_401_UNAUTHORIZED,         detail="Could not validate credentials",         headers={"WWW-Authenticate": "Bearer"},     )     try:         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])         username: str = payload.get("sub")         if username is None:             raise credentials_exception         token_data = TokenData(username=username)     except jwt.PyJWTError:         raise credentials_exception     user = get_user(fake_users_db, username=token_data.username)     if user is None:         raise credentials_exception     return user  async def get_current_active_user(current_user: User = Depends(get_current_user)):     if current_user.disabled:         raise HTTPException(status_code=400, detail="Inactive user")     return current_user  

用户登录获取 Token

@app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):     user = get_user(fake_users_db, form_data.username)     if not user or not verify_password(form_data.password, user.hashed_password):         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Incorrect username or password",             headers={"WWW-Authenticate": "Bearer"},         )     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)     access_token = create_access_token(         data={"sub": user.username}, expires_delta=access_token_expires     )     return {"access_token": access_token, "token_type": "bearer"}  

受保护的路由示例

@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):     return current_user  

所有代码

from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import uvicorn import jwt import os  # JWT 相关配置 SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中) ALGORITHM = "HS256"  # 使用的加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位  app = FastAPI()  # 创建 FastAPI 应用实例  # OAuth2PasswordBearer 实例,用于依赖项 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息 fake_users_db = {     "testuser": {         "username": "testuser",         "full_name": "Test User",         "email": "testuser@example.com",         "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码         "disabled": False,  # 用户是否被禁用     } }  # Pydantic 模型,用于定义请求和响应的数据结构 class Token(BaseModel):     access_token: str  # Token 字符串     token_type: str  # Token 类型(一般为 "bearer")  class TokenData(BaseModel):     username: str | None = None  # 从 Token 中提取的用户名  class User(BaseModel):     username: str  # 用户名     email: str | None = None  # 邮箱地址,可选     full_name: str | None = None  # 用户全名,可选     disabled: bool | None = None  # 用户是否被禁用,可选  class UserInDB(User):     hashed_password: str  # 存储在数据库中的哈希密码  # 生成 JWT Token 的函数 def create_access_token(data: dict, expires_delta: timedelta | None = None):     """     生成 JWT Token。      参数:     - data (dict): 要编码到 JWT 中的数据。     - expires_delta (timedelta, 可选): Token 的过期时间。      返回:     - str: 编码后的 JWT 字符串。     """     to_encode = data.copy()  # 创建副本,以避免修改原始数据     if expires_delta:         expire = datetime.now(timezone.utc) + expires_delta     else:         expire = datetime.now(timezone.utc) + timedelta(minutes=15)     to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT     return encoded_jwt  # 从假数据库获取用户信息 def get_user(db, username: str):     """     从数据库中获取用户信息。      参数:     - db (dict): 用户数据库(在本例中为假数据)。     - username (str): 用户名。      返回:     - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。     """     if username in db:         user_dict = db[username]         return UserInDB(**user_dict)  # 验证用户密码 def verify_password(plain_password, hashed_password):     """     验证用户密码。      参数:     - plain_password (str): 用户输入的明文密码。     - hashed_password (str): 存储在数据库中的哈希密码。      返回:     - bool: 密码匹配返回 True,否则返回 False。     """     return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较  # 验证 Token 并获取当前用户 async def get_current_user(token: str = Depends(oauth2_scheme)):     """     从 JWT Token 中提取用户信息,并验证 Token 的合法性。      参数:     - token (str): JWT Token。      返回:     - User: 返回当前用户信息。      抛出:     - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。     """     credentials_exception = HTTPException(         status_code=status.HTTP_401_UNAUTHORIZED,         detail="Could not validate credentials",         headers={"WWW-Authenticate": "Bearer"},     )     try:         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT         username: str = payload.get("sub")  # 获取 JWT 中的用户名         if username is None:             raise credentials_exception         token_data = TokenData(username=username)     except jwt.PyJWTError:         raise credentials_exception     user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息     if user is None:         raise credentials_exception     return user  # 验证用户是否被禁用 async def get_current_active_user(current_user: User = Depends(get_current_user)):     """     验证当前用户是否被禁用。      参数:     - current_user (User): 当前用户信息。      返回:     - User: 如果用户未被禁用,返回用户信息。      抛出:     - HTTPException: 当用户被禁用时抛出 400 错误。     """     if current_user.disabled:         raise HTTPException(status_code=400, detail="Inactive user")     return current_user  # 用户登录获取 Token @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):     """     用户登录接口,用于获取 JWT Token。      参数:     - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。      返回:     - dict: 包含 access_token 和 token_type 的响应数据。      抛出:     - HTTPException: 当用户名或密码错误时抛出 401 错误。     """     user = get_user(fake_users_db, form_data.username)     if not user or not verify_password(form_data.password, user.hashed_password):         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Incorrect username or password",             headers={"WWW-Authenticate": "Bearer"},         )     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间     access_token = create_access_token(         data={"sub": user.username}, expires_delta=access_token_expires     )     return {"access_token": access_token, "token_type": "bearer"}  # 受保护的路由示例 @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):     """     获取当前用户信息的受保护路由。      参数:     - current_user (User): 当前登录的用户信息(通过 JWT 验证)。      返回:     - User: 返回当前用户的信息。     """     return current_user  # 运行应用 if __name__ == "__main__":     uvicorn.run(         f"{os.path.basename(__file__).split('.')[0]}:app",         host="127.0.0.1",         port=8000,         reload=True,  # 启用自动重载     )  

测试

获取 Token

在这里插入图片描述

访问受保护的路由

token正确

在这里插入图片描述

token错误

在这里插入图片描述

总结

JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。

在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。

实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。

  • JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
  • 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
  • API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。

使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。

注意

示例中密码并没有经过hash加密实际应用中要加密的

相关内容

热门资讯

今日公布!wepoker软件安... 今日公布!wepoker软件安装包,Wepoker原来是有挂猫腻,新2024教程(有挂助手)-哔哩哔...
记者发布!宝宝浙江游戏有外挂,... 记者发布!宝宝浙江游戏有外挂,中至麻将有脚本确实真有挂辅助挂,高科技教程(有挂方法)-哔哩哔哩;相信...
二分钟输赢!wepoke软件透... 二分钟输赢!wepoke软件透明挂是真的,德州透视插件,爆料教程(有挂到底有挂)-哔哩哔哩;超受欢迎...
专业讨论!wepoker底牌透... 专业讨论!wepoker底牌透视脚本,WEpoker有挂其实确实是有挂,教你教程(有挂科普)-哔哩哔...
传递经验!福建大玩家十三水开挂... 传递经验!福建大玩家十三水开挂,28圈辅助工具确实真有挂辅助挂,科技教程(有挂技巧)-哔哩哔哩;福建...
关于!微扑克微乐辅助,wEpO... 关于!微扑克微乐辅助,wEpOker确实是有挂的,攻略方法(有挂普及)-哔哩哔哩;是一款可以让一直输...
3分钟逻辑!德扑之星窥牌,we... 3分钟逻辑!德扑之星窥牌,wepoker辅助是真的假的,必备教程(有挂机器人)-哔哩哔哩;超受欢迎的...
揭秘!白金岛小程序辅助,中至吉... 您好,白金岛小程序辅助这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩家在...
推荐十款!wepoker透视器... 推荐十款!wepoker透视器免费,WepokEr原来确实是有挂,力荐教程(有挂测试)-哔哩哔哩;w...
三分钟了解!微信边锋斗地主提升... 三分钟了解!微信边锋斗地主提升胜率,永盛联盟有挂确实有辅助挂,爆料教程(有挂存在)-哔哩哔哩是一款可...