full_stack_todo / src /dependencies.py
romaisaKhurram
somethong add
2f9c039
from fastapi import HTTPException, status, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from .config import settings
import uuid
from collections import defaultdict
import time
# Rate limiting implementation (T041)
class RateLimiter:
def __init__(self, max_requests: int = 100, window_size: int = 60):
self.max_requests = max_requests # 100 requests per minute
self.window_size = window_size # 60 seconds window
self.requests = defaultdict(list)
def check_rate_limit(self, identifier: str) -> bool:
"""
Check if the identifier has exceeded the rate limit.
"""
current_time = time.time()
# Clean old requests outside the window
self.requests[identifier] = [
req_time for req_time in self.requests[identifier]
if current_time - req_time < self.window_size
]
# Check if rate limit exceeded
if len(self.requests[identifier]) >= self.max_requests:
return False
# Add current request
self.requests[identifier].append(current_time)
return True
# Global rate limiter instance
rate_limiter = RateLimiter(max_requests=100, window_size=60) # NFR-002: 100 concurrent users
def check_rate_limit(request: Request = None) -> None:
"""
Dependency to check rate limit based on client IP.
"""
if request:
# Get client IP from request
client_ip = request.client.host if request.client else "unknown"
if not rate_limiter.check_rate_limit(client_ip):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded. Please try again later."
)
def verify_token(token: str) -> Dict[str, Any]:
"""
Verify JWT token and return payload.
"""
try:
payload = jwt.decode(
token,
settings.better_auth_secret,
algorithms=["HS256"]
)
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Standard HTTPBearer security scheme
security = HTTPBearer()
def get_token_with_fallback(request: Request) -> str:
"""
Extract and return the token from the authorization header or from request state (for cookie fallback).
"""
# First, try to get token from the Authorization header
auth_header = request.headers.get("Authorization")
if auth_header:
if auth_header.startswith("Bearer "):
return auth_header[7:] # Remove "Bearer " prefix
else:
# If it doesn't have "Bearer " prefix, return the whole header value
return auth_header
# If not in header, check if the middleware stored a token in the request state
if hasattr(request, 'state') and hasattr(request.state, 'auth_token'):
token = getattr(request.state, 'auth_token', None)
if token and isinstance(token, str):
# Remove "Bearer " prefix if it exists
if token.startswith("Bearer "):
return token[7:]
return token
# If no token found, raise appropriate error
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No authentication token provided",
headers={"WWW-Authenticate": "Bearer"},
)
def get_token_from_header(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""
Extract and return the token from the authorization header.
This maintains compatibility with existing dependencies.
"""
return credentials.credentials
def get_current_user_id(request: Request) -> uuid.UUID:
"""
Get the current user ID from the JWT token, checking both header and cookie (via request state).
This updated version works with both the original header-based auth and cookie fallback.
"""
# Get the token using our fallback method
token = get_token_with_fallback(request)
payload = verify_token(token)
user_id: str = payload.get("user_id")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
return uuid.UUID(user_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid user ID in token",
headers={"WWW-Authenticate": "Bearer"},
)
async def validate_token_expiration(request: Request):
"""
Validate token expiration to ensure 24-hour expiry as per NFR-003.
This updated version works with both header and cookie (via request state) authentication.
"""
# Get the token using our fallback method
token = get_token_with_fallback(request)
payload = verify_token(token)
exp: int = payload.get("exp")
if exp:
expiration_time = datetime.fromtimestamp(exp)
current_time = datetime.utcnow()
# Check if token has expired
if current_time > expiration_time:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
headers={"WWW-Authenticate": "Bearer"},
)
# Check if token was issued more than 24 hours ago (NFR-003)
iat: int = payload.get("iat")
if iat:
issued_time = datetime.fromtimestamp(iat)
if (current_time - issued_time) > timedelta(hours=24):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token issued more than 24 hours ago (NFR-003)",
headers={"WWW-Authenticate": "Bearer"},
)
return payload
# The duplicate validate_token_expiration function has been removed to prevent conflicts.
# The original validate_token_expiration function (defined at line 144) should be sufficient.