Async request 취소 처리
comfyui에서 custom node의 메인 함수를 async로 만들 수 있게 되었어. 그리고, openai python 패키지에서 AsyncOpenAI와 aiohttp를 이용해서 async하게 request를 보낼 수 있어. 근데 문제는 UI에서 stop을 보내고 interrupt가 즉시 동작하지 않는다는거야. request를 보낸 후 await 하는 도중에 작업이 취소되게 하고 싶어
좋아, 핵심은 “네트워크 요청 await 중에도 바로 끊기게” 만드는 거야.
asyncio에서는 취소 신호와 실제 작업을 ‘경쟁(race)’ 시키면 돼. 즉, 요청을 Task로 만들고, UI에서 오는 stop 신호(이하 stop_event)의 wait()도 Task로 만들어서 둘 중 먼저 끝나는 쪽 을 보고 행동을 바꾸면 즉시 중단이 가능해.
아래를 그대로 가져다 쓸 수 있게 단계별(그리고 안전한) 패턴으로 정리했어.
1) 공통 유틸: 취소 가능한 await 래퍼
import asyncio
import contextlib
class StopByUser(Exception):
"""UI stop/interrupt로 중단되었음을 나타내는 애플리케이션 레벨 예외."""
pass
async def await_cancellable(coro, stop_event):
"""
coro(코루틴)와 stop_event.wait()를 경쟁시킨다.
- stop이 먼저 오면 coro Task를 cancel하고 정리한 뒤 StopByUser를 올린다.
- 요청이 먼저 끝나면 stop 대기 Task를 취소하고 결과를 반환한다.
"""
req_task = asyncio.create_task(coro)
stop_task = asyncio.create_task(stop_event.wait())
try:
done, pending = await asyncio.wait(
{req_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
)
if stop_task in done:
# UI가 stop을 눌렀다: 네트워크 요청을 취소
req_task.cancel()
# 취소 정리 끝까지 기다려 경고/리소스 누수 방지
with contextlib.suppress(asyncio.CancelledError):
await req_task
raise StopByUser("Stopped by user")
# 요청이 먼저 끝남: stop 대기는 취소
stop_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await stop_task
return await req_task # 결과 리턴
except asyncio.CancelledError:
# 바깥 컨텍스트(ComfyUI 실행기)가 이 Task 자체를 취소한 경우: 바로 전파
raise
finally:
# 혹시 남아있으면 확실히 정리
for t in (req_task, stop_task):
if not t.done():
t.cancel()
with contextlib.suppress(asyncio.CancelledError):
await t
중요: 코드 어딘가에
except Exception:같은 광범위한 예외 처리기가 있다면, 반드시
except asyncio.CancelledError: raise를 맨 위에 따로 두어 취소가 삼켜지지 않게 하세요.
(파이썬 버전에 따라CancelledError가 일반Exception에 잡힐 수 있으니 명시적으로 분기하는 게 안전합니다.)
2) ComfyUI 커스텀 노드(main=async)에서 적용
ComfyUI가 stop을 누르면 내부적으로 “중단 플래그/이벤트”가 켜질 텐데, 그걸 stop_event(asyncio.Event 호환)로 받는다고 가정해 보자.
(ComfyUI에서 제공하는 실제 인터럽트 신호를 어떻게 얻는지는 프로젝트마다 다를 수 있어. 노드에 주입 받거나, 실행 컨텍스트에서 꺼내 쓰는 식으로 연결해 주면 된다.)
from openai import AsyncOpenAI
import asyncio
import contextlib
class AsyncLLMNode:
@classmethod
def INPUT_TYPES(cls):
return {"required": {"prompt": ("STRING", {"multiline": True})}}
RETURN_TYPES = ("STRING",)
FUNCTION = "main"
CATEGORY = "LLM"
async def main(self, prompt, stop_event=None):
# stop_event는 ComfyUI 쪽에서 넘겨주도록 연결해 두는 걸 권장
if stop_event is None:
# 최악의 경우를 대비한 더미 (즉시 완료되지 않음)
stop_event = asyncio.Event()
client = AsyncOpenAI() # 필요시 api_key, base_url 등 설정
try:
# ◇ 비-스트리밍 요청 예시
coro = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
)
resp = await await_cancellable(coro, stop_event)
text = resp.choices[0].message.content
return (text,)
except StopByUser:
# ComfyUI가 즉시 멈췄음을 위쪽으로 알리거나, 빈 결과/상태 메시지를 반환
# ComfyUI 쪽 정책에 맞춰 raise/return 중 하나를 선택
raise # 보통은 raise로 전파하는 게 인터럽트 체인에 충실
except asyncio.CancelledError:
# 바깥에서 Task 자체가 취소된 경우
raise
finally:
# http 커넥션 정리
with contextlib.suppress(Exception):
await client.close()
3) 스트리밍(부분 응답)일 때도 즉시 중단
스트리밍은 내부적으로 async for로 청크를 읽는 동안 __anext__()에서 블록돼서
stop을 눌러도 바로 반응을 못할 수 있어. 이때도 chunk 읽기 vs stop 를 경쟁시키면 해결돼.
from openai import AsyncOpenAI
import asyncio
import contextlib
async def stream_chat_cancellable(client, prompt: str, stop_event):
# 새로운 Responses API 예시 (필요에 맞춰 Chat Completions로 바꿔도 동일 패턴)
async with client.responses.stream(
model="gpt-4o-mini",
input=prompt,
) as stream:
aiter = stream.aiter_text() # 텍스트 청크 이터레이터
chunks = []
while True:
next_chunk = asyncio.create_task(aiter.__anext__())
stop_task = asyncio.create_task(stop_event.wait())
done, pending = await asyncio.wait(
{next_chunk, stop_task}, return_when=asyncio.FIRST_COMPLETED
)
if stop_task in done:
# 읽기 중단 + 연결 정리
next_chunk.cancel()
with contextlib.suppress(asyncio.CancelledError):
await next_chunk
await stream.close() # 서버 쪽 스트림도 닫기
raise StopByUser("Stopped by user")
try:
chunks.append(next_chunk.result())
except StopAsyncIteration:
# 스트림 종료
break
finally:
stop_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await stop_task
return "".join(chunks)
ComfyUI 노드에서:
async def main(self, prompt, stop_event=None):
if stop_event is None:
stop_event = asyncio.Event()
client = AsyncOpenAI()
try:
text = await stream_chat_cancellable(client, prompt, stop_event)
return (text,)
except StopByUser:
raise
finally:
with contextlib.suppress(Exception):
await client.close()
4) aiohttp 순수 사용 시 예시
import aiohttp
import asyncio
import contextlib
from yarl import URL
async def fetch_json_cancellable(url: str, payload: dict, stop_event):
async with aiohttp.ClientSession() as session:
# POST 코루틴 생성
async def do_post():
async with session.post(url, json=payload) as resp:
resp.raise_for_status()
return await resp.json()
return await await_cancellable(do_post(), stop_event)
참고:
aiohttp는 Task 취소 시 소켓을 닫으며CancelledError가 전파돼요.
위의 패턴처럼 항상 loser task를cancel()하고await해서 경고 없이 정리하세요.
5) 흔한 “즉시 중단이 안 되는” 원인 체크리스트
-
CancelledError를 삼키는 예외 처리except Exception:이 있다면 바로 위에except asyncio.CancelledError: raise를 추가.
-
await없이 동기 블로킹 코드- CPU 바운드 루프/블로킹 I/O는 이벤트 루프를 막아 취소가 늦어짐 →
await가능한 API로 바꾸거나,run_in_executor()로 분리.
- CPU 바운드 루프/블로킹 I/O는 이벤트 루프를 막아 취소가 늦어짐 →
-
스트리밍 루프에서 stop을 안 보고 있음
- 위의 “스트리밍 경쟁” 패턴으로
__anext__()와stop_event.wait()를 경쟁시키기.
- 위의 “스트리밍 경쟁” 패턴으로
-
정리(cleanup)를 취소가 덮어버림
- 정리 과정은
asyncio.shield()로 감싸거나with contextlib.suppress(asyncio.CancelledError)로 마무리.
- 정리 과정은
-
긴 네트워크 타임아웃
- 취소 자체는 타임아웃과 무관하지만, 라이브러리 정리가 지연될 수 있음 → 세션/스트림을
close()해주기.
- 취소 자체는 타임아웃과 무관하지만, 라이브러리 정리가 지연될 수 있음 → 세션/스트림을
6) ComfyUI 쪽 ‘stop_event’ 연결 팁
-
프로젝트에 따라 다르지만, 보통 실행 컨텍스트 나 노드 인자 주입 으로
asyncio.Event를 받을 수 있게 해두면 가장 깔끔해. -
만약 지금은 그런 훅이 없다면:
-
최소한 주기적으로 확인하는 polling을 넣는 대신, 위의 경쟁 패턴 을 사용하면 “await 중에도” 바로 반응한다.
-
stop을 누를 때 노드 측에
stop_event.set()이 호출되도록 연결(주입)만 해주면 된다.
-
필요하면 너의 현재 노드 코드 일부를 붙여줘. 거기에 맞춰 await_cancellable을 딱 들어맞게 리팩터링해 줄게.
stop_event를 넣는다는게 뭔지 모르겠어. comfyui 내부적으로 interrupt 버튼이 있잖아? 그 버튼을 눌렀을 때 취소되면 좋겠어
좋아, “UI의 Interrupt 버튼(또는 /interrupt)이 눌리면 내 async 커스텀 노드가 지금 await 중인 OpenAI 요청을 즉시 끊고 싶다”가 목표지요. 핵심은 ComfyUI가 인터럽트를 “플래그”로만 알리고, 실제로는 노드가 협조적으로 취소를 처리해야 한다는 점이에요. 아래를 그대로 넣으면 됩니다 — 단계별로 설명할게요.
1) ComfyUI가 인터럽트를 어떻게 알리냐
-
프론트엔드 Interrupt 버튼/단축키가
POST /interrupt를 호출합니다. 서버는 내부 인터럽트 플래그를 세팅해요. ComfyUI+2 -
실행기는 다음 노드 경계 혹은 노드 내부에서 체크될 때
InterruptProcessingException으로 중단을 전파하고, 웹소켓 이벤트execution_interrupted를 보냅니다. ComfyUI -
단, 노드가 블로킹 상태(타이트 루프/블로킹 I/O) 면 인터럽트가 바로 처리되지 않을 수 있습니다. 그래서 노드가 주기적으로 인터럽트를 체크 하거나
await를 취소해 줘야 해요. GitHub
2) 커스텀 노드에서 “협조적 취소”를 붙이는 가장 쉬운 방법
ComfyUI는 comfy.model_management.throw_exception_if_processing_interrupted() 를 제공합니다. 이걸 주기적으로 호출하면 인터럽트 시 InterruptProcessingException 을 던져요. (공식 코드와 여러 예시 노드에서 사용됩니다.) Gitee+2
아이디어: 요청 태스크 와 인터럽트 감시 태스크 를 동시에 돌리고, 감지되면 요청 태스크를 cancel()로 끊습니다.
# 커스텀 노드 파일 안
import asyncio
import aiohttp
import comfy.model_management as mm
from openai import AsyncOpenAI
async def _watch_interrupt_and_cancel(task: asyncio.Task):
"""ComfyUI 인터럽트가 오면 주어진 task를 cancel()"""
try:
while not task.done():
await asyncio.sleep(0.05) # 폴링 주기
mm.throw_exception_if_processing_interrupted() # 인터럽트면 예외 발생
except mm.InterruptProcessingException:
task.cancel() # 메인 요청 취소
class MyAsyncNode:
CATEGORY = "example"
@classmethod
def INPUT_TYPES(cls):
return {"required": {"prompt": ("STRING", {"multiline": True})}}
RETURN_TYPES = ("STRING",)
FUNCTION = "run"
async def run(self, prompt: str):
client = AsyncOpenAI() # 필요 시 api_key, base_url, timeout 등 설정
# 1) OpenAI 비동기 요청을 태스크로 생성
req_task = asyncio.create_task(
client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
timeout=120, # 선택: 요청 타임아웃
max_retries=0 # 선택: 재시도 비활성화(취소 응답을 빠르게)
)
)
# 2) 인터럽트 감시 태스크 시작
watcher = asyncio.create_task(_watch_interrupt_and_cancel(req_task))
try:
# 3) 결과 대기 — 인터럽트 시 req_task가 CancelledError로 깨짐
resp = await req_task
text = resp.choices[0].message.content
return (text,)
except asyncio.CancelledError:
# 4) ComfyUI에 “사용자 인터럽트”로 알리려면 이 예외로 승격
raise mm.InterruptProcessingException()
finally:
# 5) 워처 정리
watcher.cancel()
with contextlib.suppress(Exception):
await watcher
왜 이렇게 하느냐?
-
UI 버튼 →
/interrupt→ 내부 플래그 on. ComfyUI -
감시 코루틴이
throw_exception_if_processing_interrupted()호출 중 예외를 받음 →req_task.cancel()→await가 즉시CancelledError로 깨짐 → 이를InterruptProcessingException으로 다시 던져 ComfyUI가 “Processing interrupted” 경로로 종료. ComfyUI+1
3) 대안: asyncio.wait()로 “레이스” 구성
폴링 코드를 줄이고 싶다면, “인터럽트 대기 코루틴”과 요청을 레이스 시키는 방법도 있어요:
async def _wait_for_interrupt():
while True:
await asyncio.sleep(0.05)
mm.throw_exception_if_processing_interrupted()
# ...
req_task = asyncio.create_task(client.chat.completions.create(...))
int_task = asyncio.create_task(_wait_for_interrupt())
done, pending = await asyncio.wait(
{req_task, int_task}, return_when=asyncio.FIRST_COMPLETED
)
if int_task in done:
req_task.cancel()
raise mm.InterruptProcessingException()
else:
int_task.cancel()
return ( (await req_task).choices[0].message.content, )
4) 자주 겪는 함정 & 팁
-
블로킹 코드(동기 HTTP, 무한 루프 등)가 있으면 인터럽트가 “즉시” 안 먹힙니다. 네트워크는 반드시 aiohttp/비동기 를 쓰고, 루프 안에는 간간이
mm.throw_exception_if_processing_interrupted()를 넣으세요. GitHub -
OpenAI 비동기 클라이언트 는
AsyncOpenAI를 쓰면 됩니다(요청 옵션에timeout등 전달 가능). -
취소 전파 :
asyncio.CancelledError를 잡아서mm.InterruptProcessingException으로 다시 던지는 것 이 포인트입니다 — 그래야 UI에 정상적인 “중단”으로 보입니다. -
키보드 단축키 도 인터럽트를 보냅니다(Ctrl+Alt+Enter). ComfyUI
5) 동작 원리 한눈 요약
-
유저가 Interrupt 클릭 →
POST /interrupt호출 → 내부 플래그 on. ComfyUI -
내 노드의 “인터럽트 워처”가 플래그 감지 → 요청 태스크 cancel().
-
내 코드가
CancelledError를 받아InterruptProcessingException재-발생 → 실행기에서execution_interrupted이벤트 전송 & 즉시 중단. ComfyUI
이렇게 붙이면 “UI에서 정지 → 곧장 당신의 await가 끊기는” 동작을 안정적으로 만들 수 있어요.