comfyui에서 custom node의 메인 함수를 async로 만들 수 있게 되었어. 그리고, openai python 패키지에서 AsyncOpenAI와 aiohttp를 이용해서 async하게 request를 보낼 수 있어. 근데 문제는 UI에서 stop을 보내고 interrupt가 즉시 동작하지 않는다는거야. request를 보낸 후 await 하는 도중에 작업이 취소되게 하고 싶어

좋아, 핵심은 “네트워크 요청 await 중에도 바로 끊기게” 만드는 거야.
asyncio에서는 취소 신호와 실제 작업을 ‘경쟁(race)’ 시키면 돼. 즉, 요청을 Task로 만들고, UI에서 오는 stop 신호(이하 stop_event)의 wait()Task로 만들어서 둘 중 먼저 끝나는 쪽 을 보고 행동을 바꾸면 즉시 중단이 가능해.

아래를 그대로 가져다 쓸 수 있게 단계별(그리고 안전한) 패턴으로 정리했어.


1) 공통 유틸: 취소 가능한 await 래퍼

import asyncio
import contextlib

class StopByUser(Exception):
    """UI stop/interrupt로 중단되었음을 나타내는 애플리케이션 레벨 예외."""
    pass

async def await_cancellable(coro, stop_event):
    """
    coro(코루틴)와 stop_event.wait()를 경쟁시킨다.
    - stop이 먼저 오면 coro Task를 cancel하고 정리한 뒤 StopByUser를 올린다.
    - 요청이 먼저 끝나면 stop 대기 Task를 취소하고 결과를 반환한다.
    """
    req_task = asyncio.create_task(coro)
    stop_task = asyncio.create_task(stop_event.wait())
    try:
        done, pending = await asyncio.wait(
            {req_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
        )

        if stop_task in done:
            # UI가 stop을 눌렀다: 네트워크 요청을 취소
            req_task.cancel()
            # 취소 정리 끝까지 기다려 경고/리소스 누수 방지
            with contextlib.suppress(asyncio.CancelledError):
                await req_task
            raise StopByUser("Stopped by user")

        # 요청이 먼저 끝남: stop 대기는 취소
        stop_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await stop_task
        return await req_task  # 결과 리턴

    except asyncio.CancelledError:
        # 바깥 컨텍스트(ComfyUI 실행기)가 이 Task 자체를 취소한 경우: 바로 전파
        raise

    finally:
        # 혹시 남아있으면 확실히 정리
        for t in (req_task, stop_task):
            if not t.done():
                t.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await t

중요: 코드 어딘가에 except Exception: 같은 광범위한 예외 처리기가 있다면, 반드시
except asyncio.CancelledError: raise맨 위에 따로 두어 취소가 삼켜지지 않게 하세요.
(파이썬 버전에 따라 CancelledError가 일반 Exception에 잡힐 수 있으니 명시적으로 분기하는 게 안전합니다.)


2) ComfyUI 커스텀 노드(main=async)에서 적용

ComfyUI가 stop을 누르면 내부적으로 “중단 플래그/이벤트”가 켜질 텐데, 그걸 stop_event(asyncio.Event 호환)로 받는다고 가정해 보자.
(ComfyUI에서 제공하는 실제 인터럽트 신호를 어떻게 얻는지는 프로젝트마다 다를 수 있어. 노드에 주입 받거나, 실행 컨텍스트에서 꺼내 쓰는 식으로 연결해 주면 된다.)

from openai import AsyncOpenAI
import asyncio
import contextlib

class AsyncLLMNode:
    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {"prompt": ("STRING", {"multiline": True})}}

    RETURN_TYPES = ("STRING",)
    FUNCTION = "main"
    CATEGORY = "LLM"

    async def main(self, prompt, stop_event=None):
        # stop_event는 ComfyUI 쪽에서 넘겨주도록 연결해 두는 걸 권장
        if stop_event is None:
            # 최악의 경우를 대비한 더미 (즉시 완료되지 않음)
            stop_event = asyncio.Event()

        client = AsyncOpenAI()  # 필요시 api_key, base_url 등 설정
        try:
            # ◇ 비-스트리밍 요청 예시
            coro = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
            )

            resp = await await_cancellable(coro, stop_event)
            text = resp.choices[0].message.content
            return (text,)

        except StopByUser:
            # ComfyUI가 즉시 멈췄음을 위쪽으로 알리거나, 빈 결과/상태 메시지를 반환
            # ComfyUI 쪽 정책에 맞춰 raise/return 중 하나를 선택
            raise  # 보통은 raise로 전파하는 게 인터럽트 체인에 충실

        except asyncio.CancelledError:
            # 바깥에서 Task 자체가 취소된 경우
            raise

        finally:
            # http 커넥션 정리
            with contextlib.suppress(Exception):
                await client.close()

3) 스트리밍(부분 응답)일 때도 즉시 중단

스트리밍은 내부적으로 async for로 청크를 읽는 동안 __anext__()에서 블록돼서
stop을 눌러도 바로 반응을 못할 수 있어. 이때도 chunk 읽기 vs stop 를 경쟁시키면 해결돼.

from openai import AsyncOpenAI
import asyncio
import contextlib

async def stream_chat_cancellable(client, prompt: str, stop_event):
    # 새로운 Responses API 예시 (필요에 맞춰 Chat Completions로 바꿔도 동일 패턴)
    async with client.responses.stream(
        model="gpt-4o-mini",
        input=prompt,
    ) as stream:
        aiter = stream.aiter_text()  # 텍스트 청크 이터레이터
        chunks = []
        while True:
            next_chunk = asyncio.create_task(aiter.__anext__())
            stop_task  = asyncio.create_task(stop_event.wait())
            done, pending = await asyncio.wait(
                {next_chunk, stop_task}, return_when=asyncio.FIRST_COMPLETED
            )

            if stop_task in done:
                # 읽기 중단 + 연결 정리
                next_chunk.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await next_chunk
                await stream.close()  # 서버 쪽 스트림도 닫기
                raise StopByUser("Stopped by user")

            try:
                chunks.append(next_chunk.result())
            except StopAsyncIteration:
                # 스트림 종료
                break
            finally:
                stop_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await stop_task

        return "".join(chunks)

ComfyUI 노드에서:

async def main(self, prompt, stop_event=None):
    if stop_event is None:
        stop_event = asyncio.Event()
    client = AsyncOpenAI()
    try:
        text = await stream_chat_cancellable(client, prompt, stop_event)
        return (text,)
    except StopByUser:
        raise
    finally:
        with contextlib.suppress(Exception):
            await client.close()

4) aiohttp 순수 사용 시 예시

import aiohttp
import asyncio
import contextlib
from yarl import URL

async def fetch_json_cancellable(url: str, payload: dict, stop_event):
    async with aiohttp.ClientSession() as session:
        # POST 코루틴 생성
        async def do_post():
            async with session.post(url, json=payload) as resp:
                resp.raise_for_status()
                return await resp.json()

        return await await_cancellable(do_post(), stop_event)

참고: aiohttp는 Task 취소 시 소켓을 닫으며 CancelledError가 전파돼요.
위의 패턴처럼 항상 loser task를 cancel()하고 await해서 경고 없이 정리하세요.


5) 흔한 “즉시 중단이 안 되는” 원인 체크리스트

  1. CancelledError를 삼키는 예외 처리

    • except Exception: 이 있다면 바로 위에 except asyncio.CancelledError: raise 를 추가.
  2. await 없이 동기 블로킹 코드

    • CPU 바운드 루프/블로킹 I/O는 이벤트 루프를 막아 취소가 늦어짐 → await 가능한 API로 바꾸거나, run_in_executor()로 분리.
  3. 스트리밍 루프에서 stop을 안 보고 있음

    • 위의 “스트리밍 경쟁” 패턴으로 __anext__()stop_event.wait()를 경쟁시키기.
  4. 정리(cleanup)를 취소가 덮어버림

    • 정리 과정은 asyncio.shield()로 감싸거나 with contextlib.suppress(asyncio.CancelledError)로 마무리.
  5. 긴 네트워크 타임아웃

    • 취소 자체는 타임아웃과 무관하지만, 라이브러리 정리가 지연될 수 있음 → 세션/스트림을 close() 해주기.

6) ComfyUI 쪽 ‘stop_event’ 연결 팁

  • 프로젝트에 따라 다르지만, 보통 실행 컨텍스트노드 인자 주입 으로 asyncio.Event를 받을 수 있게 해두면 가장 깔끔해.

  • 만약 지금은 그런 훅이 없다면:

    • 최소한 주기적으로 확인하는 polling을 넣는 대신, 위의 경쟁 패턴 을 사용하면 “await 중에도” 바로 반응한다.

    • stop을 누를 때 노드 측에 stop_event.set()이 호출되도록 연결(주입)만 해주면 된다.


필요하면 너의 현재 노드 코드 일부를 붙여줘. 거기에 맞춰 await_cancellable을 딱 들어맞게 리팩터링해 줄게.

stop_event를 넣는다는게 뭔지 모르겠어. comfyui 내부적으로 interrupt 버튼이 있잖아? 그 버튼을 눌렀을 때 취소되면 좋겠어

좋아, “UI의 Interrupt 버튼(또는 /interrupt)이 눌리면 내 async 커스텀 노드가 지금 await 중인 OpenAI 요청을 즉시 끊고 싶다”가 목표지요. 핵심은 ComfyUI가 인터럽트를 “플래그”로만 알리고, 실제로는 노드가 협조적으로 취소를 처리해야 한다는 점이에요. 아래를 그대로 넣으면 됩니다 — 단계별로 설명할게요.


1) ComfyUI가 인터럽트를 어떻게 알리냐

  • 프론트엔드 Interrupt 버튼/단축키가 POST /interrupt 를 호출합니다. 서버는 내부 인터럽트 플래그를 세팅해요. ComfyUI+2

  • 실행기는 다음 노드 경계 혹은 노드 내부에서 체크될 때 InterruptProcessingException 으로 중단을 전파하고, 웹소켓 이벤트 execution_interrupted 를 보냅니다. ComfyUI

  • 단, 노드가 블로킹 상태(타이트 루프/블로킹 I/O) 면 인터럽트가 바로 처리되지 않을 수 있습니다. 그래서 노드가 주기적으로 인터럽트를 체크 하거나 await 를 취소해 줘야 해요. GitHub


2) 커스텀 노드에서 “협조적 취소”를 붙이는 가장 쉬운 방법

ComfyUI는 comfy.model_management.throw_exception_if_processing_interrupted() 를 제공합니다. 이걸 주기적으로 호출하면 인터럽트 시 InterruptProcessingException 을 던져요. (공식 코드와 여러 예시 노드에서 사용됩니다.) Gitee+2

아이디어: 요청 태스크인터럽트 감시 태스크 를 동시에 돌리고, 감지되면 요청 태스크를 cancel()로 끊습니다.

# 커스텀 노드 파일 안
import asyncio
import aiohttp
import comfy.model_management as mm
from openai import AsyncOpenAI

async def _watch_interrupt_and_cancel(task: asyncio.Task):
    """ComfyUI 인터럽트가 오면 주어진 task를 cancel()"""
    try:
        while not task.done():
            await asyncio.sleep(0.05)  # 폴링 주기
            mm.throw_exception_if_processing_interrupted()  # 인터럽트면 예외 발생
    except mm.InterruptProcessingException:
        task.cancel()  # 메인 요청 취소

class MyAsyncNode:
    CATEGORY = "example"

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {"prompt": ("STRING", {"multiline": True})}}

    RETURN_TYPES = ("STRING",)
    FUNCTION = "run"

    async def run(self, prompt: str):
        client = AsyncOpenAI()  # 필요 시 api_key, base_url, timeout 등 설정

        # 1) OpenAI 비동기 요청을 태스크로 생성
        req_task = asyncio.create_task(
            client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                timeout=120,          # 선택: 요청 타임아웃
                max_retries=0         # 선택: 재시도 비활성화(취소 응답을 빠르게)
            )
        )

        # 2) 인터럽트 감시 태스크 시작
        watcher = asyncio.create_task(_watch_interrupt_and_cancel(req_task))

        try:
            # 3) 결과 대기 — 인터럽트 시 req_task가 CancelledError로 깨짐
            resp = await req_task
            text = resp.choices[0].message.content
            return (text,)
        except asyncio.CancelledError:
            # 4) ComfyUI에 “사용자 인터럽트”로 알리려면 이 예외로 승격
            raise mm.InterruptProcessingException()
        finally:
            # 5) 워처 정리
            watcher.cancel()
            with contextlib.suppress(Exception):
                await watcher

왜 이렇게 하느냐?

  • UI 버튼/interrupt → 내부 플래그 on. ComfyUI

  • 감시 코루틴이 throw_exception_if_processing_interrupted() 호출 중 예외를 받음req_task.cancel()await가 즉시 CancelledError 로 깨짐 → 이를 InterruptProcessingException 으로 다시 던져 ComfyUI가 “Processing interrupted” 경로로 종료. ComfyUI+1


3) 대안: asyncio.wait()로 “레이스” 구성

폴링 코드를 줄이고 싶다면, “인터럽트 대기 코루틴”과 요청을 레이스 시키는 방법도 있어요:

async def _wait_for_interrupt():
    while True:
        await asyncio.sleep(0.05)
        mm.throw_exception_if_processing_interrupted()

# ...
req_task = asyncio.create_task(client.chat.completions.create(...))
int_task = asyncio.create_task(_wait_for_interrupt())

done, pending = await asyncio.wait(
    {req_task, int_task}, return_when=asyncio.FIRST_COMPLETED
)

if int_task in done:
    req_task.cancel()
    raise mm.InterruptProcessingException()
else:
    int_task.cancel()
    return ( (await req_task).choices[0].message.content, )

4) 자주 겪는 함정 & 팁

  • 블로킹 코드(동기 HTTP, 무한 루프 등)가 있으면 인터럽트가 “즉시” 안 먹힙니다. 네트워크는 반드시 aiohttp/비동기 를 쓰고, 루프 안에는 간간이 mm.throw_exception_if_processing_interrupted() 를 넣으세요. GitHub

  • OpenAI 비동기 클라이언트AsyncOpenAI 를 쓰면 됩니다(요청 옵션에 timeout 등 전달 가능).

  • 취소 전파 : asyncio.CancelledError잡아서 mm.InterruptProcessingException 으로 다시 던지는 것 이 포인트입니다 — 그래야 UI에 정상적인 “중단”으로 보입니다.

  • 키보드 단축키 도 인터럽트를 보냅니다(Ctrl+Alt+Enter). ComfyUI


5) 동작 원리 한눈 요약

  1. 유저가 Interrupt 클릭 → POST /interrupt 호출 → 내부 플래그 on. ComfyUI

  2. 내 노드의 “인터럽트 워처”가 플래그 감지 → 요청 태스크 cancel().

  3. 내 코드가 CancelledError 를 받아 InterruptProcessingException 재-발생 → 실행기에서 execution_interrupted 이벤트 전송 & 즉시 중단. ComfyUI

이렇게 붙이면 “UI에서 정지 → 곧장 당신의 await가 끊기는” 동작을 안정적으로 만들 수 있어요.

3번처럼 구성했어. 그런데 너가 만들어준 노드들이 다수의 노드가 동시에 실행될수도 있는데 그러면 그 중에 하나만 취소되는거 같기도해. 왜냐면 throw_exception_if_processing_interrupted() 함수가 interrupt_processing을 False로 만들거든. 그러면 두 번째 노드는 취소된지 모르지 않을까

맞아. 네가 본 그대로야. ComfyUI의 인터럽트 플래그는 “한 번만 소비되는(edge-triggered)” 동작을 해.
소스 보면:

  • interrupt_current_processing(True) → 전역 플래그를 True 로 셋팅.

  • processing_interrupted()플래그를 읽기만 함(안 지움). Hugging Face

  • throw_exception_if_processing_interrupted() → 플래그가 True면 False로 지우고 InterruptProcessingException을 던짐(즉, 소비). 그래서 먼저 호출한 노드만 확실히 취소 신호를 보게 되고, 뒤늦게 체크한 다른 노드는 “몰랐다”가 될 수 있어. Hugging Face

그래서 “여러 노드가 동시에 돌 때 하나만 취소되는 것 같다”는 네 추측이 정확해.


이렇게 바꿔서 해결해

핵심은 노드 내부에서 플래그를 ‘읽기만’ 하되 지우지 않는 것 이야.
즉, 감시 코루틴은 throw_exception_if_processing_interrupted()(소비) 대신 processing_interrupted()(비소비)로 체크하고, 직접InterruptProcessingException을 raise 하자. 그러면 모든 실행 중 노드가 같은 인터럽트를 감지하고 동시에 멈출 수 있어.

1) 레이스 패턴(요청 vs 인터럽트) 수정본

import asyncio
import contextlib
import comfy.model_management as mm
from openai import AsyncOpenAI

async def _wait_for_interrupt_level_trigger():
    # 플래그를 '읽기만' 하는 루프: 여러 노드가 동시에 True를 감지 가능
    while True:
        await asyncio.sleep(0.05)
        if mm.processing_interrupted():           # ← 소비 안 함
            raise mm.InterruptProcessingException()

class MyAsyncNode:
    CATEGORY = "example"
    RETURN_TYPES = ("STRING",)
    FUNCTION = "run"

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {"prompt": ("STRING", {"multiline": True})}}

    async def run(self, prompt: str):
        client = AsyncOpenAI()
        try:
            req_task = asyncio.create_task(
                client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    timeout=120,
                    max_retries=0,
                )
            )
            int_task = asyncio.create_task(_wait_for_interrupt_level_trigger())

            done, _ = await asyncio.wait({req_task, int_task},
                                        return_when=asyncio.FIRST_COMPLETED)

            if int_task in done:
                # 인터럽트가 먼저 옴: 네트워크 요청 즉시 취소
                req_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await req_task
                # 여기서 throw_exception_if_processing_interrupted()를 부르면
                # 플래그를 '지워' 버려 다른 노드가 못 봄 → 쓰지 말자.
                raise mm.InterruptProcessingException()

            # 요청이 먼저 끝남
            int_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await int_task
            resp = await req_task
            return (resp.choices[0].message.content,)

        except asyncio.CancelledError:
            # 상위에서 태스크 자체를 취소한 경우
            raise
        finally:
            with contextlib.suppress(Exception):
                await client.close()

포인트: 노드 코드에서는throw_exception_if_processing_interrupted()를 호출하지 않는다.
모든 실행 중 노드가 processing_interrupted()로 동시에 감지 → 각자 자신의 작업을 취소하고 직접 InterruptProcessingException을 던진다.
이렇게 해야 플래그가 한 노드에 의해 먼저 ‘소비’되어 사라지는 일이 없다. (해당 함수가 플래그를 False로 만들고 예외를 던지는 동작은 소스에 명시돼 있음) Hugging Face

2) 스트리밍 루프도 동일 원리

청크 읽기(__anext__)와 인터럽트 감시를 레이스시키되, 감시는 processing_interrupted()만 사용:

async def stream_chat_cancellable(client, prompt: str):
    async with client.responses.stream(model="gpt-4o-mini", input=prompt) as stream:
        aiter = stream.aiter_text()
        chunks = []
        while True:
            next_chunk = asyncio.create_task(aiter.__anext__())
            int_task   = asyncio.create_task(_wait_for_interrupt_level_trigger())

            done, _ = await asyncio.wait({next_chunk, int_task},
                                         return_when=asyncio.FIRST_COMPLETED)

            if int_task in done:
                next_chunk.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await next_chunk
                await stream.close()
                raise mm.InterruptProcessingException()

            try:
                chunks.append(next_chunk.result())
            except StopAsyncIteration:
                break
            finally:
                int_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await int_task

        return "".join(chunks)

왜 이게 안전한가 (step-by-step)

  1. 사용자가 UI에서 Interrupt 버튼 → 서버가 interrupt_current_processing(True)로 전역 플래그를 킴.

  2. 각 노드의 감시 코루틴이 지우지 않는 processing_interrupted()로 True를 감지. (동시에 여러 노드 OK) Hugging Face

  3. 감지한 노드는 자기 네트워크 태스크를cancel() 하고 InterruptProcessingException직접 raise.

  4. 실행기는 이 예외를 받아 워크플로를 “Processing interrupted”로 종료.

  5. 플래그 초기화는 코어 쪽(다음 실행 준비 타이밍)에 의해 처리되거나, 코어가 적절히 throw_exception_if_processing_interrupted()를 한 번만 호출해 소비함. (이 함수가 플래그를 지우는 동작은 소스에 명확) Hugging Face

요약: 노드 단에서는 ‘읽기(레벨 트리거)’만 하고, 지우는(엣지 소비) 일은 코어에게 맡겨라.


추가 체크리스트

  • 예외 처리에서 CancelledError절대 삼키지 말고 전파 하세요. (취소 감도가 떨어지는 대표 원인)

  • aiohttp/OpenAI 비동기는 태스크 취소 시 소켓을 닫고 CancelledError를 던집니다. 취소 후에는 with suppress(asyncio.CancelledError): await task정리까지 해 주세요.

  • 장시간 블로킹/CPU 작업이 있으면 processing_interrupted()를 루프에 간간히 넣어 주세요. (블로킹이면 인터럽트가 늦게 반영)

이대로 바꾸면 “동시에 도는 여러 노드 중 하나만 멈추는” 현상 없이, 모두가 즉시 멈추는 동작이 됩니다.

Exception을 여러 노드에서 뿌려도 돼?

짧게 답하면: 네, 여러 노드에서InterruptProcessingException을 동시에(또는 거의 동시에) 던져도 됩니다.
ComfyUI 실행기는 이 예외를 “취소 신호”로 취급하고 상위 실행 루프에서 잡아서 프롬프트 실행을 중단하도록 설계되어 있어요. 한 노드가 던지든, 여러 노드가 던지든 결과는 같고, 추가로 던져지는 예외들은 상위에서 동일하게 처리(로그만 남기고 무시)됩니다. 공식 문서에도 “노드가 InterruptProcessingException을 던질 때 실행이 중단된다”고 명시돼 있어요. ComfyUI 공식 문서

조금 더 구체적으로, 단계별로 생각해보면:

  1. UI의 Stop(또는/interrupt) → 실행 중단 플래그 세트
    프론트엔드가 중단을 보내면 서버가 현재 워크플로 실행을 멈추는 라우트를 처리합니다. ComfyUI 공식 문서

  2. 노드단 체크 → 예외 발생
    각 노드(혹은 진행률 훅/대기 지점)에서 throw_exception_if_processing_interrupted()를 호출하면, 플래그가 켜져 있을 때 InterruptProcessingException을 던집니다. 이건 의도된 흐름이에요. 실행기는 이 예외를 특별취급하여 “Processing interrupted”로 처리하고 execution_interrupted 메시지를 브로드캐스트합니다. ComfyUI 공식 문서+1

  3. 여러 노드가 동시에 던져도 문제 없음
    실행기는 노드 실행을 감싸는 상위 try/except에서 InterruptProcessingException을 잡아 “중단”으로 정리합니다. 다른 노드에서도 같은 예외가 연달아 터져도 동일하게 중단 처리로 흘러갑니다(중복 로그 정도만 생길 수 있음). GitHub+1

그래서 실무 팁은 이렇습니다:

  • 각 노드에서 진입 시점과 모든await 직후
    throw_exception_if_processing_interrupted()를 호출하세요.

  • 외부 비동기 요청(예: AsyncOpenAI/aiohttp)이 있다면 이렇게 감싸면 안전합니다:
    from comfy.model_management import (
        InterruptProcessingException,
        throw_exception_if_processing_interrupted,
    )
        
    async def FUNCTION(...):
        throw_exception_if_processing_interrupted()
        task = None
        try:
            # 예: 대기 전에 한 번 더 체크
            throw_exception_if_processing_interrupted()
            task = asyncio.create_task(client.chat.completions.create(...))
            result = await task
            return result
        except InterruptProcessingException:
            # 외부 요청/태스크 정리
            if task and not task.done():
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await task
            raise  # 꼭 재전파! (중단으로 인식)
    
  • 절대 InterruptProcessingException을 일반 Exception으로 삼켜서 리턴하지 마세요. 그러면 “중단”이 아닌 “실패”로 기록됩니다(로그와 UI 이벤트가 달라짐). execution_interrupted 이벤트는 이 예외가 그대로 올라갈 때만 발생합니다. ComfyUI 공식 문서

  • 여러 노드가 동시에 실행될 수 있는 구조라면 각 노드가 독립적으로 같은 예외를 던지도록 두는 게 맞습니다. 중복 예외는 상위 실행기에서 정리되며, 전체 프롬프트는 한 번만 “중단”으로 마감됩니다. GitHub

덤: 브라우저 탭을 닫아도 중단이 발생할 수 있다는 이슈들이 있어요(탭을 닫으면 세션이 끊기며 인터럽트가 트리거). API/헤드리스로 돌릴 때는 이 점을 유의하세요. GitHub+1

정리: 여러 노드에서 마음 편히InterruptProcessingException을 던지세요. 이것이 ComfyUI가 의도한 “즉시 중단” 메커니즘이고, 예외가 여러 개여도 상위 실행기에서 안전하게 처리됩니다.

_wait_for_interrupt_level_trigger()에서 raise 안하고 그냥 return 하는건 어때 어차피 밖에서 raise 하잖아

가능해. _wait_for_interrupt_level_trigger()에서 raise 대신 return 해도 원하는 “레이스(요청 vs 인터럽트)” 동작은 똑같이 만들어져. 실무적으로는 헬퍼에서 예외 스택을 남기지 않으니 로그가 더 깔끔해지는 장점도 있어.

아래처럼 바꾸면 돼 — 단계별로 정리할게.


핵심 아이디어 (step-by-step)

  1. 헬퍼는 “인터럽트를 감지하면 끝났다”는 신호만 반환 한다.

  2. 바깥(콜러)asyncio.wait(..., FIRST_COMPLETED)req_task(네트워크 요청)와 int_task(인터럽트 감시) 중 먼저 끝난 쪽 을 확인한다.

  3. int_task가 먼저 끝났다면 요청 태스크를 cancel() 하고 , 여기서 한 번만 InterruptProcessingException을 던진다.


헬퍼: return 버전 (레벨 트리거, 소비 안 함)

import asyncio
import comfy.model_management as mm

# sentinel: 결과를 명확히 구분하고 싶다면 씀
INTERRUPTED = object()

async def _wait_for_interrupt_level_trigger(poll_ms: int = 50):
    """ComfyUI 인터럽트 플래그를 '읽기만' 하다가 감지되면 반환."""
    try:
        sleep_s = poll_ms / 1000.0
        while True:
            await asyncio.sleep(sleep_s)
            if mm.processing_interrupted():   # ← 소비하지 않음(여러 노드 동시 감지 가능)
                return INTERRUPTED            # 또는 True 같은 간단한 값
    except asyncio.CancelledError:
        # 바깥에서 int_task를 취소하면 여기로 옴. 정리 후 전파.
        raise

비-스트리밍 요청에 적용

import asyncio
import contextlib
import comfy.model_management as mm
from openai import AsyncOpenAI

async def run_node(prompt: str):
    client = AsyncOpenAI()
    req_task = asyncio.create_task(
        client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            timeout=120,
            max_retries=0,
        )
    )
    int_task = asyncio.create_task(_wait_for_interrupt_level_trigger())

    try:
        done, _ = await asyncio.wait({req_task, int_task},
                                     return_when=asyncio.FIRST_COMPLETED)

        if int_task in done:
            # 1) 네트워크 요청 즉시 취소
            req_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await req_task

            # 2) (선택) 감시 결과를 확인해도 됨
            _ = int_task.result()  # INTERRUPTED 같은 값

            # 3) 여기서 단 한 번 raise
            raise mm.InterruptProcessingException()

        # 요청이 먼저 끝남 → 감시 태스크 취소/정리
        int_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await int_task

        resp = await req_task
        return resp.choices[0].message.content

    except asyncio.CancelledError:
        # 상위에서 이 노드 태스크 자체를 취소했을 때
        raise
    finally:
        with contextlib.suppress(Exception):
            await client.close()

스트리밍에도 동일 패턴

import contextlib

async def stream_node(prompt: str):
    client = AsyncOpenAI()
    try:
        async with client.responses.stream(model="gpt-4o-mini", input=prompt) as stream:
            aiter = stream.aiter_text()
            chunks = []
            while True:
                next_chunk = asyncio.create_task(aiter.__anext__())
                int_task   = asyncio.create_task(_wait_for_interrupt_level_trigger())

                done, _ = await asyncio.wait({next_chunk, int_task},
                                             return_when=asyncio.FIRST_COMPLETED)

                if int_task in done:
                    # 스트림/청크 읽기 취소 및 정리
                    next_chunk.cancel()
                    with contextlib.suppress(asyncio.CancelledError):
                        await next_chunk
                    await stream.close()
                    # 여기서 raise 한 번
                    raise mm.InterruptProcessingException()

                try:
                    chunks.append(next_chunk.result())
                except StopAsyncIteration:
                    break
                finally:
                    int_task.cancel()
                    with contextlib.suppress(asyncio.CancelledError):
                        await int_task

            return "".join(chunks)
    finally:
        with contextlib.suppress(Exception):
            await client.close()

return 방식의 장단점 요약

  • 장점

    • 헬퍼 내부에서 예외 스택이 안 생겨 로그가 단정 해짐.

    • “누가 예외를 던지나?”가 콜러 한 지점 으로 모여 정책 일관성 확보.

  • ⚠️ 주의

    • 나중에 실수로 int_task.result()를 다른 곳에서 써버리면 None을 받아 헷갈릴 수 있어.
      → 위처럼 INTERRUPTED 센티널을 반환해 명시적으로 구분하는 걸 추천.

    • req_task가 먼저 끝난 케이스에서 int_task반드시 cancel & await(suppress)로 정리해야 경고가 안 남아.


여러 노드 동시 실행일 때의 의미

  • processing_interrupted()읽기만 하는 이 방식은 모든 노드가 같은 인터럽트를 함께 감지 할 수 있게 해.

  • 각 노드는 자기 요청을 끊고, 콜러 한 군데에서만 InterruptProcessingException을 던지면 됨(혹은 각 노드가 자기 컨텍스트에서 던져도 무방).

  • 핵심은 throw_exception_if_processing_interrupted()를 노드 내부 헬퍼에서 호출하지 않는 것(그 함수는 플래그를 지우므로 ‘첫 번째 소비’ 문제가 생김).


정리: 응, 헬퍼에서raise 대신 return 해도 완전히 OK.
오히려 깔끔할 때가 많고, 위 패턴대로 센티널을 반환해주면 실수도 줄일 수 있어.

동시에 요청하는 요청 수를 제한하고 싶어. (예를 들어 8개로) 최대한 간단히 추가할 수 있는 방법 추천해줘

좋아—가장 간단한 방법은 전역asyncio.BoundedSemaphore 하나로 동시 네트워크 요청 수를 제한하는 거야.
요청을 만드는 부분만 살짝 감싸면 되고, 취소(Interrupt)랑도 잘 맞물려서 자동으로 슬롯이 반환돼.

step-by-step

  1. 전역 세마포어 를 한 곳에 정의 (예: 8개 동시 요청 제한).

  2. 모든 네트워크 요청 함수(OpenAI, aiohttp, 스트리밍 포함)를 async with SEM 블록으로 감싼다.

  3. 기존의 레이스 패턴(요청 태스크 vs 인터럽트 감시)은 그대로 유지한다.

    • 인터럽트가 먼저 오면 req_task.cancel()async with가 빠져나가며 슬롯 자동 반환.

1) 공유 세마포어 (한 파일로 만들어 여러 노드에서 import)

# throttle.py
import os, asyncio

OPENAI_MAX_CONCURRENCY = int(os.getenv("OPENAI_MAX_CONCURRENCY", "8"))
OPENAI_SEMA = asyncio.BoundedSemaphore(OPENAI_MAX_CONCURRENCY)

여러 커스텀 노드에서 같은 세마포어를 쓰려면 이렇게 공유 모듈 로 빼는 게 깔끔해.


2) 비-스트리밍 요청에 붙이기 (기존 레이스 코드에 최소 수정)

# node_async.py
import asyncio, contextlib
import comfy.model_management as mm
from openai import AsyncOpenAI
from throttle import OPENAI_SEMA

INTERRUPTED = object()

async def _wait_for_interrupt_level_trigger(poll_ms: int = 50):
    import asyncio, comfy.model_management as mm
    sleep_s = poll_ms / 1000.0
    while True:
        await asyncio.sleep(sleep_s)
        if mm.processing_interrupted():   # 소비 無
            return INTERRUPTED

async def _openai_chat_with_limit(client: AsyncOpenAI, prompt: str):
    # ▼ 동시성 제한은 여기 한 줄!
    async with OPENAI_SEMA:
        return await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            timeout=120,
            max_retries=0,
        )

async def run_node(prompt: str):
    client = AsyncOpenAI()
    req_task = asyncio.create_task(_openai_chat_with_limit(client, prompt))
    int_task = asyncio.create_task(_wait_for_interrupt_level_trigger())

    try:
        done, _ = await asyncio.wait({req_task, int_task}, return_when=asyncio.FIRST_COMPLETED)

        if int_task in done:
            # 인터럽트 우선: 요청 태스크 취소 → 세마포어 슬롯 자동 반환
            req_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await req_task
            raise mm.InterruptProcessingException()

        # 요청이 먼저 끝난 경우: 감시 태스크 정리
        int_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await int_task

        resp = await req_task
        return resp.choices[0].message.content
    finally:
        with contextlib.suppress(Exception):
            await client.close()

3) 스트리밍에도 동일 적용

from throttle import OPENAI_SEMA
import contextlib

async def stream_node(prompt: str):
    client = AsyncOpenAI()
    try:
        # ▼ 스트림 전 구간을 슬롯으로 보호
        async with OPENAI_SEMA:
            async with client.responses.stream(model="gpt-4o-mini", input=prompt) as stream:
                aiter = stream.aiter_text()
                chunks = []
                while True:
                    next_chunk = asyncio.create_task(aiter.__anext__())
                    int_task   = asyncio.create_task(_wait_for_interrupt_level_trigger())

                    done, _ = await asyncio.wait({next_chunk, int_task},
                                                 return_when=asyncio.FIRST_COMPLETED)

                    if int_task in done:
                        next_chunk.cancel()
                        with contextlib.suppress(asyncio.CancelledError):
                            await next_chunk
                        await stream.close()
                        raise mm.InterruptProcessingException()

                    try:
                        chunks.append(next_chunk.result())
                    except StopAsyncIteration:
                        break
                    finally:
                        int_task.cancel()
                        with contextlib.suppress(asyncio.CancelledError):
                            await int_task
            return "".join(chunks)
    finally:
        with contextlib.suppress(Exception):
            await client.close()

4) aiohttp 순수 요청도 동일

import aiohttp
from throttle import OPENAI_SEMA

async def fetch_json_cancellable(url: str, payload: dict):
    async with aiohttp.ClientSession() as session:
        async with OPENAI_SEMA:
            async with session.post(url, json=payload) as resp:
                resp.raise_for_status()
                return await resp.json()

왜 이게 좋은가

  • 간단함 : 요청부에 async with OPENAI_SEMA: 한 줄만 추가.

  • 취소 친화적 : CancelledError가 나도 컨텍스트가 자동으로 슬롯 해제.

  • 대기 중 인터럽트 : 세마포어 획득 대기 중인 태스크도 req_task.cancel()로 즉시 깨짐.

  • 확장 용이 : 환경변수(OPENAI_MAX_CONCURRENCY)로 쉽게 튜닝.

참고: 프로세스가 여러 개인 경우(예: 멀티 프로세스 실행)는 프로세스마다 8개가 적용돼. 그땐 Redis 등 외부 토큰 버킷이 필요하지만, ComfyUI 일반 사용에선 보통 위 방식이면 충분해.

필요하면 모델/엔드포인트별 로 다른 세마포어를 쓰는 패턴도 알려줄게 (dict로 base_url 키잉).

Categories:

Updated: