Python 비동기 서브프로세스 실행
Python에서 subprocess를 async task로 실행할 수 있어?
네—가능합니다. “비동기적으로 서브프로세스 실행”은 보통 두 가지 방식이 있어요.
1) asyncio가 제공하는 진짜 비동기 서브프로세스 API 쓰기
핵심은 asyncio.create_subprocess_exec()(또는 create_subprocess_shell())입니다. 반환값인 Process에는 wait(), communicate() 같은 await 가능한 메서드가 있어서 이벤트 루프를 막지 않아요.
(A) 출력 전체 한 번에 받기
import asyncio, sys, asyncio.subprocess as asp
async def main():
proc = await asyncio.create_subprocess_exec(
sys.executable, "-u", "-c",
"import time; [print(i) or time.sleep(0.3) for i in range(5)]",
stdout=asp.PIPE, stderr=asp.PIPE
)
out, err = await proc.communicate() # 둘 다 bytes
print("RC:", proc.returncode)
print("STDOUT:", out.decode())
print("STDERR:", err.decode())
asyncio.run(main())
(B) 스트리밍 으로 줄 단위 읽기 (긴 작업/실시간 로그용)
import asyncio, sys, asyncio.subprocess as asp
async def main():
proc = await asyncio.create_subprocess_exec(
sys.executable, "-u", "-c",
"import time,sys\nfor i in range(5):\n print(i); sys.stdout.flush(); time.sleep(0.3)",
stdout=asp.PIPE, stderr=asp.STDOUT
)
async for raw in proc.stdout: # bytes 스트림
print("> ", raw.decode().rstrip())
rc = await proc.wait()
print("RC:", rc)
asyncio.run(main())
포인트
-
stdout=PIPE,stderr=PIPE/STDOUT로 파이프 연결. -
communicate()는 종료까지 기다리며 전부 모아 반환. -
실시간이 필요하면
async for proc.stdout로 읽기. -
파이썬 하위 프로세스는
-u(unbuffered)로 버퍼링 지연을 줄이는 게 좋아요.
2) 기존의 블로킹 subprocess.run()을 스레드로 돌리기
레거시 코드나 간단한 호출을 그대로 쓰고 싶다면, 이벤트 루프는 살리고 작업만 스레드로 넘길 수 있어요.
import asyncio, subprocess
async def run_blocking(cmd):
# Python 3.9+: to_thread 권장
return await asyncio.to_thread(
subprocess.run, cmd,
capture_output=True, text=True, check=False
)
async def main():
res = await run_blocking(["git", "status"])
print(res.stdout)
asyncio.run(main())
포인트
-
이 방법은 진짜 비동기 I/O 는 아니고, 스레드 하나를 차지 해서 블로킹 호출을 우회합니다.
-
입출력을 줄 단위로 실시간 처리하긴 불편하고, “끝나면 결과”가 필요한 경우에 적합.
여러 프로세스를 동시에 돌리기
import asyncio, sys, asyncio.subprocess as asp
async def run_one(n):
return await asyncio.create_subprocess_exec(
sys.executable, "-u", "-c", f"print({n})",
stdout=asp.PIPE
)
async def main():
procs = [await run_one(i) for i in range(5)]
outs = await asyncio.gather(*(p.communicate() for p in procs))
for i, (out, _err) in enumerate(outs):
print(i, out.decode().strip())
asyncio.run(main())
타임아웃/취소/정리(중요)
import asyncio, sys, asyncio.subprocess as asp
async def safe_run(cmd, timeout=10):
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asp.PIPE, stderr=asp.PIPE)
try:
out, err = await asyncio.wait_for(proc.communicate(), timeout)
return proc.returncode, out, err
except asyncio.TimeoutError:
proc.kill() # or proc.terminate()
await proc.wait() # 좀비 방지
raise
except asyncio.CancelledError:
proc.kill()
await proc.wait()
raise
플랫폼/주의사항 한눈에
-
Windows : 파이프가 연결된 서브프로세스는 Python ≥3.8의 기본 이벤트 루프(프로액터)에서 잘 동작합니다. (아주 옛 버전은
ProactorEventLoop필요) -
shell=True는 편하지만 인자 이스케이프/보안 이슈가 있으니 꼭 필요한 경우만 사용하세요. -
대용량 출력 을 한꺼번에
communicate()로 받으면 메모리를 많이 먹을 수 있어요. 이런 경우 (B)처럼 스트리밍이 안전합니다. -
텍스트 디코딩 :
asyncio.subprocess는 bytes 를 줍니다..decode()직접 호출하거나codecs.StreamReader로 감싸세요. -
프로세스 그룹 : 리눅스/맥에서 하위 트리까지 종료하려면
start_new_session=True로 새 세션 시작 후, 필요 시os.killpg()사용(고급 주제).
요약
-
“이벤트 루프를 막지 않고” 서브프로세스를 돌리려면:
asyncio.create_subprocess_exec/shell을 쓰세요. -
기존 블로킹 코드를 그대로 쓰고 싶다면:
asyncio.to_thread(subprocess.run, ...)으로 감싸세요. -
스트리밍, 병렬 실행, 타임아웃/취소까지 위 예제 패턴이면 대부분의 케이스를 커버합니다.
async def main(): proc = await asyncio.create_subprocess_exec( sys.executable, "-u", "-c", "import time; [print(i) or time.sleep(0.3) for i in range(5)]", stdout=asp.PIPE, stderr=asp.PIPE ) out, err = await proc.communicate() # 둘 다 bytes print("RC:", proc.returncode) print("STDOUT:", out.decode()) print("STDERR:", err.decode()) 이 동작을 task로 만들고 싶어.
가능하죠! 핵심은 “서브프로세스를 돌리는 코루틴”을 하나 만들고, 그걸 asyncio.create_task()로 스케줄하는 겁니다. 아래를 그대로 붙여 넣어 쓰세요.
import asyncio, sys
import asyncio.subprocess as asp
# 1) 서브프로세스를 실행하고 결과를 반환하는 코루틴
async def run_subprocess():
proc = await asyncio.create_subprocess_exec(
sys.executable, "-u", "-c",
"import time; [print(i) or time.sleep(0.3) for i in range(5)]",
stdout=asp.PIPE, stderr=asp.PIPE
)
try:
out, err = await proc.communicate() # bytes 튜플
return proc.returncode, out.decode(), err.decode()
except asyncio.CancelledError:
# 취소 시 깔끔하게 종료(좀비 방지)
proc.kill()
await proc.wait()
raise
# 2) 위 코루틴을 Task로 만들어 동시 실행
async def main():
task = asyncio.create_task(run_subprocess(), name="my-subprocess-task")
# (선택) 여기서 다른 비동기 작업들을 병행할 수 있음
# await do_other_async_work()
# (선택) 타임아웃을 걸고 싶다면:
try:
rc, out, err = await asyncio.wait_for(task, timeout=10)
except asyncio.TimeoutError:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
raise
print("RC:", rc)
print("STDOUT:\n", out)
print("STDERR:\n", err)
if __name__ == "__main__":
asyncio.run(main())
포인트 (한걸음씩)
-
run_subprocess()는 서브프로세스 생명주기 를 책임지는 코루틴입니다.-
정상 완료 시
(returncode, stdout(str), stderr(str))를 반환. -
CancelledError에서kill()+wait()로 정리(필수).
-
-
main()에서asyncio.create_task(...)로 Task 를 만들면, 이벤트 루프는 이 작업을 백그라운드로 진행하고, 필요할 때await task로 결과를 회수합니다. -
타임아웃 은
asyncio.wait_for(task, timeout=...)로 감싸고, 만료 시task.cancel()후 정리. -
실시간 로그가 필요하면,
communicate()대신async for line in proc.stdout:패턴을run_subprocess()안에 넣으면 됩니다(그 경우 문자열을 누적해서 반환하거나 콜백/큐로 전달).
원하면, 같은 패턴으로 여러 개를 동시에 돌릴 수도 있어요:
async def main_many():
tasks = [asyncio.create_task(run_subprocess()) for _ in range(3)]
for t in asyncio.as_completed(tasks):
rc, out, err = await t
print("done:", rc, out.splitlines()[-1])
이렇게 하면 “서브프로세스 실행 → Task로 스케줄 → 결과 대기/취소/타임아웃”까지 깔끔하게 처리됩니다.
proc = await asyncio.create_subprocess_exec( sys.executable, "-u", "-c", "import time; [print(i) or time.sleep(0.3) for i in range(5)]", stdout=asp.PIPE, stderr=asp.PIPE ) 이 코드를 try 안에 넣지 않는 이유가 있어?
짧게 말하면: 정리(cleanup)가 필요한 구간만try로 감싸기 위해서예요.
create_subprocess_exec() 호출 자체는 “아직 프로세스가 없거나(생성 전) 곧바로 실패하는” 단계라서, 여기서 예외/취소가 나면 정리할 대상이 없습니다.
step-by-step로 보면:
-
await asyncio.create_subprocess_exec(...)-
여기서 취소(
CancelledError)가 나면 프로세스는 만들어지지 않았으니 kill/wait 불필요. -
실행 파일 없음 등으로
FileNotFoundError/OSError가 나면 역시 프로세스가 없으니 정리할 게 없음.
-
-
프로세스가 성공적으로 만들어진 뒤 에야
proc가 생깁니다.-
이후
await proc.communicate()에서 취소/타임아웃이 나면 좀비 방지 를 위해proc.kill(); await proc.wait()가 필요하죠. -
그래서
try/except는 이 두 번째 구간(communicate/대기 구간)에만 두는 게 맞습니다.
-
-
또 다른 이유:
-
create_subprocess_exec()를try안에 두면, 예외 흐름에 따라proc가 정의되지 않은 상태에서proc.kill()을 호출할 위험이 생깁니다(레퍼런스가 없어서 또 다른 예외 유발). -
“취소 신호가 생성 직후~
try진입 사이에 들어오면 어떡해요?” → 취소는 다음await지점에서 던져지므로, 생성이 끝나고try블록으로 들어간 뒤의await(예:communicate)에서 처리됩니다. 레이스 걱정 없음.
-
원한다면 “스폰 실패”만 별도로 다루는 2단계 패턴이 가장 견고합니다:
import asyncio, sys
import asyncio.subprocess as asp
async def run_subprocess():
try:
proc = await asyncio.create_subprocess_exec(
sys.executable, "-u", "-c",
"import time; [print(i) or time.sleep(0.3) for i in range(5)]",
stdout=asp.PIPE, stderr=asp.PIPE
)
except OSError as e: # 실행 파일 없음/권한 문제 등
# 여기서는 정리할 프로세스가 없음
raise
try:
out, err = await proc.communicate()
return proc.returncode, out.decode(), err.decode()
except asyncio.CancelledError:
proc.kill()
await proc.wait()
raise
except Exception:
# 원한다면 다른 예외에서도 프로세스 정리를 보장
proc.kill()
await proc.wait()
raise
요약: 프로세스가 생긴 이후에만 정리가 필요하므로, 생성 호출은 try 밖에 두고, 대기/통신 구간을 try로 감싸는 게 깔끔하고 안전합니다.
기존 코드에서 subprocess 실행할 때 preexec_fn=os.setsid, creationflags=0하고, 꺼질 때는 getpgid(pid)해서 다 같이 끄는 코드가 있었는데 여기에는 필요없어?
요지는 이거예요: “전체 프로세스 그룹까지 한 번에 끄고 싶다면 필요”, 아니면 불필요.
step-by-step로 정리하면:
-
asyncio.create_subprocess_exec()자체는 단일 프로세스만 관리합니다.proc.terminate()/proc.kill()은 그 프로세스만 죽입니다. 자식·손자까지 보장하지 않아요.
-
기존처럼 “프로세스 그룹(트리) 전체 종료”가 목적이면, 스폰할 때 그룹을 분리 해야 합니다.
-
POSIX(리눅스/맥):
preexec_fn=os.setsid대신start_new_session=True가 권장(동일 효과, 더 안전). -
윈도우:
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP로 새 그룹.
-
-
종료할 때는 “그룹 신호”를 보내야 합니다.
-
POSIX:
os.killpg(pgid, signal.SIGTERM)→ 대기 → 필요 시SIGKILL. -
윈도우:
proc.send_signal(signal.CTRL_BREAK_EVENT)(새 그룹으로 띄웠을 때만 동작) → 필요 시taskkill /T.
-
아래는 앞서 만든 Task 버전에 “그룹 분리 + 그룹 종료”를 붙인 예시입니다.
import asyncio, sys, os, signal, subprocess
import asyncio.subprocess as asp
async def run_subprocess():
# 1) 플랫폼별로 "새 그룹/세션"로 스폰
kwargs = {}
if sys.platform != "win32":
kwargs["start_new_session"] = True # == setsid()
else:
kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = await asyncio.create_subprocess_exec(
sys.executable, "-u", "-c",
"import time,sys; [print(i) or time.sleep(0.3) for i in range(5)]",
stdout=asp.PIPE, stderr=asp.PIPE,
**kwargs
)
try:
out, err = await proc.communicate()
return proc.returncode, out.decode(), err.decode()
except asyncio.CancelledError:
# 2) Task 취소 시: 그룹 단위로 깔끔히 종료
await terminate_process_tree(proc)
raise
except Exception:
# 다른 예외에서도 정리 보장
await terminate_process_tree(proc)
raise
async def terminate_process_tree(proc, timeout=5):
if sys.platform != "win32":
try:
os.killpg(proc.pid, signal.SIGTERM)
except ProcessLookupError:
pass
try:
await asyncio.wait_for(proc.wait(), timeout)
except asyncio.TimeoutError:
try:
os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass
await proc.wait()
else:
try:
proc.send_signal(signal.CTRL_BREAK_EVENT) # CREATE_NEW_PROCESS_GROUP 필요
except Exception:
pass
try:
await asyncio.wait_for(proc.wait(), timeout)
except asyncio.TimeoutError:
# 최후 수단: 하위 트리까지 강제 종료
subprocess.run(["taskkill", "/F", "/T", "/PID", str(proc.pid)],
capture_output=True)
async def main():
task = asyncio.create_task(run_subprocess())
try:
rc, out, err = await asyncio.wait_for(task, timeout=10)
print("RC:", rc)
print(out, err)
except asyncio.TimeoutError:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
asyncio.run(main())
결론
-
“그냥 그 프로세스 하나만” 종료해도 된다면
start_new_session/CREATE_NEW_PROCESS_GROUP는 불필요. -
과거처럼 트리 전체를 확실히 종료 해야 한다면, 위처럼 스폰 시 그룹 분리 + 그룹 종료 루틴 을 그대로 넣어주세요.
-
POSIX에선
start_new_session=True가preexec_fn=os.setsid의 안전한 대체입니다.
타이머를 따로 task로 만들어서 subprocess task, timeout task 두 개를 그룹으로 실행시키는 방식으로 만들어줘. 타임아웃이 되면 자손 프로세스도 다 죽여야해
좋아요—“타이머(Task)와 서브프로세스(Task)를 그룹처럼 함께 돌리고 , 타임아웃 시 프로세스 트리 전체 를 종료”하는 패턴을 깔끔히 만들었습니다. 그대로 붙여 쓰세요.
import asyncio, sys, os, signal, subprocess
import asyncio.subprocess as asp
async def terminate_process_tree(proc: asyncio.subprocess.Process, graceful_timeout=5):
"""proc를 '그룹 단위'로 종료. 안 죽으면 강제 종료까지."""
if sys.platform != "win32":
# 새 세션으로 띄웠다는 가정(start_new_session=True)
try:
os.killpg(proc.pid, signal.SIGTERM)
except ProcessLookupError:
return
try:
await asyncio.wait_for(proc.wait(), timeout=graceful_timeout)
return
except asyncio.TimeoutError:
try:
os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass
await proc.wait()
else:
# CREATE_NEW_PROCESS_GROUP로 띄웠다는 가정
try:
proc.send_signal(signal.CTRL_BREAK_EVENT)
except Exception:
pass
try:
await asyncio.wait_for(proc.wait(), timeout=graceful_timeout)
return
except asyncio.TimeoutError:
# 최후 수단: 트리까지 모두 강제 종료
subprocess.run(
["taskkill", "/F", "/T", "/PID", str(proc.pid)],
capture_output=True
)
await proc.wait()
async def run_with_timeout(cmd, timeout: float):
"""
타이머 Task와 서브프로세스 Task를 동시에 돌리되,
타임아웃이 먼저 끝나면 '프로세스 트리'를 종료한다.
반환: (timed_out: bool, returncode, stdout_str, stderr_str)
"""
# 1) 프로세스를 "새 그룹/세션"으로 스폰 (자손 일괄 종료를 위해 필수)
spawn_kwargs = {}
if sys.platform != "win32":
spawn_kwargs["start_new_session"] = True # == setsid()
else:
spawn_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asp.PIPE,
stderr=asp.PIPE,
**spawn_kwargs
)
# 2) 두 개의 Task를 그룹처럼 함께 실행
proc_task = asyncio.create_task(proc.communicate(), name="proc-communicate")
timer_task = asyncio.create_task(asyncio.sleep(timeout), name="timeout-timer")
done, pending = await asyncio.wait(
{proc_task, timer_task},
return_when=asyncio.FIRST_COMPLETED
)
# 3) 타이머가 먼저 끝남 → 트리 종료 후 결과 정리
if timer_task in done and not proc_task.done():
# 프로세스 트리 종료
await terminate_process_tree(proc)
# 통신 Task 취소 (혹시 남아있다면)
proc_task.cancel()
try:
await proc_task
except asyncio.CancelledError:
pass
return True, None, "", "" # timed_out=True
# 4) 프로세스가 먼저 끝남 → 타이머 취소 후 결과 반환
timer_task.cancel()
try:
await timer_task
except asyncio.CancelledError:
pass
out_b, err_b = await proc_task # bytes
return False, proc.returncode, out_b.decode(errors="replace"), err_b.decode(errors="replace")
# ------------------ 사용 예시 ------------------
async def main():
# 예: 0.3초 간격으로 5줄 출력하는 하위 파이썬을 1초 타임아웃으로 실행
cmd = [sys.executable, "-u", "-c",
"import time,sys; [print(i) or sys.stdout.flush() or time.sleep(0.3) for i in range(5)]"]
timed_out, rc, out, err = await run_with_timeout(cmd, timeout=1.0)
if timed_out:
print("[TIMEOUT] 하위 프로세스 트리를 모두 종료했습니다.")
else:
print("RC:", rc)
print("STDOUT:\n", out)
print("STDERR:\n", err)
if __name__ == "__main__":
asyncio.run(main())
핵심 포인트 (요약)
-
두 Task를 그룹처럼 :
proc.communicate()Task와asyncio.sleep(timeout)Task를asyncio.wait(..., FIRST_COMPLETED)로 경합시킵니다. -
타임아웃 경로 : 타이머가 먼저 끝나면
terminate_process_tree()로 프로세스 그룹(트리)을 종료 → 통신 Task 취소. -
프로세스 완료 경로 : 프로세스가 먼저 끝나면 타이머 Task만 취소.
-
트리 종료를 위해 스폰 옵션 필수
-
POSIX:
start_new_session=True(≒preexec_fn=os.setsid) -
Windows:
creationflags=CREATE_NEW_PROCESS_GROUP
-
-
Windows에선
CTRL_BREAK_EVENT→ 불응 시taskkill /T로 강제 정리.
이 패턴이면 “타이머를 별도 Task로 두고, 타임아웃 시 자손까지 전부 종료” 요구사항을 그대로 충족합니다.