JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被篡改。JWT 通常用于认证和授权流程。
在 FastAPI 中,JWT 主要用于保护 API 路由,使其只允许经过身份验证的用户访问。认证流程大致如下:
pip install fastapi uvicorn pyjwt
from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import jwt
SECRET_KEY = "your_secret_key" # 用于签名 JWT 的密钥 ALGORITHM = "HS256" # 加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 过期时间 app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: str | None = None class User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = None class UserInDB(User): hashed_password: str
def create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
fake_users_db = { "testuser": { "username": "testuser", "full_name": "Test User", "email": "testuser@example.com", "hashed_password": "fakehashedpassword", "disabled": False, } } def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)
def verify_password(plain_password, hashed_password): return plain_password == hashed_password
async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except jwt.PyJWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user
@app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = get_user(fake_users_db, form_data.username) if not user or not verify_password(form_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user
from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta, timezone import uvicorn import jwt import os # JWT 相关配置 SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3" # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中) ALGORITHM = "HS256" # 使用的加密算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 的有效时间,以分钟为单位 app = FastAPI() # 创建 FastAPI 应用实例 # OAuth2PasswordBearer 实例,用于依赖项 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息 fake_users_db = { "testuser": { "username": "testuser", "full_name": "Test User", "email": "testuser@example.com", "hashed_password": "fakehashedpassword", # 在实际应用中,存储经过哈希处理的密码 "disabled": False, # 用户是否被禁用 } } # Pydantic 模型,用于定义请求和响应的数据结构 class Token(BaseModel): access_token: str # Token 字符串 token_type: str # Token 类型(一般为 "bearer") class TokenData(BaseModel): username: str | None = None # 从 Token 中提取的用户名 class User(BaseModel): username: str # 用户名 email: str | None = None # 邮箱地址,可选 full_name: str | None = None # 用户全名,可选 disabled: bool | None = None # 用户是否被禁用,可选 class UserInDB(User): hashed_password: str # 存储在数据库中的哈希密码 # 生成 JWT Token 的函数 def create_access_token(data: dict, expires_delta: timedelta | None = None): """ 生成 JWT Token。 参数: - data (dict): 要编码到 JWT 中的数据。 - expires_delta (timedelta, 可选): Token 的过期时间。 返回: - str: 编码后的 JWT 字符串。 """ to_encode = data.copy() # 创建副本,以避免修改原始数据 if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=15) to_encode.update({"exp": expire}) # 添加过期时间到 JWT 数据中 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 生成 JWT return encoded_jwt # 从假数据库获取用户信息 def get_user(db, username: str): """ 从数据库中获取用户信息。 参数: - db (dict): 用户数据库(在本例中为假数据)。 - username (str): 用户名。 返回: - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。 """ if username in db: user_dict = db[username] return UserInDB(**user_dict) # 验证用户密码 def verify_password(plain_password, hashed_password): """ 验证用户密码。 参数: - plain_password (str): 用户输入的明文密码。 - hashed_password (str): 存储在数据库中的哈希密码。 返回: - bool: 密码匹配返回 True,否则返回 False。 """ return plain_password == hashed_password # 在实际应用中,这里应该使用哈希函数进行比较 # 验证 Token 并获取当前用户 async def get_current_user(token: str = Depends(oauth2_scheme)): """ 从 JWT Token 中提取用户信息,并验证 Token 的合法性。 参数: - token (str): JWT Token。 返回: - User: 返回当前用户信息。 抛出: - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。 """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 解码 JWT username: str = payload.get("sub") # 获取 JWT 中的用户名 if username is None: raise credentials_exception token_data = TokenData(username=username) except jwt.PyJWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) # 获取用户信息 if user is None: raise credentials_exception return user # 验证用户是否被禁用 async def get_current_active_user(current_user: User = Depends(get_current_user)): """ 验证当前用户是否被禁用。 参数: - current_user (User): 当前用户信息。 返回: - User: 如果用户未被禁用,返回用户信息。 抛出: - HTTPException: 当用户被禁用时抛出 400 错误。 """ if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user # 用户登录获取 Token @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): """ 用户登录接口,用于获取 JWT Token。 参数: - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。 返回: - dict: 包含 access_token 和 token_type 的响应数据。 抛出: - HTTPException: 当用户名或密码错误时抛出 401 错误。 """ user = get_user(fake_users_db, form_data.username) if not user or not verify_password(form_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 设置 Token 过期时间 access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} # 受保护的路由示例 @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): """ 获取当前用户信息的受保护路由。 参数: - current_user (User): 当前登录的用户信息(通过 JWT 验证)。 返回: - User: 返回当前用户的信息。 """ return current_user # 运行应用 if __name__ == "__main__": uvicorn.run( f"{os.path.basename(__file__).split('.')[0]}:app", host="127.0.0.1", port=8000, reload=True, # 启用自动重载 )
JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。
在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的合法性后,允许用户访问受保护的资源。
实现 JWT 认证的步骤包括安装必要的依赖项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。
- JWT 配置:包括密钥(
SECRET_KEY
)和加密算法(ALGORITHM
),用于生成和验证令牌。- 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处理认证逻辑。
- API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只允许携带有效 JWT 的请求访问。
使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有经过身份验证的用户才能访问受保护的 API 路由。
示例中
密码并没有经过hash加密
,实际应用中要加密的
。