FastAPI 结合 JWT
创始人
2024-09-26 00:46:44
0

文章目录

  • FastAPI 结合 JWT
  • 步骤
  • 安装
  • 步骤
    • 导入必要的模块
    • 设置配置和初始化应用
    • 创建数据模型
    • 实现辅助函数
      • 生成 JWT Token
      • 获取用户数据
      • 验证密码
      • 获取当前用户
    • 用户登录获取 Token
    • 受保护的路由示例
  • 所有代码
  • 测试
    • 获取 Token
    • 访问受保护的路由
      • token正确
      • token错误
  • 总结
  • 注意

FastAPI 结合 JWT

JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。

步骤

在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:

  1. **用户登录:**用户通过提交用户名和密码获取 JWT。
  2. **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  3. **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
  4. **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。

安装

pip install fastapi uvicorn pyjwt  

步骤

导入必要的模块

from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import jwt  

设置配置和初始化应用

SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥 ALGORITHM = "HS256"             # 加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间  app = FastAPI()  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  

创建数据模型

class Token(BaseModel):     access_token: str     token_type: str  class TokenData(BaseModel):     username: str | None = None  class User(BaseModel):     username: str     email: str | None = None     full_name: str | None = None     disabled: bool | None = None  class UserInDB(User):     hashed_password: str  

实现辅助函数

生成 JWT Token

def create_access_token(data: dict, expires_delta: timedelta | None = None):     to_encode = data.copy()     if expires_delta:         expire = datetime.now(timezone.utc) + expires_delta     else:         expire = datetime.now(timezone.utc) + timedelta(minutes=15)     to_encode.update({"exp": expire})     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)     return encoded_jwt  

获取用户数据

fake_users_db = {     "testuser": {         "username": "testuser",         "full_name": "Test User",         "email": "testuser@example.com",         "hashed_password": "fakehashedpassword",         "disabled": False,     } }  def get_user(db, username: str):     if username in db:         user_dict = db[username]         return UserInDB(**user_dict)  

验证密码

def verify_password(plain_password, hashed_password):     return plain_password == hashed_password  

获取当前用户

async def get_current_user(token: str = Depends(oauth2_scheme)):     credentials_exception = HTTPException(         status_code=status.HTTP_401_UNAUTHORIZED,         detail="Could not validate credentials",         headers={"WWW-Authenticate": "Bearer"},     )     try:         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])         username: str = payload.get("sub")         if username is None:             raise credentials_exception         token_data = TokenData(username=username)     except jwt.PyJWTError:         raise credentials_exception     user = get_user(fake_users_db, username=token_data.username)     if user is None:         raise credentials_exception     return user  async def get_current_active_user(current_user: User = Depends(get_current_user)):     if current_user.disabled:         raise HTTPException(status_code=400, detail="Inactive user")     return current_user  

用户登录获取 Token

@app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):     user = get_user(fake_users_db, form_data.username)     if not user or not verify_password(form_data.password, user.hashed_password):         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Incorrect username or password",             headers={"WWW-Authenticate": "Bearer"},         )     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)     access_token = create_access_token(         data={"sub": user.username}, expires_delta=access_token_expires     )     return {"access_token": access_token, "token_type": "bearer"}  

受保护的路由示例

@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):     return current_user  

所有代码

from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import uvicorn import jwt import os  # JWT 相关配置 SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中) ALGORITHM = "HS256"  # 使用的加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位  app = FastAPI()  # 创建 FastAPI 应用实例  # OAuth2PasswordBearer 实例,用于依赖项 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息 fake_users_db = {     "testuser": {         "username": "testuser",         "full_name": "Test User",         "email": "testuser@example.com",         "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码         "disabled": False,  # 用户是否被禁用     } }  # Pydantic 模型,用于定义请求和响应的数据结构 class Token(BaseModel):     access_token: str  # Token 字符串     token_type: str  # Token 类型(一般为 "bearer")  class TokenData(BaseModel):     username: str | None = None  # 从 Token 中提取的用户名  class User(BaseModel):     username: str  # 用户名     email: str | None = None  # 邮箱地址,可选     full_name: str | None = None  # 用户全名,可选     disabled: bool | None = None  # 用户是否被禁用,可选  class UserInDB(User):     hashed_password: str  # 存储在数据库中的哈希密码  # 生成 JWT Token 的函数 def create_access_token(data: dict, expires_delta: timedelta | None = None):     """     生成 JWT Token。      参数:     - data (dict): 要编码到 JWT 中的数据。     - expires_delta (timedelta, 可选): Token 的过期时间。      返回:     - str: 编码后的 JWT 字符串。     """     to_encode = data.copy()  # 创建副本,以避免修改原始数据     if expires_delta:         expire = datetime.now(timezone.utc) + expires_delta     else:         expire = datetime.now(timezone.utc) + timedelta(minutes=15)     to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT     return encoded_jwt  # 从假数据库获取用户信息 def get_user(db, username: str):     """     从数据库中获取用户信息。      参数:     - db (dict): 用户数据库(在本例中为假数据)。     - username (str): 用户名。      返回:     - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。     """     if username in db:         user_dict = db[username]         return UserInDB(**user_dict)  # 验证用户密码 def verify_password(plain_password, hashed_password):     """     验证用户密码。      参数:     - plain_password (str): 用户输入的明文密码。     - hashed_password (str): 存储在数据库中的哈希密码。      返回:     - bool: 密码匹配返回 True,否则返回 False。     """     return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较  # 验证 Token 并获取当前用户 async def get_current_user(token: str = Depends(oauth2_scheme)):     """     从 JWT Token 中提取用户信息,并验证 Token 的合法性。      参数:     - token (str): JWT Token。      返回:     - User: 返回当前用户信息。      抛出:     - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。     """     credentials_exception = HTTPException(         status_code=status.HTTP_401_UNAUTHORIZED,         detail="Could not validate credentials",         headers={"WWW-Authenticate": "Bearer"},     )     try:         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT         username: str = payload.get("sub")  # 获取 JWT 中的用户名         if username is None:             raise credentials_exception         token_data = TokenData(username=username)     except jwt.PyJWTError:         raise credentials_exception     user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息     if user is None:         raise credentials_exception     return user  # 验证用户是否被禁用 async def get_current_active_user(current_user: User = Depends(get_current_user)):     """     验证当前用户是否被禁用。      参数:     - current_user (User): 当前用户信息。      返回:     - User: 如果用户未被禁用,返回用户信息。      抛出:     - HTTPException: 当用户被禁用时抛出 400 错误。     """     if current_user.disabled:         raise HTTPException(status_code=400, detail="Inactive user")     return current_user  # 用户登录获取 Token @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):     """     用户登录接口,用于获取 JWT Token。      参数:     - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。      返回:     - dict: 包含 access_token 和 token_type 的响应数据。      抛出:     - HTTPException: 当用户名或密码错误时抛出 401 错误。     """     user = get_user(fake_users_db, form_data.username)     if not user or not verify_password(form_data.password, user.hashed_password):         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Incorrect username or password",             headers={"WWW-Authenticate": "Bearer"},         )     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间     access_token = create_access_token(         data={"sub": user.username}, expires_delta=access_token_expires     )     return {"access_token": access_token, "token_type": "bearer"}  # 受保护的路由示例 @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)):     """     获取当前用户信息的受保护路由。      参数:     - current_user (User): 当前登录的用户信息(通过 JWT 验证)。      返回:     - User: 返回当前用户的信息。     """     return current_user  # 运行应用 if __name__ == "__main__":     uvicorn.run(         f"{os.path.basename(__file__).split('.')[0]}:app",         host="127.0.0.1",         port=8000,         reload=True,  # 启用自动重载     )  

测试

获取 Token

在这里插入图片描述

访问受保护的路由

token正确

在这里插入图片描述

token错误

在这里插入图片描述

总结

JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。

在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。

实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。

  • JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
  • 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
  • API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。

使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。

注意

示例中密码并没有经过hash加密实际应用中要加密的

相关内容

热门资讯

9分钟总结!途游斗地主外挂(透... 1、不需要AI权限,帮助你快速的进行wepoke计算辅助教程,沉浸在游戏的游玩之中。2、里面整个we...
重大发现!德扑之星比赛创建设置... 1、重大发现!德扑之星比赛创建设置(透视)辅助透视(详细教程)-哔哩哔哩;详细教程。2、德扑之星比赛...
3分钟逻辑!大凉山生活号跑得快... 1、完成wepoke的残局,帮助玩家取得所有比赛的胜利,直登高塔的教程。2、多达1000个不同的游戏...
6分钟比赛!佳友互娱开挂(透视... 1、完成wepoke的残局,帮助玩家取得所有比赛的胜利,直登高塔的教程。2、多达1000个不同的游戏...
6分钟猫腻!欢乐茶馆有外挂(透... 1、这是跨平台的wepoke黑科技,在线的操作超级的方便,而且功能也是很强大的。2、在线的操作方便,...
六分钟aa扑克有外挂!大同麻将... 六分钟aa扑克有外挂!大同麻将有挂(透视)透视辅助(详细教程)-哔哩哔哩;科技详细教程小薇《7574...
2分钟免费!微扑克辅助挂(透视... 1、2分钟免费!微扑克辅助挂(透视)辅助透视(详细教程)-哔哩哔哩;该软件可以轻松地帮助玩家将微扑克...
3分钟规律!中至辅助器免费版v... 3分钟规律!中至辅助器免费版v3.0(透视)辅助透视(详细教程)-哔哩哔哩科技教程也叫必备教程,这是...
5分钟底牌!微扑克辅助多少钱(... 《5分钟底牌!微扑克辅助多少钱(透视)辅助透视(详细教程)-哔哩哔哩》 微扑克辅助多少钱软件透明挂更...
一分钟规律!wepoke有规律... 1、一分钟规律!wepoke有规律的(透视)透视辅助(详细教程)-哔哩哔哩。2、wepoke有规律的...