Python/Backend

FastAPI 비동기 세션 처리 AsyncSession 활용 예제(sqlalchemy)

jimmy_AI 2024. 6. 1. 16:11
반응형

지난 번의 글에서는 FastAPI로 sqlalchemy 활용 CRUD 예제 코드를 다룬 적이 있습니다.

 

FastAPI에서 sqlalchemy 활용 SQL DB CRUD 구현 예제

안녕하세요. 아래 링크의 지난 글에서는 NoSQL인 MongoDB를 기준으로fastapi의 CRUD 메소드 구현 방법을 알아보았습니다. FastAPI에서 MongoDB 데이터 연동 CRUD 예제안녕하세요. 아래 링크의 지난 글에서는

jimmy-ai.tistory.com

 

이번 글에서는 해당 포스트의 코드를 비동기로 세션을 처리하는

AsyncSession를 활용한 코드로 변경해보도록 하겠습니다.

 

 

모듈 설치

AsyncSession 처리를 위하여 몇 가지 모듈 설치가 더 필요합니다.

필요한 모듈들을 설치하는 예제 명령어는 다음과 같습니다.

 

pip install sqlalchemy[asyncio] asyncpg fastapi

 

 

데이터베이스 세션 불러오기 코드

비동기 처리를 위한 DB 세션을 가져오는 코드는 다음과 같이 변경됩니다.

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "사용할 DB의 url"

engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)

async def get_db():
    async with SessionLocal() as session:
        yield session

 

이전 글의 코드에서 create_engine 대신 create_async_engine으로 메소드가 변경되었으며,

sessionmaker 내에도 Asyncsession으로 class를 지정해준 모습을 살펴볼 수 있습니다.

 

pydantic 데이터 모델 및 sqlalchemy 모델의 경우 이전 글의 코드를 그대로 사용할 수 있으니

여기서는 따로 다루지는 않도록 하겠습니다.

 

 

get 메소드 예시

get 메소드로 id 기준 데이터를 조회하는 코드는 다음과 같이 변경됩니다.

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.future import select

app = FastAPI()

@app.get("/users/{user_id}", response_model=User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).filter(User.id == user_id))
    db_user = result.scalar_one_or_none()
    if db_user is None: # 해당하는 id가 없는 경우 404 에러 반환
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

 

async def 및 await 구문이 사용되었으며, 일부 메소드의 사용 방법이

약간 변경된 점을 확인해볼 수 있었습니다.

 

 

post 메소드 예시

post 메소드로 새로운 데이터를 추가하는 코드는 다음과 같이 변경됩니다.

@app.post("/users/", response_model=User)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    db_user = User(username=user.username, email=user.email)
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user

 

commit 및 refresh 과정에서 await 키워드가 사용됩니다.

 

 

patch 메소드 예시

마찬가지로 저장된 데이터를 수정하는 patch 메소드의 변경된 코드도 살펴보겠습니다.

@app.patch("/users/{user_id}", response_model=User)
async def update_user(user_id: int, user: UserCreate, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).filter(User.id == user_id))
    db_user = result.scalar_one_or_none()
    if db_user is None: # 해당하는 id가 없는 경우 404 에러 반환
        raise HTTPException(status_code=404, detail="User not found")
    
    db_user.username = user.username
    db_user.email = user.email
    await db.commit()
    await db.refresh(db_user)
    return db_user

 

get과 post의 변경된 구문 부분이 모두 합쳐져서 반영된 모습을 살펴볼 수 있습니다.

 

 

delete 메소드 예시

마지막으로 데이터를 지우는 delete 메소드의 변경된 코드 예제입니다.

@app.delete("/users/{user_id}", response_model=User)
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).filter(User.id == user_id))
    db_user = result.scalar_one_or_none()
    if db_user is None: # 해당하는 id가 없는 경우 404 에러 반환
        raise HTTPException(status_code=404, detail="User not found")
    
    await db.delete(db_user)
    await db.commit()
    return db_user

 

위의 메소드들과 마찬가지로 비동기로 세션을 처리하는 코드가 반영되었습니다.

이 경우에는 db.delete 메소드 사용 시에도 await 키워드가 포함됩니다.