Asyncio 사용법 설명
python asyncio를 이용한 프로그래밍 하는 법 알려줘. 파이썬은 많이 써봤지만 이건 한 번 도 안 써본 사람 기준으로 쉽게 설명해줘.
좋아요! “asyncio”는 단일 스레드에서 I/O를 동시에 처리 하게 해주는 파이썬 표준 라이브러리예요. 처음 쓰는 분 기준으로, 개념→예제→자주 쓰는 패턴 순서로 아주 천천히 갈게요.
1) 핵심 개념 3가지만 먼저
-
코루틴(coroutine) :
async def로 만든 함수. 내부에서await로 “잠깐 쉬었다가 다시”를 할 수 있어요. -
await : “이 작업 끝날 때까지 다른 일 좀 하다가 올게!”라는 뜻. 주로 네트워크/디스크 I/O처럼 기다리는 시간 이 있는 일에 사용.
-
이벤트 루프(event loop) : 코루틴들을 돌려주는 엔진. 보통
asyncio.run(main())으로 시작해요.
기억법: “
async def로 코루틴 만들고, 느린 곳에서await, 전체는asyncio.run()으로 시작!”
2) 첫 번째 예제: 순차 vs. 동시
아래는 네트워크 대기를 흉내 내려고 asyncio.sleep()을 사용해요.
# sequential.py (순차 실행)
import asyncio, time
async def work(name, sec):
await asyncio.sleep(sec)
print(f"{name} 완료")
async def main():
t0 = time.perf_counter()
await work("A", 2) # 2초
await work("B", 2) # 또 2초 (총 4초)
print(f"걸린 시간: {time.perf_counter() - t0:.2f}s")
asyncio.run(main())
# concurrent.py (동시 실행)
import asyncio, time
async def work(name, sec):
await asyncio.sleep(sec)
print(f"{name} 완료")
async def main():
t0 = time.perf_counter()
# 동시 실행 핵심: gather 또는 create_task
await asyncio.gather(
work("A", 2),
work("B", 2),
)
print(f"걸린 시간: {time.perf_counter() - t0:.2f}s")
asyncio.run(main())
-
순차: 약 4초
-
동시: 약 2초
→ I/O 대기 시간을 겹쳐서 줄이는 게 asyncio의 본질이에요.
3) 어떻게 동시로 돌리나? (gather / create_task)
asyncio.gather(...)
-
여러 코루틴을 한 번에 돌리고 모두 끝날 때까지 기다려요.
-
실패하면
gather가 예외를 올려줘서 한 번에 처리하기 편해요.
asyncio.create_task(coro)
-
코루틴을 백그라운드 태스크 로 등록하고 즉시 제어권을 반환 해요.
-
나중에
await task로 결과를 받거나, 필요하면task.cancel()로 취소할 수 있어요.
import asyncio
async def say(msg, delay):
await asyncio.sleep(delay)
print(msg)
return msg
async def main():
t1 = asyncio.create_task(say("hello", 1))
t2 = asyncio.create_task(say("world", 1))
# 다른 일 하다가...
res1 = await t1
res2 = await t2
print(res1, res2)
asyncio.run(main())
4) 타임아웃과 취소 (실무에서 진짜 자주 씀)
네트워크는 항상 느려질 수 있으니, 타임아웃 은 기본 장착!
import asyncio
async def slow():
try:
await asyncio.sleep(5)
return 42
except asyncio.CancelledError:
print("작업이 취소됨")
raise
async def main():
try:
# 1초 넘으면 TimeoutError
result = await asyncio.wait_for(slow(), timeout=1.0)
except asyncio.TimeoutError:
print("타임아웃!")
# 임의 취소도 가능
task = asyncio.create_task(slow())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("직접 취소 완료")
asyncio.run(main())
5) 실제 네트워크 예제: 아주 간단한 에코 서버/클라이언트
(1) 서버: asyncio.start_server
# echo_server.py
import asyncio
async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
data = await reader.readline() # 클라이언트 한 줄 읽기
msg = data.decode().strip()
addr = writer.get_extra_info('peername')
print(f"[서버] {addr} -> {msg}")
writer.write(f"Echo: {msg}\n".encode()) # 다시 돌려주기
await writer.drain() # 송신 버퍼 비우기
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "127.0.0.1", 8888)
addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
print(f"[서버] {addrs}에서 대기중")
async with server: # 안전한 종료를 위해 context 사용
await server.serve_forever()
asyncio.run(main())
(2) 클라이언트: asyncio.open_connection
# echo_client.py
import asyncio
async def send(msg: str):
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
writer.write((msg + "\n").encode())
await writer.drain()
data = await reader.readline()
print(f"[클라이언트] 응답: {data.decode().strip()}")
writer.close()
await writer.wait_closed()
async def main():
# 여러 메시지를 동시 전송
await asyncio.gather(*(send(f"hello {i}") for i in range(3)))
asyncio.run(main())
-
터미널 1에서
python echo_server.py실행 -
터미널 2에서
python echo_client.py실행
→ 서버와 클라이언트가 비동기 스트림 으로 주고받는 걸 볼 수 있어요.
6) 동기(블로킹) 코드와 섞을 때
time.sleep()처럼 루프를 막아버리는 코드는 await asyncio.to_thread(...)로 스레드에 밀어 넣기 :
import asyncio, time
def blocking_io():
time.sleep(2) # 절대 이벤트 루프에서 직접 호출 X
return "완료"
async def main():
result = await asyncio.to_thread(blocking_io) # 별도 스레드에서 실행
print(result)
asyncio.run(main())
CPU 바운드(예: 대규모 계산)는 멀티프로세싱(또는
ProcessPoolExecutor)을 고려하세요. asyncio는 I/O 동시성 이 주특기입니다.
7) (선택) 파이썬 3.11의 TaskGroup 로 구조화된 동시성
여러 태스크를 “그룹”으로 안전하게 관리하고, 하나가 실패하면 나머지를 알아서 정리해 줘요.
import asyncio
async def job(n):
await asyncio.sleep(1)
print(f"job {n}")
return n
async def main():
results = []
async with asyncio.TaskGroup() as tg:
for i in range(5):
tg.create_task(job(i))
# TaskGroup을 나오면 전부 끝난 상태
# (반환값을 모으려면 큐/리스트/콜백으로 저장하거나, 별도 구조 사용)
print("모든 작업 완료")
asyncio.run(main())
8) 자주 하는 실수 체크리스트
-
asyncio.run()은 프로세스 당 한 번의 진입점 으로 사용 (중첩 금지). -
반드시
await가능한 것만 await 하기 (코루틴/Task/Future). -
이벤트 루프 안에서
time.sleep()/블로킹 I/O 사용 금지 →await asyncio.sleep()/await asyncio.to_thread(...). -
네트워크/외부 API는 타임아웃 (
asyncio.wait_for) 걸기. -
오래 가는 백그라운드 태스크는 취소 처리(try/except
CancelledError)를 넣어 안정적으로 종료.
9) 학습 루트(권장 순서)
-
위의 순차 vs 동시 예제를 실행해 체감
-
gather / create_task 패턴 익히기
-
타임아웃/취소 넣어보기
-
에코 서버/클라이언트 로 스트림 I/O 맛보기
-
기존 코드의 블로킹 부분을 to_thread 로 분리해 보기
-
(3.11+) TaskGroup 으로 코드 구조 정리
10) 연습 문제 (직접 해보기)
-
URL 리스트(예: 5개)에 대해 “요청→1초 대기→결과 출력”을 가짜로 흉내 내는
fetch(url)코루틴을 만들고,- (a) 순차 실행, (b)
gather동시 실행의 시간을 비교해 보세요.
- (a) 순차 실행, (b)
-
위 (1)에 타임아웃 0.5초 를 걸고, 타임아웃 난 URL만 따로 모아 출력해 보세요.
-
time.sleep(1)을 실수로 넣었을 때 프로그램이 느려지는 걸 관찰하고,await asyncio.sleep(1)로 바꿔보세요.
필요하면 위 예제 중 원하는 걸 더 확장(예: HTTP 클라이언트는 aiohttp 사용)해서 보여드릴게요. 어떤 작업을 비동기로 바꾸고 싶은지 알려주시면, 당장 적용 가능한 구조 로 같이 리팩터링해 드릴게요.
create_task랑 그냥 gather 하는 것의 차이
좋은 질문! 한 줄 요약부터:
-
create_task: “코루틴을 지금 바로 백그라운드로 돌리게 하고, 핸들(Task) 을 돌려줘. 나중에 개별적으로await/취소/타임아웃/이름붙이기 등 세밀 제어 를 할 때 써.” -
gather: “여러 작업을 묶어서 동시에 시작 하고, 전부 끝날 때까지 기다린 뒤 결과 리스트 를 돌려줘. 한 번에 기다릴 때 써.”
아래를 단계별로 짚어볼게요.
1) 무엇을 반환하나?
-
asyncio.create_task(coro)→ Task 객체 를 반환.
이 Task는 곧바로 스케줄링되고 “백그라운드”에서 돌아요. -
await asyncio.gather(*aws)→ 결과 리스트 를 반환.
입력 순서 그대로[res1, res2, ...]가 와요(완료 순서 아님).
2) 언제 시작되나?
-
create_task는 호출 즉시 스케줄링 시작. (곧 실행됨) -
gather도 내부적으로 코루틴을 Task로 만들어 즉시 시작하지만,
우리는gather자체를await하기에 “모두 끝날 때까지 기다리는” 패턴이 자연스러워요.
3) 예외/취소 동작 차이
-
gather(..., return_exceptions=False)(기본):-
하나라도 실패 하면
gather가 예외를 올리고 , 나머지 작업은 자동 취소 해요. -
즉, “올-오어-낫싱” 느낌.
-
-
gather(..., return_exceptions=True):- 예외도 결과 리스트에 값처럼 담아줘요. 개별 후처리 가능.
-
create_task:-
각 Task는 독립적. 어떤 Task가 실패해도 다른 Task는 계속 돔.
-
다만 결국
await task를 해야 예외가 전파돼요. 안 받으면 로그에
“Task exception was never retrieved” 경고가 나올 수 있어요.
-
-
취소 :
-
gather에 걸린 걸 한 번에 취소 → 모든 자식 Task 자동 취소. -
create_task로 만든 건 각각task.cancel()호출해서 관리.
-
4) 언제 무엇을 쓰나? (의사결정 표)
| 상황 | 추천 |
|---|---|
| 동시에 여러 일을 시키고, 모두 끝난 뒤 결과 가 필요 | await asyncio.gather(a(), b(), c()) |
| 일을 지금 시작 해 놓고, 중간에 다른 일도 하다가 나중에 합류 | t1 = create_task(a()); …; res = await t1 |
| 각 작업을 개별적으로 취소/타임아웃/이름으로 추적 | create_task (필요시 나중에 gather(*tasks)) |
| 하나 실패 시 전체를 취소 하고 싶다 (기본 동작 선호) | gather |
| 실패해도 다른 작업은 계속 가게 하고 싶다 | create_task + 개별 await/예외처리, 또는 gather(..., return_exceptions=True) |
| 완료되는 순서대로 결과를 받고 싶다(스트리밍 처리) | asyncio.as_completed(...) + (대개 create_task와 함께) |
5) 코드로 보는 차이
A) gather: 한 번에 시작 → 전부 끝날 때까지 기다림
import asyncio
async def work(n):
await asyncio.sleep(1)
return n*n
async def main():
results = await asyncio.gather(work(1), work(2), work(3))
print(results) # [1, 4, 9] (입력 순서대로)
asyncio.run(main())
B) create_task: 지금 시작 → 중간에 다른 일 → 나중에 개별/묶음 합류
import asyncio
async def work(name, t):
await asyncio.sleep(t)
print(f"{name} done")
return name
async def main():
t1 = asyncio.create_task(work("A", 2))
t2 = asyncio.create_task(work("B", 3))
# 다른 일도 병행
await asyncio.sleep(1)
print("doing something else...")
# 개별로 합류 가능
r1 = await t1
# 남은 것들만 묶어서 기다리기도 가능
r2, = await asyncio.gather(t2)
print(r1, r2)
asyncio.run(main())
6) 실무 팁
-
둘을 섞어 쓰는 패턴 이 흔해요: 먼저
create_task로 “미리” 태워두고,
나중에await asyncio.gather(*tasks, return_exceptions=True)로 한 번에 회수. -
create_task로 만든 장수(長壽) 백그라운드 태스크(예: 소비자 루프, 하트비트)는
종료 시task.cancel()+try/except CancelledError로 정리 코드 를 꼭 넣기. -
결과가 필요 없더라도 Task 참조를 잡아두고 예외/취소를 처리하세요.
완전한 “fire-and-forget”은 권장되지 않아요(누수·미처리 예외 위험). -
(3.11+)
asyncio.TaskGroup은 “gather+ 구조화된 취소/정리” 느낌의 현대적 대안.
그룹 스코프를 벗어날 때 모두 정리 돼서 복잡한 수명 관리가 쉬워요.
7) 한 문장으로 다시 요약
-
gather= 결과를 한 번에 모을 때. -
create_task= 지금 태워서 나중에/개별로 관리할 때.
원하는 시나리오가 있으면(예: “N개의 API 호출을 순차·동시·시간제한·부분실패 허용으로 처리”) 바로 그 패턴으로 코드를 짜서 보여줄게요.
async def main(): t1 = asyncio.create_task(work("A", 2)) t2 = asyncio.create_task(work("B", 3)) # 다른 일도 병행 await asyncio.sleep(1) print("doing something else…") # 개별로 합류 가능 r1 = await t1 # 남은 것들만 묶어서 기다리기도 가능 r2, = await asyncio.gather(t2) print(r1, r2) asyncio.run(main()) async def main(): # 다른 일도 병행 await asyncio.sleep(1) print("doing something else…") # 개별로 합류 가능 r1 = await work("A", 2) # 남은 것들만 묶어서 기다리기도 가능 r2, = await asyncio.gather(work("B", 3)) print(r1, r2) asyncio.run(main()) 이 두 코드는 같은거 아니냐는 질문이야.
아니요, 같지 않습니다.
핵심 차이는 “언제가 작업을 시작하느냐(스케줄링 시점)”에 있어요.
아래처럼 타임라인으로 비교해 볼게요. (A=2초, B=3초)
1) 코드 1: create_task로 즉시 시작
t1 = asyncio.create_task(work("A", 2)) # A 바로 시작
t2 = asyncio.create_task(work("B", 3)) # B 바로 시작
await asyncio.sleep(1) # 그 사이 A,B는 진행 중
print("doing something else...")
r1 = await t1 # t=1에서 A는 1초 남음 → t=2에 완료
r2, = await asyncio.gather(t2) # 그때 B는 1초 남음 → t=3에 완료
타임라인
-
t=0~1: A와 B 둘 다 진행 중, main은 1초 잠깐 쉼
-
t=1~2:
await t1(A 마무리), B는 계속 진행 -
t=2~3:
await t2(B 마무리)
→ 총 ~3초
2) 코드 2: await work(...)로 순차 시작
await asyncio.sleep(1) # 이때는 아무 작업도 시작 안 됨
print("doing something else...")
r1 = await work("A", 2) # 이제서야 A 시작 → t=1~3
r2, = await asyncio.gather(work("B", 3)) # 그 다음에야 B 시작 → t=3~6
타임라인
-
t=0~1: main만 쉼(아직 A,B 시작 X)
-
t=1~3: A 실행
-
t=3~6: B 실행
→ 총 ~6초
왜 이런 차이가 나나? (step-by-step 요점)
-
코루틴은 생성만 하면 “시작”이 아닙니다.
work("B", 3)는 코루틴 객체 를 만들 뿐, 스케줄링(실행 예약) 은create_task(...)나gather(...)같은 함수가 할 때 이뤄져요. -
코드 1 은
create_task가 코루틴을 즉시 스케줄링 해서 A,B가 동시에 달립니다. -
코드 2 는 A를 다 기다린 다음에야
gather(work("B",3))로 B를 그때 시작하니 순차 가 됩니다.
목적에 따라 어떻게 써야 하나?
-
동시 실행 이 목적이면
-
바로 묶어서:
await asyncio.gather(work("A",2), work("B",3)) -
또는 먼저 태우고 나중에 회수:
t1 = asyncio.create_task(work("A",2)) t2 = asyncio.create_task(work("B",3)) await asyncio.sleep(1) print("doing something else...") r1, r2 = await asyncio.gather(t1, t2)
-
-
순차 실행 이 목적이면 지금 작성하신 코드 2처럼
await work(...)를 차례로 호출하면 됩니다.
한 줄 정리
-
코드 1 = 동시(겹치기) :
create_task로 지금 출발시킴 → 총 ~3초 -
코드 2 = 순차 : 각 작업을 기다린 뒤 다음 작업 시작 → 총 ~6초
즉, 두 코드는 겉보기 형태만 비슷하고, 스케줄링 시점 이 달라서 동작과 실행 시간이 완전히 다릅니다.
아 create_task를 하고 즉시 실행되는건 아니고, 다음에 코드 내에서 await를 할 때 그 때 task들이 시작되는구나
완전 그렇게 이해하면 반쯤만 맞고 중요한 포인트가 하나 빠져요.
정확히는:
-
create_task()는 즉시 “스케줄링” 합니다. -
실행(코드가 실제로 돌아가기) 은 이벤트 루프에 제어권이 돌아오는 순간 시작돼요.
-
그 “제어권 반환”은 보통 당신이 어디선가
await해서 양보할 때 발생하죠.
(혹은 현재 코루틴이 끝나서 루프가 다음 턴으로 넘어갈 때.)
즉, 그 태스크를await할 때 시작하는 게 아니라, “루프에 양보하는 순간” 시작합니다. 그래서 당신이t를 절대 await하지 않아도, 중간에 다른 await가 있다면 t는 이미 돌기 시작해요.
미세 타이밍을 눈으로 확인해보기
1) create_task 후 다른 곳에서 await만 해도, 태스크는 이미 달린다
import asyncio
async def work():
print("work: start")
await asyncio.sleep(0)
print("work: after sleep")
async def main():
t = asyncio.create_task(work()) # 🔸 바로 스케줄링
print("main: after create_task")
await asyncio.sleep(0) # 🔸 여기서 루프에 양보 → work가 'start' 찍음
print("main: after first await")
await t # 🔸 이제서야 결과 수거(이미 실행은 진행 중)
print("main: after awaiting t")
asyncio.run(main())
출력 순서(전형적)
main: after create_task → work: start → main: after first await → work: after sleep → main: after awaiting t
포인트:
t를await하기 전에 이미work가 실행됐죠.
2) 루프를 막으면(time.sleep) 태스크는 못 돈다
import asyncio, time
async def work():
print("work: start")
await asyncio.sleep(0.1)
print("work: done")
async def main():
t = asyncio.create_task(work())
print("main: blocking...")
time.sleep(0.2) # 🔴 이벤트 루프가 멈춤 → work 못 달림
print("main: unblocked")
await t
asyncio.run(main())
여기서는 work: start가 time.sleep이 끝난 뒤에나 찍힐 거예요.
왜냐면 그동안 루프가 한 번도 돌 기회가 없었기 때문 이죠.
규칙 요약 (step-by-step)
-
create_task(coro)→ 즉시 스케줄링 (대기열에 올림). -
현재 코루틴이
await등으로 루프에 양보하는 순간, 새 태스크가 실행 시작. -
그 태스크를 반드시
await해야만 실행되는 건 아님.- 다만 예외/결과 회수와 누수 방지 를 위해 보통
await t(또는gather)로 회수 하는 게 안전.
- 다만 예외/결과 회수와 누수 방지 를 위해 보통
-
이벤트 루프를 막지 말기 (
time.sleep, 블로킹 I/O 금지).- 필요하면
await asyncio.sleep(...)또는await asyncio.to_thread(blocking_fn).
- 필요하면
한 줄로 다시
-
시작 시점 : “
await t일 때”가 아니라 “루프에 양보하는 다음 턴”. -
create_task는 바로 달리게 예약 하고, 당신이 어딘가에서await하는 순간 실제로 뛰기 시작합니다.
원하면 위 예제를 당신 환경에 맞게 살짝 변형해서(타이밍/로그) 더 보여드릴게요!