목차
오늘 한 것: 사이드 프로젝트에서 LLM 호출 체인을 백그라운드로 돌려야 했는데, Temporal이나 Airflow를 붙이기엔 부담스러워서 Postgres만으로 워크플로우 엔진 비슷한 걸 짜봤다. 결론부터 적자면 SELECT ... FOR UPDATE SKIP LOCKED 한 줄이 절반 이상을 해결해준다.
실제로, 이 글에서 다루는 건 별도 큐(Redis, RabbitMQ, SQS) 없이 Postgres 테이블 하나로 작업 큐 + 워크플로우 상태머신을 굴리는 패턴이다. 적합한 상황은 분당 수백~수천 건 수준의 워크플로우, 단일 DB로 충분한 규모, "처음부터 Temporal 깔기엔 오버킬"인 단계. 분당 수만 건 이상이거나 분산 트랜잭션이 본격적으로 필요하면 이 방식으론 안 된다.
Postgres 16, Python 3.12, FastAPI 0.110, psycopg 3.1 기준이다. 코드는 다 직접 돌려본 거고, 벤치마크 숫자는 M1 Mac 로컬 기준이라 절대값보다 흐름을 봐주면 된다.
왜 큐 없이 DB로 가능한가
특히, 내구성 워크플로우의 핵심 요건은 세 가지다. (1) 작업이 유실되지 않을 것, (2) 같은 작업이 중복 실행되어도 결과가 같을 것(멱등성), (3) 프로세스가 죽어도 어디서부터 재개할지 알 것. 큐 시스템은 보통 (1)을 잘하고 (2)(3)은 애플리케이션 책임이다.
Postgres는 ACID 트랜잭션이 있으니 (1)은 INSERT 한 번이면 끝난다. 문제는 여러 워커가 동시에 같은 job을 집어가는 걸 막는 부분인데, 9.5에 추가된 SKIP LOCKED 절이 이걸 깔끔하게 해결한다 (공식 문서, Postgres 16.2 기준).
-- jobs 테이블
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
workflow_id UUID NOT NULL,
step_name TEXT NOT NULL,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', -- pending, running, done, failed
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 3,
run_after TIMESTAMPTZ NOT NULL DEFAULT now(),
locked_at TIMESTAMPTZ,
locked_by TEXT,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- 워커가 잡을 집어가는 쿼리
CREATE INDEX idx_jobs_pending ON jobs (run_after)
WHERE status = 'pending';
이처럼, 워커는 이 한 줄로 다음 작업을 잡는다.
UPDATE jobs
SET status = 'running',
attempts = attempts + 1,
locked_at = now(),
locked_by = $1
WHERE id = (
SELECT id FROM jobs
WHERE status = 'pending'
AND run_after <= now()
ORDER BY run_after
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *;
SKIP LOCKED가 없으면 워커가 늘어날수록 락 대기로 처참해진다. 있으면 각 워커가 다른 행을 집어가니까 경합이 사라진다. 로컬에서 워커 8개로 돌려봤을 때 초당 처리량이 거의 선형으로 늘었다.
멱등성과 재시도 처리
실제로, 여기서 한 시간쯤 헤맸다. 처음엔 작업 핸들러가 끝나면 그냥 status='done'으로 바꾸면 되겠지 싶었는데, 외부 API 호출(예: OpenAI 호출)이 성공한 뒤 DB 업데이트 직전에 워커가 죽으면 같은 호출이 또 나간다. 돈이 두 번 빠진다는 뜻이다.
멱등성 키를 작업에 박기
해결은 단순하다. 외부 호출에 멱등성 키를 같이 보낸다. OpenAI는 Idempotency-Key 헤더를 지원하고 (작성 시점 2026-05 기준), Anthropic Messages API도 마찬가지다. 키는 job id 그대로 쓰면 된다.
import anthropic
from psycopg import Connection
def run_step(conn: Connection, job: dict):
client = anthropic.Anthropic()
# job.id를 멱등성 키로 — 같은 job이 두 번 실행되어도
# Anthropic 쪽에서 캐시된 응답을 돌려준다 (24시간)
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": job["payload"]["prompt"]}],
extra_headers={"Idempotency-Key": f"job-{job['id']}-attempt"},
)
# 결과는 별도 테이블에 UPSERT — job_id를 unique로
conn.execute("""
INSERT INTO job_results (job_id, result, completed_at)
VALUES (%s, %s, now())
ON CONFLICT (job_id) DO UPDATE SET
result = EXCLUDED.result,
completed_at = EXCLUDED.completed_at
""", (job["id"], response.content[0].text))
물론, 핵심은 외부 호출의 결과를 저장하는 트랜잭션과 job 상태를 done으로 바꾸는 트랜잭션을 같은 커넥션에서 하나로 묶는 것. 두 INSERT/UPDATE가 같은 BEGIN-COMMIT 안에 있으면 워커가 어디서 죽어도 일관성이 깨지지 않는다.
지수 백오프와 데드레터
재시도는 run_after를 미래로 밀어놓는 방식으로 처리한다. 따로 스케줄러가 필요없다는 게 큰 장점이다.
def fail_with_backoff(conn: Connection, job: dict, err: str):
if job["attempts"] >= job["max_attempts"]:
conn.execute("""
UPDATE jobs SET status='failed', last_error=%s,
updated_at=now()
WHERE id=%s
""", (err, job["id"]))
return
# 1차 실패 → 2초, 2차 → 4초, 3차 → 8초 ...
delay_seconds = 2 ** job["attempts"]
conn.execute("""
UPDATE jobs
SET status='pending',
run_after = now() + (%s || ' seconds')::interval,
last_error=%s,
locked_at=NULL, locked_by=NULL
WHERE id=%s
""", (delay_seconds, err, job["id"]))
그런데, 데드레터 큐를 따로 만들지 않고 status='failed' 행을 그대로 둔다. 모니터링 쿼리 한 방으로 끝난다.
SELECT step_name, count(*), max(updated_at)
FROM jobs WHERE status='failed'
GROUP BY step_name;
워커가 죽었을 때 좀비 잡 회수
그러나, 여기가 제일 거슬리던 부분이다. 워커가 running 상태로 만들어놓고 OOM으로 죽으면 그 job은 영영 running에 갇혀있다. 큐 시스템은 보통 가시성 타임아웃(visibility timeout)으로 처리하는데, Postgres에선 직접 짜야 한다.
방법은 두 가지를 시도했다.
방법 1: locked_at 타임아웃 기반 청소 (실패)
처음엔 별도 청소 job을 만들어서 "locked_at이 10분 이전인 running job을 pending으로 되돌리기"를 돌렸다. 동작은 한다. 다만 워커가 살아있는데 단순히 작업이 오래 걸리는 경우(LLM 호출이 8분쯤 걸린 적 있다)도 같이 회수해버려서 중복 실행이 생긴다.
물론, 방법 2: advisory lock (이걸로 갔다)
특히, Postgres advisory lock은 트랜잭션이나 세션 단위로 걸리고, 세션이 끊기면 자동 해제된다. 워커가 죽으면 커넥션이 끊기니까 락이 풀린다. 이걸 활용한다.
import os
from psycopg import Connection
WORKER_ID = f"worker-{os.getpid()}"
def claim_job(conn: Connection):
# 세션 advisory lock — 커넥션 끊기면 자동 해제
with conn.cursor() as cur:
cur.execute("""
UPDATE jobs
SET status='running', attempts=attempts+1,
locked_at=now(), locked_by=%s
WHERE id = (
SELECT id FROM jobs
WHERE status='pending' AND run_after <= now()
AND pg_try_advisory_xact_lock(hashtext('job:' || id::text))
ORDER BY run_after
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *
""", (WORKER_ID,))
return cur.fetchone()
pg_try_advisory_xact_lock은 트랜잭션 끝나면 풀린다. 트랜잭션 안에서 잡을 집어가고, 잡고 나서는 별도 세션 락(pg_advisory_lock)으로 옮기면 워커가 죽을 때만 풀린다. 청소 워커는 "session lock이 없는 running job"만 회수한다.
-- 좀비 회수: running인데 advisory lock이 안 걸린 행
UPDATE jobs SET status='pending', locked_at=NULL, locked_by=NULL
WHERE status='running'
AND NOT EXISTS (
SELECT 1 FROM pg_locks
WHERE locktype='advisory'
AND objid = hashtext('job:' || jobs.id::text)::bigint
)
AND locked_at < now() - interval '30 seconds';
그래서, 이게 의외로 잘 굴러간다. 워커를 kill -9로 강제 종료해봐도 30초 안에 다른 워커가 잡아간다.
워크플로우 = job들의 DAG
여기까지가 단일 작업 처리고, 워크플로우는 결국 job들의 의존성 그래프다. 별도 엔진 만들지 말고 한 컬럼만 추가하면 된다.
ALTER TABLE jobs ADD COLUMN depends_on BIGINT[] DEFAULT '{}';
-- pending 조건에 의존성 체크 추가
-- "내가 의존하는 job들이 모두 done이어야 나도 실행 가능"
집어가는 쿼리에 한 줄 추가한다.
SELECT id FROM jobs j
WHERE status='pending' AND run_after <= now()
AND NOT EXISTS (
SELECT 1 FROM unnest(j.depends_on) AS dep_id
LEFT JOIN jobs dep ON dep.id = dep_id
WHERE dep.status IS DISTINCT FROM 'done'
)
ORDER BY run_after
FOR UPDATE SKIP LOCKED
LIMIT 1;
LLM 파이프라인(요약 → 번역 → 검증)을 이 패턴으로 짰는데 노드 100개짜리 그래프도 잘 돈다. 다만 노드가 수천 개 넘어가면 의존성 체크 쿼리가 무거워질 거다(아직 그 규모는 안 가봤다).
큐 시스템과 비교
같은 워크플로우를 SQS+Lambda, Temporal, 그리고 이 Postgres 방식으로 짜본 적이 있어서 체감 차이를 표로 정리한다. 절대 성능이 아니라 도입 비용 관점이다.
| 항목 | SQS + Lambda | Temporal | Postgres |
|---|---|---|---|
| 초기 셋업 시간 | 반나절 | 1~2일 | 30분 |
| 인프라 추가 | SQS, Lambda, IAM | Temporal 서버 + DB | 없음 (기존 DB) |
| 워크플로우 상태 조회 | CloudWatch 뒤짐 | 내장 UI | SQL 한 줄 |
| 처리량 한계 | 사실상 무제한 | 매우 높음 | DB 성능에 종속 |
| 디버깅 난이도 | 로그 추적 어려움 | 좋음 | SELECT * 한 방 |
| 멱등성/재시도 | 직접 구현 | 내장 | 직접 구현 |
즉, Postgres 방식의 진짜 장점은 마지막 행. 문제가 생기면 SELECT * FROM jobs WHERE status='failed' 한 줄로 끝난다. 새 인프라 하나 더 보지 않아도 된다.
메모 (오늘 새로 알게 된 것)
SKIP LOCKED는 9.5(2016년)부터 있었다. 이걸 진작 알았으면 예전 프로젝트에서 Redis 안 깔았을 거다.pg_try_advisory_xact_lock과 세션 락은 잠금 공간이 분리되어 있다. 같은 키여도 충돌 안 한다. 처음에 이거 몰라서 헤맸다.LISTEN/NOTIFY로 워커를 깨우면 폴링 간격을 1초→100ms로 줄여도 CPU 안 튄다. 물론 NOTIFY는 트랜잭션 커밋 시점에 발송되니까 BEGIN 안에서 NOTIFY 보내고 ROLLBACK하면 안 나간다.jobs테이블이 커지면 done/failed 행을 파티션으로 분리하거나 주기적으로 아카이브 테이블로 옮겨야 한다. 1천만 행 넘어가면 인덱스가 부풀어서 claim 쿼리가 느려지더라.- Anthropic Idempotency-Key는 공식 문서상 24시간 유지(2026-05 기준). 그보다 긴 재시도는 결과를 자체 캐싱해야 한다.
그러나, 오늘 액션 3개로 줄이자면 이렇다. 첫째, 다음에 큐 깔기 전에 SELECT ... FOR UPDATE SKIP LOCKED로 먼저 짜본다. 둘째, 외부 API 호출은 무조건 Idempotency-Key를 박는다. 셋째, 좀비 회수는 timeout이 아니라 advisory lock으로 판정한다.
관련 글
- 캘리포니아 게임 EOL 패치 의무화 — SaaS 운영자가 지금 점검할 실무 영향 – 캘리포니아 게임 종료 패치 의무화는 SaaS 전반에 영향을 준다. 환불 처리와 EOL 패치 설계를 실무 관점에서 본다.
- AI 에이전트 프레임워크 비교 — LangGraph, AutoGen, CrewAI 실무 선택 기준 (2026) – AI 에이전트 프레임워크 비교가 필요한 시점이다. LangGraph, AutoGen, CrewAI의 설계 철학 차이를 분석하고, 러닝커브부…
- SQL Injection 방지 실전 가이드: Parameterized Query와 ORM 패턴 – 코드 리뷰에서 f-string으로 조립된 SQL 쿼리를 본 적이 있다. SQL Injection 방지 실전을 parameterized qu…