목차
- 동기 코드의 한계 — requests로 8초 걸리던 구간
- asyncio의 기본 구조 — event loop가 핵심이다
- aiohttp — 비동기 HTTP 클라이언트의 표준
- asyncio.gather — 동시 실행의 핵심
- Semaphore로 동시성 제한하기
- 흔한 실수 3가지
- 실전 패턴 — 재시도와 타임아웃 조합
- async generator — 대량 데이터 스트리밍
- asyncio 디버깅 팁
RuntimeError: This event loop is already running
FastAPI 엔드포인트 안에서 asyncio.run()을 호출한 순간 터진 에러다. 프론트엔드에서 백엔드로 넘어온 지 1년쯤 됐을 때, 외부 API 5개를 동시에 호출하는 집계 서비스를 만들고 있었다. JavaScript의 Promise.all()처럼 Python asyncio도 비슷하게 돌아갈 거라고 생각했는데, 완전히 다른 세계였다. 이 에러 하나 잡는 데 반나절을 날렸고, 그 과정에서 Python asyncio의 구조를 제대로 이해하게 됐다.
동기 코드의 한계 — requests로 8초 걸리던 구간
문제의 시작은 단순했다. 외부 서비스 5곳에서 데이터를 가져와 합산하는 엔드포인트 하나. requests 라이브러리로 순차 호출하고 있었다.
import requests
import time
def fetch_all_data():
urls = [
"https://api-a.example.com/data",
"https://api-b.example.com/data",
"https://api-c.example.com/data",
"https://api-d.example.com/data",
"https://api-e.example.com/data",
]
start = time.time()
results = []
for url in urls:
resp = requests.get(url, timeout=10)
results.append(resp.json())
elapsed = time.time() - start
print(f"총 소요: {elapsed:.2f}초") # 매번 7~8초
return results
각 API 응답이 평균 1.5초. 5개를 순서대로 호출하니까 7~8초가 나왔다. 클라이언트 쪽 타임아웃이 30초였는데, API 하나가 느려지는 날이면 20초를 넘기기도 했다. 프론트엔드 시절에는 fetch를 여러 개 던지고 Promise.all()로 묶으면 그만이었는데, Python에서는 그게 안 됐다. requests는 동기 라이브러리라서 하나가 끝나야 다음 요청이 나간다.
asyncio의 기본 구조 — event loop가 핵심이다
Python asyncio를 처음 접하면 async/await 키워드부터 보게 되는데, 진짜 이해해야 할 건 event loop다.
async/await는 껍데기고, event loop가 본체
JavaScript도 내부적으로 event loop가 돌지만 개발자가 직접 신경 쓸 일은 거의 없다. 브라우저나 Node.js 런타임이 알아서 관리해주니까. Python은 다르다. event loop를 누가 생성하고 누가 실행하는지를 명확히 알아야 한다.
import asyncio
async def say_hello():
print("hello")
await asyncio.sleep(1) # 1초 동안 다른 코루틴에 제어권 양보
print("world")
# 스크립트에서 실행할 때
asyncio.run(say_hello()) # event loop 생성 → 코루틴 실행 → loop 종료
asyncio.run()은 새 event loop를 만들고, 코루틴을 실행하고, 끝나면 loop를 닫는다. 문제는 이미 event loop가 돌고 있는 환경에서 asyncio.run()을 또 호출하면 충돌한다는 거다. FastAPI(정확히는 uvicorn)가 이미 event loop를 돌리고 있으니까, 그 안에서 asyncio.run()을 쓰면 맨 처음 봤던 RuntimeError가 터진다.
FastAPI에서의 올바른 사용법
FastAPI 엔드포인트를 async def로 선언하면 이미 event loop 위에서 돌고 있다. 추가로 loop를 만들 필요가 없다.
from fastapi import FastAPI
app = FastAPI()
@app.get("/data")
async def get_data():
# 여기서 asyncio.run() 쓰면 에러
# 그냥 await를 쓰면 된다
result = await some_async_function()
return result
이 구조를 이해하는 데 시간이 걸렸다. JavaScript에서는 async function 안에서 또 다른 비동기 함수를 부르든 뭘 하든 신경 쓸 게 없었는데, Python에서는 "지금 event loop가 있는가, 없는가"를 항상 의식해야 한다.
aiohttp — 비동기 HTTP 클라이언트의 표준
requests를 비동기로 바꾸려면 aiohttp를 써야 한다. Python 3.12 기준으로 aiohttp 3.9.x가 안정 버전이다 (출처: aiohttp 공식 문서, 2026-04-08 기준).
ClientSession의 생명주기
처음에 이렇게 썼다가 문제가 생겼다.
import aiohttp
async def fetch_one(url: str):
async with aiohttp.ClientSession() as session: # 매 호출마다 세션 생성
async with session.get(url) as resp:
return await resp.json()
호출할 때마다 ClientSession을 새로 만들면 TCP 커넥션 풀을 재활용하지 못한다. 요청이 많아지면 ResourceWarning: Unclosed client session 경고가 쏟아지기도 했다. 공식 문서에서도 "Don’t create a session per request"라고 명시하고 있다 (출처: aiohttp Client Reference).
올바른 방식은 앱 수준에서 세션을 하나 만들고 공유하는 것이다.
from contextlib import asynccontextmanager
from fastapi import FastAPI
import aiohttp
@asynccontextmanager
async def lifespan(app: FastAPI):
# 앱 시작 시 세션 생성
app.state.http_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10) # 전체 타임아웃 10초
)
yield
# 앱 종료 시 세션 정리
await app.state.http_session.close()
app = FastAPI(lifespan=lifespan)
FastAPI의 lifespan 컨텍스트 매니저를 쓰면 앱이 뜰 때 세션을 만들고, 내려갈 때 깔끔하게 닫을 수 있다. 이 패턴으로 바꾸고 나서 경고가 사라졌다.
asyncio.gather — 동시 실행의 핵심
여러 코루틴을 동시에 실행하는 방법이 asyncio.gather다. JavaScript의 Promise.all()과 비슷한 역할이지만 세부 동작이 좀 다르다.
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str):
async with session.get(url) as resp:
return await resp.json()
async def fetch_all(session: aiohttp.ClientSession):
urls = [
"https://api-a.example.com/data",
"https://api-b.example.com/data",
"https://api-c.example.com/data",
"https://api-d.example.com/data",
"https://api-e.example.com/data",
]
# 5개를 동시에 요청
results = await asyncio.gather(*[fetch(session, url) for url in urls])
return results
5개 API를 동시에 호출하니 전체 소요 시간이 가장 느린 API 하나의 응답 시간과 거의 같아졌다. 8초가 1.6초로 줄었다. 체감상 5배 정도 빨라진 셈이다.
gather vs TaskGroup
Python 3.11에서 asyncio.TaskGroup이 추가됐다. gather와 뭐가 다른지 처음엔 헷갈렸다.
| 항목 | asyncio.gather |
asyncio.TaskGroup (3.11+) |
|---|---|---|
| 에러 처리 | return_exceptions=True로 에러 수집 가능 |
하나 실패하면 나머지 자동 취소 |
| 취소 전파 | 수동으로 처리해야 함 | 구조적으로 취소 전파 |
| 반환값 | 리스트로 순서 보장 | 각 task에서 .result() 호출 |
| 최소 Python 버전 | 3.4+ | 3.11+ |
프로젝트가 Python 3.11 이상이면 TaskGroup이 에러 핸들링 면에서 낫다. "하나 실패하면 전부 취소"가 기본 동작이라서 부분 실패 상태를 신경 쓸 필요가 줄어든다. 반면에 "실패한 건 무시하고 성공한 것만 쓸래"라는 경우에는 gather에 return_exceptions=True를 주는 게 편하다.
# gather — 실패해도 나머지 결과는 받기
results = await asyncio.gather(
fetch(session, url_a),
fetch(session, url_b),
fetch(session, url_c),
return_exceptions=True # 에러도 결과 리스트에 포함
)
for r in results:
if isinstance(r, Exception):
print(f"실패: {r}")
else:
print(f"성공: {r}")
현재 내 프로젝트에서는 gather를 더 많이 쓴다. 외부 API 5개 중 2개가 실패해도 나머지 3개 결과는 보여줘야 하는 요구사항이 있어서다.
Semaphore로 동시성 제한하기
gather가 좋다고 100개 요청을 한꺼번에 던지면 문제가 생긴다. 상대 서버에서 rate limit을 걸거나, 자기 서버의 파일 디스크립터가 부족해지거나. 한번은 50개 URL을 gather로 동시에 쏘았다가 aiohttp.ClientConnectorError: Cannot connect to host가 쏟아진 적이 있다.
asyncio.Semaphore로 동시 실행 수를 제한하면 된다.
import asyncio
import aiohttp
async def fetch_with_limit(
sem: asyncio.Semaphore,
session: aiohttp.ClientSession,
url: str
):
async with sem: # 세마포어 획득 — 동시 실행 수 제한
async with session.get(url) as resp:
return await resp.json()
async def fetch_many(session: aiohttp.ClientSession, urls: list[str]):
sem = asyncio.Semaphore(10) # 동시에 최대 10개만
tasks = [fetch_with_limit(sem, session, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
Semaphore 값을 몇으로 잡느냐는 상황마다 다르다. 외부 API의 rate limit이 초당 20회면 10~15 정도로 잡고, 내부 서비스끼리 호출하는 거라면 50까지도 올릴 수 있다. 이건 테스트해보면서 조절하는 수밖에 없다.
흔한 실수 3가지
비동기 코드를 처음 짤 때 거의 반드시 겪는 실수들이다. 프론트에서 넘어온 사람 입장에서 특히 헷갈리는 것들을 골랐다.
await를 빠뜨리면 코루틴 객체만 돌아다닌다
# 잘못된 코드
async def get_user():
return {"name": "kim"}
async def main():
user = get_user() # await 빼먹음
print(user) # <coroutine object get_user at 0x...>
print(user["name"]) # TypeError: 'coroutine' object is not subscriptable
JavaScript에서 await를 빼먹으면 Promise 객체가 오지만, .then()으로 이어붙일 수 있다. Python에서는 코루틴 객체가 와버리면 아무것도 못 한다. 그리고 RuntimeWarning: coroutine 'get_user' was never awaited라는 경고가 뜬다. 이 경고가 보이면 어딘가에서 await를 빠뜨린 거다.
동기 함수를 async 안에서 부르면 전체가 블로킹된다
이게 제일 위험하다. time.sleep()이나 동기 DB 드라이버를 async 함수 안에서 호출하면, event loop 자체가 멈춘다.
import time
async def bad_example():
# 이러면 event loop가 3초 동안 멈춤
time.sleep(3) # asyncio.sleep(3)을 써야 한다
async def also_bad():
# requests는 동기 라이브러리
import requests
resp = requests.get("https://api.example.com") # event loop 블로킹
동기 함수를 꼭 써야 하는 상황이라면 asyncio.to_thread()로 별도 스레드에서 실행할 수 있다 (Python 3.9+).
import asyncio
import requests
async def fetch_sync_safely(url: str):
# 동기 함수를 스레드풀에서 실행
resp = await asyncio.to_thread(requests.get, url, timeout=10)
return resp.json()
이 방법은 레거시 라이브러리를 당장 교체할 수 없을 때 쓸만하다. 근본적으로는 aiohttp 같은 비동기 라이브러리로 바꾸는 게 맞다.
exception 처리를 빼먹으면 조용히 사라진다
gather에서 예외가 발생했는데 return_exceptions=True를 안 주면, 첫 번째 예외에서 전체가 터진다. 주면 예외가 결과 리스트에 섞여 들어온다. 어느 쪽이든 명시적으로 처리해야 한다.
이건 간단하다. gather 결과를 받은 뒤 isinstance(r, Exception) 체크를 습관적으로 넣으면 된다.
실전 패턴 — 재시도와 타임아웃 조합
실무에서는 한 번 요청해서 실패하면 재시도해야 하는 경우가 많다. aiohttp의 타임아웃과 직접 구현한 재시도 로직을 조합한 패턴이다.
import asyncio
import aiohttp
import logging
logger = logging.getLogger(__name__)
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3,
backoff_base: float = 0.5,
):
for attempt in range(max_retries):
try:
async with session.get(url) as resp:
resp.raise_for_status() # 4xx, 5xx면 예외 발생
return await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
wait_time = backoff_base * (2 ** attempt) # 0.5초, 1초, 2초
logger.warning(
f"요청 실패 (시도 {attempt + 1}/{max_retries}): {url} - {e}"
)
if attempt < max_retries - 1:
await asyncio.sleep(wait_time) # 지수 백오프
else:
raise # 마지막 시도까지 실패하면 예외 전파
지수 백오프(exponential backoff)를 넣은 이유는 상대 서버가 일시적으로 과부하일 때 바로 재시도하면 상황이 더 나빠지기 때문이다. tenacity 같은 라이브러리를 쓰면 더 깔끔하게 구현할 수 있는데, 의존성을 하나 더 추가하는 게 싫어서 직접 짰다. 이 정도 로직이면 라이브러리까지는 필요 없다고 본다.
async generator — 대량 데이터 스트리밍
API 호출 100건의 결과를 전부 메모리에 올리고 싶지 않을 때 async generator가 유용하다. 이 부분은 아직 프로덕션에서 많이 써보지 못해서 기본 패턴만 정리한다.
async def fetch_stream(
session: aiohttp.ClientSession,
urls: list[str],
sem: asyncio.Semaphore,
):
for url in urls:
async with sem:
async with session.get(url) as resp:
data = await resp.json()
yield data # 하나씩 내보냄
async for로 받아서 처리하면 메모리를 일정하게 유지할 수 있다. 대량 크롤링이나 배치 처리에서 쓸 수 있을 것 같은데, 실제로 적용해보지는 않았다.
asyncio 디버깅 팁
비동기 코드 디버깅은 동기보다 까다롭다. event loop 위에서 여러 코루틴이 번갈아 실행되니까 traceback을 봐도 흐름이 직관적이지 않다.
Python 3.12 기준으로 asyncio.run()에 debug=True를 넘기면 느린 코루틴 감지, 닫히지 않은 코루틴 경고 같은 추가 정보를 볼 수 있다 (출처: Python 공식 문서 — asyncio Developing with asyncio).
# 개발 환경에서만 사용
asyncio.run(main(), debug=True)
그리고 PYTHONASYNCIODEBUG=1 환경 변수를 설정하면 코루틴이 생성된 위치까지 traceback에 나온다. 프로덕션에서는 끄는 게 맞다. 성능에 영향이 있다.
결국 Python asyncio를 쓸 때 기억할 건 세 가지다. event loop는 하나만 돌린다. 동기 함수를 섞지 않는다. gather 결과는 반드시 예외 체크한다. 기존 동기 엔드포인트에서 외부 호출이 2개 이상이면, async def로 바꾸고 asyncio.gather부터 적용해보는 걸 권한다.
관련 글
- Python pytest 테스트 자동화 — unittest에서 전환하며 깨달은 것들 – 커버리지 80%를 달성했는데 버그는 왜 안 줄었나. unittest에서 pytest로 전환하면서 겪은 시행착오와 fixture·mock·커…
- FastAPI JWT 인증 구현 — 리프레시 토큰과 권한 분리까지 실무에서 겪은 것들 – FastAPI에서 JWT 인증을 직접 구현하면서 3시간을 날린 경험을 바탕으로, 토큰 설계부터 Depends 체이닝을 이용한 권한 분리까지…
- Python Docker 멀티스테이지 빌드로 이미지 크기 80% 줄이기 – FastAPI 프로젝트의 Docker 이미지가 1.2GB까지 불어났다. 멀티스테이지 빌드와 .dockerignore 설정으로 187MB까지…