Python에서 subprocess를 async task로 실행할 수 있어?

네—가능합니다. “비동기적으로 서브프로세스 실행”은 보통 두 가지 방식이 있어요.

1) asyncio가 제공하는 진짜 비동기 서브프로세스 API 쓰기

핵심은 asyncio.create_subprocess_exec()(또는 create_subprocess_shell())입니다. 반환값인 Process에는 wait(), communicate() 같은 await 가능한 메서드가 있어서 이벤트 루프를 막지 않아요.

(A) 출력 전체 한 번에 받기

import asyncio, sys, asyncio.subprocess as asp

async def main():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c",
        "import time; [print(i) or time.sleep(0.3) for i in range(5)]",
        stdout=asp.PIPE, stderr=asp.PIPE
    )
    out, err = await proc.communicate()   # 둘 다 bytes
    print("RC:", proc.returncode)
    print("STDOUT:", out.decode())
    print("STDERR:", err.decode())

asyncio.run(main())

(B) 스트리밍 으로 줄 단위 읽기 (긴 작업/실시간 로그용)

import asyncio, sys, asyncio.subprocess as asp

async def main():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c",
        "import time,sys\nfor i in range(5):\n print(i); sys.stdout.flush(); time.sleep(0.3)",
        stdout=asp.PIPE, stderr=asp.STDOUT
    )

    async for raw in proc.stdout:  # bytes 스트림
        print("> ", raw.decode().rstrip())

    rc = await proc.wait()
    print("RC:", rc)

asyncio.run(main())

포인트

  • stdout=PIPE, stderr=PIPE/STDOUT로 파이프 연결.

  • communicate()는 종료까지 기다리며 전부 모아 반환.

  • 실시간이 필요하면 async for proc.stdout로 읽기.

  • 파이썬 하위 프로세스는 -u(unbuffered)로 버퍼링 지연을 줄이는 게 좋아요.

2) 기존의 블로킹 subprocess.run()을 스레드로 돌리기

레거시 코드나 간단한 호출을 그대로 쓰고 싶다면, 이벤트 루프는 살리고 작업만 스레드로 넘길 수 있어요.

import asyncio, subprocess

async def run_blocking(cmd):
    # Python 3.9+: to_thread 권장
    return await asyncio.to_thread(
        subprocess.run, cmd,
        capture_output=True, text=True, check=False
    )

async def main():
    res = await run_blocking(["git", "status"])
    print(res.stdout)

asyncio.run(main())

포인트

  • 이 방법은 진짜 비동기 I/O 는 아니고, 스레드 하나를 차지 해서 블로킹 호출을 우회합니다.

  • 입출력을 줄 단위로 실시간 처리하긴 불편하고, “끝나면 결과”가 필요한 경우에 적합.


여러 프로세스를 동시에 돌리기

import asyncio, sys, asyncio.subprocess as asp

async def run_one(n):
    return await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c", f"print({n})",
        stdout=asp.PIPE
    )

async def main():
    procs = [await run_one(i) for i in range(5)]
    outs = await asyncio.gather(*(p.communicate() for p in procs))
    for i, (out, _err) in enumerate(outs):
        print(i, out.decode().strip())

asyncio.run(main())

타임아웃/취소/정리(중요)

import asyncio, sys, asyncio.subprocess as asp

async def safe_run(cmd, timeout=10):
    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asp.PIPE, stderr=asp.PIPE)
    try:
        out, err = await asyncio.wait_for(proc.communicate(), timeout)
        return proc.returncode, out, err
    except asyncio.TimeoutError:
        proc.kill()                 # or proc.terminate()
        await proc.wait()           # 좀비 방지
        raise
    except asyncio.CancelledError:
        proc.kill()
        await proc.wait()
        raise

플랫폼/주의사항 한눈에

  • Windows : 파이프가 연결된 서브프로세스는 Python ≥3.8의 기본 이벤트 루프(프로액터)에서 잘 동작합니다. (아주 옛 버전은 ProactorEventLoop 필요)

  • shell=True는 편하지만 인자 이스케이프/보안 이슈가 있으니 꼭 필요한 경우만 사용하세요.

  • 대용량 출력 을 한꺼번에 communicate()로 받으면 메모리를 많이 먹을 수 있어요. 이런 경우 (B)처럼 스트리밍이 안전합니다.

  • 텍스트 디코딩 : asyncio.subprocessbytes 를 줍니다. .decode() 직접 호출하거나 codecs.StreamReader로 감싸세요.

  • 프로세스 그룹 : 리눅스/맥에서 하위 트리까지 종료하려면 start_new_session=True로 새 세션 시작 후, 필요 시 os.killpg() 사용(고급 주제).


요약

  • “이벤트 루프를 막지 않고” 서브프로세스를 돌리려면: asyncio.create_subprocess_exec/shell을 쓰세요.

  • 기존 블로킹 코드를 그대로 쓰고 싶다면: asyncio.to_thread(subprocess.run, ...)으로 감싸세요.

  • 스트리밍, 병렬 실행, 타임아웃/취소까지 위 예제 패턴이면 대부분의 케이스를 커버합니다.

async def main(): proc = await asyncio.create_subprocess_exec( sys.executable, "-u", "-c", "import time; [print(i) or time.sleep(0.3) for i in range(5)]", stdout=asp.PIPE, stderr=asp.PIPE ) out, err = await proc.communicate() # 둘 다 bytes print("RC:", proc.returncode) print("STDOUT:", out.decode()) print("STDERR:", err.decode()) 이 동작을 task로 만들고 싶어.

가능하죠! 핵심은 “서브프로세스를 돌리는 코루틴”을 하나 만들고, 그걸 asyncio.create_task()로 스케줄하는 겁니다. 아래를 그대로 붙여 넣어 쓰세요.

import asyncio, sys
import asyncio.subprocess as asp

# 1) 서브프로세스를 실행하고 결과를 반환하는 코루틴
async def run_subprocess():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c",
        "import time; [print(i) or time.sleep(0.3) for i in range(5)]",
        stdout=asp.PIPE, stderr=asp.PIPE
    )
    try:
        out, err = await proc.communicate()  # bytes 튜플
        return proc.returncode, out.decode(), err.decode()
    except asyncio.CancelledError:
        # 취소 시 깔끔하게 종료(좀비 방지)
        proc.kill()
        await proc.wait()
        raise

# 2) 위 코루틴을 Task로 만들어 동시 실행
async def main():
    task = asyncio.create_task(run_subprocess(), name="my-subprocess-task")

    # (선택) 여기서 다른 비동기 작업들을 병행할 수 있음
    # await do_other_async_work()

    # (선택) 타임아웃을 걸고 싶다면:
    try:
        rc, out, err = await asyncio.wait_for(task, timeout=10)
    except asyncio.TimeoutError:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        raise

    print("RC:", rc)
    print("STDOUT:\n", out)
    print("STDERR:\n", err)

if __name__ == "__main__":
    asyncio.run(main())

포인트 (한걸음씩)

  1. run_subprocess()서브프로세스 생명주기 를 책임지는 코루틴입니다.

    • 정상 완료 시 (returncode, stdout(str), stderr(str))를 반환.

    • CancelledError에서 kill()+wait()로 정리(필수).

  2. main()에서 asyncio.create_task(...)Task 를 만들면, 이벤트 루프는 이 작업을 백그라운드로 진행하고, 필요할 때 await task로 결과를 회수합니다.

  3. 타임아웃asyncio.wait_for(task, timeout=...)로 감싸고, 만료 시 task.cancel() 후 정리.

  4. 실시간 로그가 필요하면, communicate() 대신 async for line in proc.stdout: 패턴을 run_subprocess() 안에 넣으면 됩니다(그 경우 문자열을 누적해서 반환하거나 콜백/큐로 전달).

원하면, 같은 패턴으로 여러 개를 동시에 돌릴 수도 있어요:

async def main_many():
    tasks = [asyncio.create_task(run_subprocess()) for _ in range(3)]
    for t in asyncio.as_completed(tasks):
        rc, out, err = await t
        print("done:", rc, out.splitlines()[-1])

이렇게 하면 “서브프로세스 실행 → Task로 스케줄 → 결과 대기/취소/타임아웃”까지 깔끔하게 처리됩니다.

proc = await asyncio.create_subprocess_exec( sys.executable, "-u", "-c", "import time; [print(i) or time.sleep(0.3) for i in range(5)]", stdout=asp.PIPE, stderr=asp.PIPE ) 이 코드를 try 안에 넣지 않는 이유가 있어?

짧게 말하면: 정리(cleanup)가 필요한 구간만try로 감싸기 위해서예요.
create_subprocess_exec() 호출 자체는 “아직 프로세스가 없거나(생성 전) 곧바로 실패하는” 단계라서, 여기서 예외/취소가 나면 정리할 대상이 없습니다.

step-by-step로 보면:

  1. await asyncio.create_subprocess_exec(...)

    • 여기서 취소(CancelledError)가 나면 프로세스는 만들어지지 않았으니 kill/wait 불필요.

    • 실행 파일 없음 등으로 FileNotFoundError/OSError가 나면 역시 프로세스가 없으니 정리할 게 없음.

  2. 프로세스가 성공적으로 만들어진 뒤 에야 proc가 생깁니다.

    • 이후 await proc.communicate()에서 취소/타임아웃이 나면 좀비 방지 를 위해 proc.kill(); await proc.wait()가 필요하죠.

    • 그래서 try/except는 이 두 번째 구간(communicate/대기 구간)에만 두는 게 맞습니다.

  3. 또 다른 이유:

    • create_subprocess_exec()try 안에 두면, 예외 흐름에 따라 proc정의되지 않은 상태에서 proc.kill()을 호출할 위험이 생깁니다(레퍼런스가 없어서 또 다른 예외 유발).

    • “취소 신호가 생성 직후~try 진입 사이에 들어오면 어떡해요?” → 취소는 다음await 지점에서 던져지므로, 생성이 끝나고 try 블록으로 들어간 뒤의 await(예: communicate)에서 처리됩니다. 레이스 걱정 없음.

원한다면 “스폰 실패”만 별도로 다루는 2단계 패턴이 가장 견고합니다:

import asyncio, sys
import asyncio.subprocess as asp

async def run_subprocess():
    try:
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-u", "-c",
            "import time; [print(i) or time.sleep(0.3) for i in range(5)]",
            stdout=asp.PIPE, stderr=asp.PIPE
        )
    except OSError as e:  # 실행 파일 없음/권한 문제 등
        # 여기서는 정리할 프로세스가 없음
        raise

    try:
        out, err = await proc.communicate()
        return proc.returncode, out.decode(), err.decode()
    except asyncio.CancelledError:
        proc.kill()
        await proc.wait()
        raise
    except Exception:
        # 원한다면 다른 예외에서도 프로세스 정리를 보장
        proc.kill()
        await proc.wait()
        raise

요약: 프로세스가 생긴 이후에만 정리가 필요하므로, 생성 호출은 try 밖에 두고, 대기/통신 구간을 try로 감싸는 게 깔끔하고 안전합니다.

기존 코드에서 subprocess 실행할 때 preexec_fn=os.setsid, creationflags=0하고, 꺼질 때는 getpgid(pid)해서 다 같이 끄는 코드가 있었는데 여기에는 필요없어?

요지는 이거예요: “전체 프로세스 그룹까지 한 번에 끄고 싶다면 필요”, 아니면 불필요.

step-by-step로 정리하면:

  1. asyncio.create_subprocess_exec() 자체는 단일 프로세스만 관리합니다.

    • proc.terminate()/proc.kill()그 프로세스만 죽입니다. 자식·손자까지 보장하지 않아요.
  2. 기존처럼 “프로세스 그룹(트리) 전체 종료”가 목적이면, 스폰할 때 그룹을 분리 해야 합니다.

    • POSIX(리눅스/맥): preexec_fn=os.setsid 대신 start_new_session=True가 권장(동일 효과, 더 안전).

    • 윈도우: creationflags=subprocess.CREATE_NEW_PROCESS_GROUP로 새 그룹.

  3. 종료할 때는 “그룹 신호”를 보내야 합니다.

    • POSIX: os.killpg(pgid, signal.SIGTERM) → 대기 → 필요 시 SIGKILL.

    • 윈도우: proc.send_signal(signal.CTRL_BREAK_EVENT)(새 그룹으로 띄웠을 때만 동작) → 필요 시 taskkill /T.

아래는 앞서 만든 Task 버전에 “그룹 분리 + 그룹 종료”를 붙인 예시입니다.

import asyncio, sys, os, signal, subprocess
import asyncio.subprocess as asp

async def run_subprocess():
    # 1) 플랫폼별로 "새 그룹/세션"로 스폰
    kwargs = {}
    if sys.platform != "win32":
        kwargs["start_new_session"] = True          # == setsid()
    else:
        kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP

    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c",
        "import time,sys; [print(i) or time.sleep(0.3) for i in range(5)]",
        stdout=asp.PIPE, stderr=asp.PIPE,
        **kwargs
    )

    try:
        out, err = await proc.communicate()
        return proc.returncode, out.decode(), err.decode()
    except asyncio.CancelledError:
        # 2) Task 취소 시: 그룹 단위로 깔끔히 종료
        await terminate_process_tree(proc)
        raise
    except Exception:
        # 다른 예외에서도 정리 보장
        await terminate_process_tree(proc)
        raise

async def terminate_process_tree(proc, timeout=5):
    if sys.platform != "win32":
        try:
            os.killpg(proc.pid, signal.SIGTERM)
        except ProcessLookupError:
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout)
        except asyncio.TimeoutError:
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except ProcessLookupError:
                pass
            await proc.wait()
    else:
        try:
            proc.send_signal(signal.CTRL_BREAK_EVENT)  # CREATE_NEW_PROCESS_GROUP 필요
        except Exception:
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout)
        except asyncio.TimeoutError:
            # 최후 수단: 하위 트리까지 강제 종료
            subprocess.run(["taskkill", "/F", "/T", "/PID", str(proc.pid)],
                           capture_output=True)

async def main():
    task = asyncio.create_task(run_subprocess())
    try:
        rc, out, err = await asyncio.wait_for(task, timeout=10)
        print("RC:", rc)
        print(out, err)
    except asyncio.TimeoutError:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

if __name__ == "__main__":
    asyncio.run(main())

결론

  • “그냥 그 프로세스 하나만” 종료해도 된다면 start_new_session/CREATE_NEW_PROCESS_GROUP는 불필요.

  • 과거처럼 트리 전체를 확실히 종료 해야 한다면, 위처럼 스폰 시 그룹 분리 + 그룹 종료 루틴 을 그대로 넣어주세요.

  • POSIX에선 start_new_session=Truepreexec_fn=os.setsid의 안전한 대체입니다.

타이머를 따로 task로 만들어서 subprocess task, timeout task 두 개를 그룹으로 실행시키는 방식으로 만들어줘. 타임아웃이 되면 자손 프로세스도 다 죽여야해

좋아요—“타이머(Task)와 서브프로세스(Task)를 그룹처럼 함께 돌리고 , 타임아웃 시 프로세스 트리 전체 를 종료”하는 패턴을 깔끔히 만들었습니다. 그대로 붙여 쓰세요.

import asyncio, sys, os, signal, subprocess
import asyncio.subprocess as asp

async def terminate_process_tree(proc: asyncio.subprocess.Process, graceful_timeout=5):
    """proc를 '그룹 단위'로 종료. 안 죽으면 강제 종료까지."""
    if sys.platform != "win32":
        # 새 세션으로 띄웠다는 가정(start_new_session=True)
        try:
            os.killpg(proc.pid, signal.SIGTERM)
        except ProcessLookupError:
            return
        try:
            await asyncio.wait_for(proc.wait(), timeout=graceful_timeout)
            return
        except asyncio.TimeoutError:
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except ProcessLookupError:
                pass
            await proc.wait()
    else:
        # CREATE_NEW_PROCESS_GROUP로 띄웠다는 가정
        try:
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        except Exception:
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=graceful_timeout)
            return
        except asyncio.TimeoutError:
            # 최후 수단: 트리까지 모두 강제 종료
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(proc.pid)],
                capture_output=True
            )
            await proc.wait()

async def run_with_timeout(cmd, timeout: float):
    """
    타이머 Task와 서브프로세스 Task를 동시에 돌리되,
    타임아웃이 먼저 끝나면 '프로세스 트리'를 종료한다.
    반환: (timed_out: bool, returncode, stdout_str, stderr_str)
    """
    # 1) 프로세스를 "새 그룹/세션"으로 스폰 (자손 일괄 종료를 위해 필수)
    spawn_kwargs = {}
    if sys.platform != "win32":
        spawn_kwargs["start_new_session"] = True          # == setsid()
    else:
        spawn_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asp.PIPE,
        stderr=asp.PIPE,
        **spawn_kwargs
    )

    # 2) 두 개의 Task를 그룹처럼 함께 실행
    proc_task  = asyncio.create_task(proc.communicate(), name="proc-communicate")
    timer_task = asyncio.create_task(asyncio.sleep(timeout), name="timeout-timer")

    done, pending = await asyncio.wait(
        {proc_task, timer_task},
        return_when=asyncio.FIRST_COMPLETED
    )

    # 3) 타이머가 먼저 끝남 → 트리 종료 후 결과 정리
    if timer_task in done and not proc_task.done():
        # 프로세스 트리 종료
        await terminate_process_tree(proc)
        # 통신 Task 취소 (혹시 남아있다면)
        proc_task.cancel()
        try:
            await proc_task
        except asyncio.CancelledError:
            pass
        return True, None, "", ""  # timed_out=True

    # 4) 프로세스가 먼저 끝남 → 타이머 취소 후 결과 반환
    timer_task.cancel()
    try:
        await timer_task
    except asyncio.CancelledError:
        pass

    out_b, err_b = await proc_task  # bytes
    return False, proc.returncode, out_b.decode(errors="replace"), err_b.decode(errors="replace")

# ------------------ 사용 예시 ------------------
async def main():
    # 예: 0.3초 간격으로 5줄 출력하는 하위 파이썬을 1초 타임아웃으로 실행
    cmd = [sys.executable, "-u", "-c",
           "import time,sys; [print(i) or sys.stdout.flush() or time.sleep(0.3) for i in range(5)]"]

    timed_out, rc, out, err = await run_with_timeout(cmd, timeout=1.0)
    if timed_out:
        print("[TIMEOUT] 하위 프로세스 트리를 모두 종료했습니다.")
    else:
        print("RC:", rc)
        print("STDOUT:\n", out)
        print("STDERR:\n", err)

if __name__ == "__main__":
    asyncio.run(main())

핵심 포인트 (요약)

  • 두 Task를 그룹처럼 : proc.communicate() Task와 asyncio.sleep(timeout) Task를 asyncio.wait(..., FIRST_COMPLETED)로 경합시킵니다.

  • 타임아웃 경로 : 타이머가 먼저 끝나면 terminate_process_tree()프로세스 그룹(트리)을 종료 → 통신 Task 취소.

  • 프로세스 완료 경로 : 프로세스가 먼저 끝나면 타이머 Task만 취소.

  • 트리 종료를 위해 스폰 옵션 필수

    • POSIX: start_new_session=True (≒ preexec_fn=os.setsid)

    • Windows: creationflags=CREATE_NEW_PROCESS_GROUP

  • Windows에선 CTRL_BREAK_EVENT → 불응 시 taskkill /T로 강제 정리.

이 패턴이면 “타이머를 별도 Task로 두고, 타임아웃 시 자손까지 전부 종료” 요구사항을 그대로 충족합니다.

Categories:

Updated: