본문 바로가기
IT&Jobs/Study

[FASTAPI] FastAPI JWT 인증 시스템 완벽 구현 - 실무 적용 가능한 보안 API 만들기

by jaeilpark 2026. 4. 15.
728x90
반응형

<aside> 📋 🖼️ 썸네일: FastAPI JWT 인증 완벽 구현 가이드 📂 카테고리: ⚡ FastAPI/백엔드 📋 타입: tutorial 🏷️ 태그: #FastAPI #JWT #인증 #보안 #Python #API #토큰 #로그인 🔍 메타: FastAPI JWT 인증 시스템 완벽 구현 가이드. 로그인, 회원가입, 토큰 갱신까지 실무 적용 가능한 코드 제공

</aside>

FastAPI JWT 인증 시스템 완벽 구현

실무에서 FastAPI로 API를 개발할 때 가장 중요한 것 중 하나가 바로 인증 시스템입니다. 오늘은 JWT(JSON Web Token)를 활용한 완전한 인증 시스템을 구현해보겠습니다.

🎯 구현할 기능

  • JWT 토큰 생성 및 검증
  • 로그인/회원가입 API
  • 보호된 라우트 구현
  • 토큰 갱신 (Refresh Token)
  • 비밀번호 해싱

📦 필요한 패키지 설치

pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart

🔧 프로젝트 구조

project/
├── main.py
├── auth/
│   ├── __init__.py
│   ├── models.py
│   ├── utils.py
│   └── router.py
└── database.py

1️⃣ 설정 및 유틸리티 함수

auth/utils.py

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status

# 환경 변수로 관리하는 것을 권장
SECRET_KEY = "your-secret-key-here-change-this-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# 비밀번호 해싱 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class AuthUtils:
    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """비밀번호 검증"""
        return pwd_context.verify(plain_password, hashed_password)
    
    @staticmethod
    def get_password_hash(password: str) -> str:
        """비밀번호 해싱"""
        return pwd_context.hash(password)
    
    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Access 토큰 생성"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        
        to_encode.update({"exp": expire, "type": "access"})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    @staticmethod
    def create_refresh_token(data: dict) -> str:
        """Refresh 토큰 생성"""
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
        to_encode.update({"exp": expire, "type": "refresh"})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> dict:
        """토큰 검증 및 페이로드 반환"""
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            if pa

2️⃣ 데이터 모델 정의

auth/models.py

from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime

class UserBase(BaseModel):
    email: EmailStr
    username: str
    full_name: Optional[str] = None
    is_active: bool = True

class UserCreate(UserBase):
    password: str

class UserResponse(UserBase):
    id: int
    created_at: datetime
    
    class Config:
        from_attributes = True

class UserInDB(UserBase):
    id: int
    hashed_password: str
    created_at: datetime

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

class TokenData(BaseModel):
    username: Optional[str] = None

class LoginRequest(BaseModel):
    username: str
    password: str

class RefreshTokenRequest(BaseModel):
    refresh_token: str

3️⃣ 간단한 인메모리 데이터베이스

database.py

from typing import Dict, Optional
from auth.models import UserInDB
from auth.utils import AuthUtils
from datetime import datetime

class FakeDatabase:
    def __init__(self):
        self.users: Dict[str, UserInDB] = {}
        self.user_id_counter = 1
    
    def get_user_by_username(self, username: str) -> Optional[UserInDB]:
        return self.users.get(username)
    
    def get_user_by_email(self, email: str) -> Optional[UserInDB]:
        for user in self.users.values():
            if user.email == email:
                return user
        return None
    
    def create_user(self, user_data: dict) -> UserInDB:
        user = UserInDB(
            id=self.user_id_counter,
            email=user_data["email"],
            username=user_data["username"],
            full_name=user_data.get("full_name"),
            hashed_password=AuthUtils.get_password_hash(user_data["password"]),
            is_active=True,
            created_at=datetime.utcnow()
        )
        self.users[user.username] = user
        self.user_id_counter += 1
        return user

# 전역 데이터베이스 인스턴스
db = FakeDatabase()

4️⃣ 인증 라우터 구현

auth/router.py

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from .models import UserCreate, UserResponse, Token, LoginRequest, RefreshTokenRequest
from .utils import AuthUtils
from database import db

router = APIRouter(prefix="/auth", tags=["Authentication"])
security = HTTPBearer()

def authenticate_user(username: str, password: str):
    """사용자 인증"""
    user = db.get_user_by_username(username)
    if not user:
        return False
    if not AuthUtils.verify_password(password, user.hashed_password):
        return False
    return user

def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """현재 로그인된 사용자 정보 반환"""
    token = credentials.credentials
    payload = AuthUtils.verify_token(token, "access")
    username: str = payload.get("sub")
    if username is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    user = db.get_user_by_username(username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )
    return user

@router.post("/register", response_model=UserResponse)
async def register(user: UserCreate):
    """회원가입"""
    # 중복 검사
    if db.get_user_by_username(user.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered"
        )
    
    if db.get_user_by_email(user.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )
    
    # 사용자 생성
    db_user = db.create_user(user.dict())
    return UserResp

5️⃣ 메인 애플리케이션

main.py

from fastapi import FastAPI, Depends, HTTPException
from auth.router import router as auth_router, get_current_user
from auth.models import UserResponse

app = FastAPI(
    title="FastAPI JWT Authentication",
    description="JWT 인증 시스템 구현 예제",
    version="1.0.0"
)

# 인증 라우터 등록
app.include_router(auth_router)

@app.get("/")
async def root():
    return {"message": "FastAPI JWT Authentication System"}

@app.get("/protected")
async def protected_route(current_user: UserResponse = Depends(get_current_user)):
    """보호된 라우트 예제"""
    return {
        "message": f"Hello {current_user.username}!",
        "user_info": {
            "id": current_user.id,
            "email": current_user.email,
            "full_name": current_user.full_name
        }
    }

@app.get("/admin-only")
async def admin_only_route(current_user: UserResponse = Depends(get_current_user)):
    """관리자 전용 라우트 예제"""
    # 실제로는 사용자 역할 확인 로직 추가
    if current_user.username != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    
    return {"message": "Admin area access granted"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

🚀 실행 및 테스트

1. 서버 실행

uvicorn main:app --reload

2. API 테스트 (curl)

# 회원가입
curl -X POST "<http://localhost:8000/auth/register>" \\
     -H "Content-Type: application/json" \\
     -d '{
       "email": "test@example.com",
       "username": "testuser",
       "password": "testpass123",
       "full_name": "Test User"
     }'

# 로그인
curl -X POST "<http://localhost:8000/auth/login>" \\
     -H "Content-Type: application/json" \\
     -d '{
       "username": "testuser",
       "password": "testpass123"
     }'

# 보호된 라우트 접근 (토큰 필요)
curl -X GET "<http://localhost:8000/protected>" \\
     -H "Authorization: Bearer YOUR_ACCESS_TOKEN_HERE"

3. Python requests로 테스트

import requests

base_url = "<http://localhost:8000>"

# 회원가입
response = requests.post(f"{base_url}/auth/register", json={
    "email": "test@example.com",
    "username": "testuser",
    "password": "testpass123",
    "full_name": "Test User"
})
print("Register:", response.json())

# 로그인
response = requests.post(f"{base_url}/auth/login", json={
    "username": "testuser",
    "password": "testpass123"
})
tokens = response.json()
print("Login:", tokens)

# 보호된 라우트 접근
headers = {"Authorization": f"Bearer {tokens['access_token']}"}
response = requests.get(f"{base_url}/protected", headers=headers)
print("Protected route:", response.json())

# 토큰 갱신
response = requests.post(f"{base_url}/auth/refresh", json={
    "refresh_token": tokens['refresh_token']
})
print("Refresh:", response.json())

🔒 보안 고려사항

1. 환경 변수 사용

import os
from dotenv import load_dotenv

load_dotenv()

SECRET_KEY = os.getenv("SECRET_KEY", "fallback-secret-key")
ALGORITHM = os.getenv("ALGORITHM", "HS256")

2. 토큰 블랙리스트

# 로그아웃된 토큰 관리
blacklisted_tokens = set()

def is_token_blacklisted(token: str) -> bool:
    return token in blacklisted_tokens

@router.post("/logout")
async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    blacklisted_tokens.add(token)
    return {"message": "Successfully logged out"}

3. 비율 제한 (Rate Limiting)

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, login_data: LoginRequest):
    # 로그인 로직...
    pass

📝 실무 적용 팁

  1. 데이터베이스 연동: SQLAlchemy나 Tortoise ORM 사용
  2. Redis 활용: 토큰 저장 및 세션 관리
  3. 로깅: 인증 실패, 토큰 만료 등 보안 이벤트 기록
  4. HTTPS 사용: 프로덕션에서는 반드시 HTTPS 적용
  5. 토큰 만료 시간: 보안과 사용성의 균형 고려

🎉 마무리

이제 실무에서 바로 사용할 수 있는 완전한 JWT 인증 시스템이 완성되었습니다! 코드를 복사해서 프로젝트에 적용하고, 필요에 따라 데이터베이스와 추가 보안 기능을 확장해보세요.

다음 포스트에서는 이 인증 시스템을 PostgreSQL과 연동하는 방법을 다뤄보겠습니다.

728x90
반응형

댓글