from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.orm import Session import os import models from database import get_db SECRET_KEY = os.environ.get( "SECRET_KEY", "insecure-default-key-change-in-production-use-env-var" ) ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) def get_password_hash(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() expire = datetime.utcnow() + ( expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) ) to_encode["exp"] = expire return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ) -> models.User: exc = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if not username: raise exc except JWTError: raise exc user = db.query(models.User).filter(models.User.username == username).first() if not user: raise exc return user def get_current_admin( current_user: models.User = Depends(get_current_user), ) -> models.User: if not current_user.is_admin: raise HTTPException(status_code=403, detail="Admin access required") return current_user