ComfyUI LSF Job Submit
ComfyUI에서 LSF Job을 Submit하고 이를 끝날 때까지 기다리는 Custom node를 개발하고 싶어. 요구 조건 1. main execute 함수를 async하게 구현해야 함(최신 ComfyUI에서는 async 노드 동작을 지원함) 2. Polling하며 bjobs 커맨드를 이용해 상태를 체크해야 함. 이 때 bjobs 커맨드가 다른 걸로 바뀌더라도 쉽게 변경할 수 있게 구조를 유연하게 짜줘. 3. 이 때 여러 노드가 동시에 돌 수도 있으며, bjobs를 너무 자주 치는 것을 방지하기 위해 파일시스템에 캐싱하는 로직이 있으면 좋을 것 같음. 4. 입력은 커맨드, cwd 2개 스트링임. 커맨드는 멀티라인으로 여러 bsub 커맨드가 들어올 수 도 있음을 명심할 것. bsub 커맨드 자체도 async하게 멀티로 뿌리도록 하면 좋을 것 같음(단 너무 한 번에 많은 job이 들어가지 않도록 배치 사이즈 조절은 필요할 것 같음) 5. 이 자체가 output 노드이면서 끝나면 적절한 아웃풋(뭐가 될지는 추천해줘)도 뱉도록 하자.
좋아! 요구사항을 모두 만족하는 ComfyUI용 LSF Submit & Wait 커스텀 노드 설계를 깔끔하게 정리한 뒤, 바로 붙여 넣어 쓸 수 있는 완전한 코드를 드릴게요.
설계 개요
-
Async 노드 :
SUPPORTS_ASYNC = True,async def run(...)구현. -
멀티라인 bsub 제출 : 입력
commands의 각 비어있지 않은 라인을 독립 명령으로 비동기 제출. 동시 제출 개수는 배치 크기로 제한. -
Polling 구조/유연성 :
StatusBackend인터페이스와LSFBjobsBackend기본 구현.status_cmd_template(예:bjobs -noheader -o "jobid stat" {job_ids})을 바꾸기만 하면 다른 커맨드로 쉽게 전환. -
파일시스템 캐시 : 여러 노드가 동시에 돌아도
status_cache.json(잠금 포함)로 TTL 기반 캐싱. 너무 자주bjobs를 치지 않도록 함. - 입력 :
commands(멀티라인 문자열),cwd(작업 디렉토리).
추가 파라미터는 인라인 지시문 으로 조절:# LSF: batch_size=4 poll_interval_s=10 status_ttl_s=15batch_size(동시 제출 개수),poll_interval_s(폴링 주기),status_ttl_s(캐시 TTL),
status_cmd_template(상태 조회 커맨드 템플릿) 등 지원.
-
출력(OUTPUT 노드) :
-
job_ids: 제출된 Job ID들의 콤마 구분 문자열 -
results_json: 요약 JSON 문자열(최종 상태/성공여부/결과 파일 경로 등)
-
- 터미널 상태 : 기본
{"DONE","EXIT"}. 필요시 코드 내 상수로 확장 가능.
설치/사용법
-
아래 코드를
ComfyUI/custom_nodes/lsf_submit_and_wait.py로 저장 후 ComfyUI 재시작. - 워크플로에서 이 노드를 마지막에 두고,
commands에 여러 줄bsub ...들을 넣으면 됨.
옵션은 맨 위에 한 줄의 인라인 지시문으로 조절:# LSF: batch_size=3 poll_interval_s=15 status_ttl_s=30 bsub -q short -J test1 sleep 5 bsub -q short -J test2 sleep 8 bsub -q short -J test3 sleep 2 - 결과는
job_ids와results_json(요약/결과 파일 경로 포함)으로 반환됩니다.
코드 (그대로 사용 가능)
# File: custom_nodes/lsf_submit_and_wait.py
import asyncio
import json
import os
import re
import shlex
import tempfile
import time
from datetime import datetime
# fcntl (POSIX) 잠금: Windows에선 graceful degrade
try:
import fcntl # type: ignore
except Exception:
fcntl = None
# ---------- 유틸 ----------
JOB_ID_REGEX = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
TERMINAL_STATES = {"DONE", "EXIT"}
def iso_now():
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
def default_cache_dir():
return os.environ.get(
"COMFYUI_LSF_CACHE_DIR",
os.path.join(tempfile.gettempdir(), "comfyui_lsf_cache")
)
def ensure_dir(d):
os.makedirs(d, exist_ok=True)
return d
def parse_inline_options_and_commands(commands_text: str):
"""
첫 줄(들)에 '# LSF: key=value key=value ...' 형식의 인라인 옵션을 허용.
반환: (options_dict, command_lines)
"""
options = {}
lines = [ln for ln in commands_text.splitlines()]
clean_lines = []
for ln in lines:
m = re.match(r"^\s*#\s*LSF:\s*(.+)$", ln)
if m:
# key=value ... 파싱(따옴표 허용)
kvs = shlex.split(m.group(1), posix=True)
for kv in kvs:
if "=" in kv:
k, v = kv.split("=", 1)
# 숫자형 자동 캐스팅
if v.isdigit():
v = int(v)
else:
# true/false
if isinstance(v, str) and v.lower() in ("true","false"):
v = (v.lower() == "true")
options[k.strip()] = v
continue
clean_lines.append(ln)
# 비어있는 라인 제거
clean_lines = [ln for ln in clean_lines if ln.strip()]
return options, clean_lines
async def run_shell(cmd: str, cwd: str | None = None):
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd if cwd else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await proc.communicate()
return proc.returncode, out_b.decode(errors="ignore"), err_b.decode(errors="ignore")
def extract_job_ids_from_bsub_output(stdout: str):
# LSF 표준: "Job <12345> is submitted to queue <...>."
return JOB_ID_REGEX.findall(stdout)
# ---------- 상태 백엔드/캐시 ----------
class StatusBackend:
async def query(self, job_ids: list[str]) -> dict[str, str]:
raise NotImplementedError
class LSFBjobsBackend(StatusBackend):
"""
status_cmd_template 예:
'bjobs -noheader -o "jobid stat" {job_ids}'
"""
def __init__(self, status_cmd_template: str | None = None):
if not status_cmd_template:
status_cmd_template = 'bjobs -noheader -o "jobid stat" {job_ids}'
self.template = status_cmd_template
async def query(self, job_ids: list[str]) -> dict[str, str]:
if not job_ids:
return {}
cmd = self.template.format(job_ids=" ".join(job_ids))
rc, out, err = await run_shell(cmd, cwd=None)
if rc != 0:
# 실패 시 빈 딕트 반환 (상위 로직에서 재시도/캐시 사용)
return {}
result = {}
# 기대 포맷: "<jobid> <state>"
for ln in out.splitlines():
ln = ln.strip()
if not ln:
continue
parts = ln.split()
if not parts:
continue
jid = parts[0]
state = parts[-1].upper()
result[jid] = state
return result
class SharedJSONCache:
"""
여러 프로세스/노드 간 공유되는 JSON 캐시 (TTL 적용).
파일 잠금 사용(가능 시).
구조: { job_id: {"status": "RUN", "ts": epoch_seconds}, ... }
"""
def __init__(self, cache_file: str, ttl_s: int = 10):
self.cache_file = cache_file
self.ttl_s = ttl_s
ensure_dir(os.path.dirname(cache_file))
# 파일 초기화
if not os.path.exists(cache_file):
with open(cache_file, "w") as f:
json.dump({}, f)
def _lock(self, fh):
if fcntl:
fcntl.flock(fh, fcntl.LOCK_EX)
def _unlock(self, fh):
if fcntl:
fcntl.flock(fh, fcntl.LOCK_UN)
def _read(self):
try:
with open(self.cache_file, "r") as f:
return json.load(f)
except Exception:
return {}
def _write(self, data):
tmp = self.cache_file + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f)
os.replace(tmp, self.cache_file)
async def get_statuses(self, job_ids: list[str], backend: StatusBackend) -> dict[str, str]:
now = time.time()
# 1) 읽기 잠금
try:
fh = open(self.cache_file, "r+")
except FileNotFoundError:
ensure_dir(os.path.dirname(self.cache_file))
with open(self.cache_file, "w") as f:
json.dump({}, f)
fh = open(self.cache_file, "r+")
try:
self._lock(fh)
try:
fh.seek(0)
data = json.load(fh)
except Exception:
data = {}
# 캐시 히트/미스 결정
result = {}
to_query = []
for jid in job_ids:
entry = data.get(jid)
if entry and (now - float(entry.get("ts", 0)) <= self.ttl_s):
result[jid] = entry.get("status", "")
else:
to_query.append(jid)
finally:
self._unlock(fh)
fh.close()
# 2) 미스 대상 조회 (잠금 없이)
fresh = {}
if to_query:
fresh = await backend.query(to_query)
# 3) 쓰기 잠금하여 병합 반영
try:
fh = open(self.cache_file, "r+")
self._lock(fh)
try:
fh.seek(0)
data2 = json.load(fh)
except Exception:
data2 = {}
updated = False
ts = time.time()
for jid, st in fresh.items():
prev = data2.get(jid)
if not prev or prev.get("status") != st or (ts - float(prev.get("ts", 0)) > 0):
data2[jid] = {"status": st, "ts": ts}
updated = True
if updated:
fh.seek(0)
fh.truncate()
json.dump(data2, fh)
finally:
self._unlock(fh)
fh.close()
# 결괏값 구성 (fresh 우선)
result.update(fresh)
return result
# ---------- 제출 & 대기 ----------
async def submit_commands_multiline(commands: list[str], cwd: str, batch_size: int):
"""
각 라인을 독립 커맨드로 제출. bsub 출력에서 Job ID 추출.
동시 제출 개수를 batch_size로 제한.
"""
sem = asyncio.Semaphore(max(1, int(batch_size)))
submissions = []
async def _submit_one(cmd: str):
async with sem:
rc, out, err = await run_shell(cmd, cwd=cwd)
jids = extract_job_ids_from_bsub_output(out)
return {
"cmd": cmd,
"returncode": rc,
"stdout": out,
"stderr": err,
"job_ids": jids,
}
tasks = [asyncio.create_task(_submit_one(c)) for c in commands]
for t in asyncio.as_completed(tasks):
submissions.append(await t)
# 추출된 Job ID 평탄화
job_ids = [jid for sub in submissions for jid in sub.get("job_ids", [])]
return job_ids, submissions
async def wait_until_done(job_ids: list[str],
poll_interval_s: int,
status_cache: SharedJSONCache,
backend: StatusBackend,
max_wait_s: int | None = None):
"""
모든 job이 TERMINAL_STATES(DONE/EXIT)에 도달할 때까지 폴링.
max_wait_s 지정시 해당 시간 초과하면 중단.
"""
start = time.time()
final = {}
while True:
statuses = await status_cache.get_statuses(job_ids, backend)
final = {jid: statuses.get(jid, "") for jid in job_ids}
# 모두 터미널 상태?
if all((st in TERMINAL_STATES) and st for st in final.values()):
break
# 타임아웃?
if max_wait_s is not None and (time.time() - start) >= max_wait_s:
break
await asyncio.sleep(max(1, int(poll_interval_s)))
return final, time.time() - start
# ---------- ComfyUI 노드 ----------
class LSFSubmitAndWait:
"""
입력:
- commands (STRING, multiline): 여러 줄 bsub 커맨드
* 인라인 옵션: '# LSF: batch_size=4 poll_interval_s=10 status_ttl_s=15
status_cmd_template="bjobs -noheader -o \"jobid stat\" {job_ids}"'
- cwd (STRING): 실행 디렉토리
출력(OUTPUT 노드):
- job_ids (STRING): 콤마 구분 목록
- results_json (STRING): 요약 JSON 문자열
"""
CATEGORY = "LSF"
FUNCTION = "run"
OUTPUT_NODE = True
SUPPORTS_ASYNC = True
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("job_ids", "results_json")
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"commands": ("STRING", {"multiline": True, "default": "bsub -q short sleep 3"}),
"cwd": ("STRING", {"default": "."}),
}
}
async def run(self, commands: str, cwd: str):
# 0) 인라인 옵션/명령 파싱
opts, lines = parse_inline_options_and_commands(commands)
batch_size = int(opts.get("batch_size", 4))
poll_interval_s = int(opts.get("poll_interval_s", 10))
status_ttl_s = int(opts.get("status_ttl_s", 10))
status_cmd_template = opts.get("status_cmd_template", 'bjobs -noheader -o "jobid stat" {job_ids}')
max_wait_s = opts.get("max_wait_s", None)
if isinstance(max_wait_s, str) and max_wait_s.isdigit():
max_wait_s = int(max_wait_s)
# 1) 제출
submitted_at = iso_now()
job_ids, submissions = await submit_commands_multiline(lines, cwd=cwd, batch_size=batch_size)
# 2) 상태 백엔드/캐시
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, "status_cache.json")
status_cache = SharedJSONCache(cache_file, ttl_s=status_ttl_s)
backend = LSFBjobsBackend(status_cmd_template=status_cmd_template)
# 3) 완료까지 대기
final_statuses, waited_sec = await wait_until_done(
job_ids, poll_interval_s, status_cache, backend, max_wait_s=max_wait_s
)
completed_at = iso_now()
all_done = all((st in TERMINAL_STATES) and st for st in final_statuses.values())
# 실패 제출 수집
failed_submissions = [
{
"cmd": sub["cmd"],
"returncode": sub["returncode"],
"stderr": sub["stderr"],
"stdout": sub["stdout"],
}
for sub in submissions
if sub.get("returncode", 1) != 0 or not sub.get("job_ids")
]
# 4) 결과 요약/로그 파일 저장
session_tag = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + f"_{os.getpid()}"
results_dir = ensure_dir(os.path.join(cache_dir, "runs"))
results_path = os.path.join(results_dir, f"result_{session_tag}.json")
result_payload = {
"submitted_at": submitted_at,
"completed_at": completed_at,
"duration_sec": round(waited_sec, 2),
"job_ids": job_ids,
"final_status": final_statuses,
"all_done": all_done,
"failed_submissions": failed_submissions,
"cwd": cwd,
"options": {
"batch_size": batch_size,
"poll_interval_s": poll_interval_s,
"status_ttl_s": status_ttl_s,
"status_cmd_template": status_cmd_template,
"max_wait_s": max_wait_s,
},
"results_file": results_path,
}
try:
with open(results_path, "w") as f:
json.dump(result_payload, f, indent=2)
except Exception:
# 파일에 못 쓴다고 해도 반환은 계속
pass
job_ids_str = ",".join(job_ids)
results_json_str = json.dumps(result_payload, ensure_ascii=False)
return (job_ids_str, results_json_str)
# ComfyUI 노드 등록
NODE_CLASS_MAPPINGS = {
"LSFSubmitAndWait": LSFSubmitAndWait,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"LSFSubmitAndWait": "LSF: Submit & Wait",
}
사용 팁 & 커스터마이즈
-
배치 크기 조절 : 제출이 너무 몰리면 큐가 잠길 수 있으니
# LSF: batch_size=4처럼 적절히 제한하세요. -
폴링/캐시 :
poll_interval_s는 폴링 주기,status_ttl_s는 캐시 TTL(초). 여러 노드가 돌더라도/tmp/comfyui_lsf_cache(기본) 공유 캐시로bjobs호출 빈도를 크게 줄입니다. 경로는COMFYUI_LSF_CACHE_DIR환경변수로 바꿀 수 있어요. -
상태 커맨드 교체 : LSF 외 다른 시스템으로 바꾸고 싶다면 인라인에
# LSF: status_cmd_template="bjobs -noheader -o \"jobid stat\" {job_ids}"
처럼 명령 템플릿을 전달하세요. {job_ids} 플레이스홀더는 반드시 포함되어야 합니다.
(예시) 커스텀 스크립트로 바꿀 때:
# LSF: status_cmd_template="/opt/cluster/bin/my_bjobs --format jobid,status {job_ids}"
-
타임아웃 : 전체 대기 시간 제한이 필요하면
# LSF: max_wait_s=7200처럼 넣으세요. -
터미널 상태 확장 : 필요하면 코드 상단의
TERMINAL_STATES를 편집(예:{"DONE","EXIT","FINISHED"}).
출력 포맷 제안
-
job_ids:"123,124,125" -
results_json:{ "submitted_at": "2025-10-10T00:00:00Z", "completed_at": "2025-10-10T00:03:12Z", "duration_sec": 192.4, "job_ids": ["123","124","125"], "final_status": {"123":"DONE","124":"DONE","125":"EXIT"}, "all_done": true, "failed_submissions": [], "cwd": "/work/exp1", "options": {...}, "results_file": "/tmp/comfyui_lsf_cache/runs/result_20251010_000000_12345.json" }
필요하면 status_cmd_template를 SLURM 등으로 바꾸는 예시도 추가로 드릴게요. 원하는 추가 기능(예: 실패 시 재시도, 백오프, 개별 job 로그 추적) 있으면 말만 해줘요!
개선 요청: 노드 자체도 async로 동작할 수 있음, 즉, 이와 동일한 다른 노드들이 있을 수 있고 그 노드들과도 cache를 공유해야 더욱 효율적일 것 같음.(bjobs할 때 job id를 제한하면 안 될 것 같고, bjobs -a 로 봐야할 것 같음) 이 때 추가 고려 사항: Node1이 만든 cache를 Node2가 참고할 수 있는데, 이 때 Node2에서 방금 bsub한 job id는 cache에 없을 수도 있음(TTL이 안돼서). 이 코너 케이스도 고려할 것
좋은 포인트! 요구하신 “다중 노드 동시 동작 + 전역 캐시 공유 + bjobs -a 기반 전체 스냅샷”으로 구조를 업그레이드했습니다. 특히 Node2가 방금 제출한 잡이 캐시에 아직 없을 수 있는 코너 케이스 를 해결하기 위해, 제출 직후 캐시에SUBMITTED 상태로 시드(seeding) 하고, 상태 캐시는 ‘전체 목록’(bjobs -a) 스냅샷 을 TTL/락 기반으로 갱신하도록 바꿨습니다.
아래는 변경 요약 → 바로 쓸 수 있는 전체 코드 순서로 드립니다.
변경/설계 포인트 (step-by-step)
-
폴링 백엔드 전면 개편
-
기존:
bjobs {job_ids}특정 잡만 조회 -
변경: 기본값을
bjobs -a -noheader -o "jobid stat"로 전체 조회→ 한 번의 호출로 모든 잡 상태를 받아 전역 캐시에 저장. -
템플릿은
full_status_cmd_template로 교체(여전히 옵션으로 바꿀 수 있음).
-
-
전역 공유 캐시(파일) 구조 개선
- 단일 파일에 전체 스냅샷 보관:
{ "jobs": { "123": {"status":"PEND","ts":169...}, ... }, "last_full_refresh_ts": 169..., "meta": {...} } -
TTL(
full_ttl_s) + 최소 리프레시 간격(min_refresh_gap_s) + 락 파일(가능 시 fcntl) 로 -a 호출 폭주 방지. - 다중 노드가 동시에 접근해도 읽기/쓰기 잠금 적용.
- 단일 파일에 전체 스냅샷 보관:
-
코너 케이스 처리(방금 제출한 잡 미반영)
-
제출 직후 해당 잡들을 캐시에 ‘SUBMITTED’ 상태로 시드(seeding) → 다른 노드도 즉시 이 JobID를 “알고” 있어서 불필요한 강제 리프레시가 줄어듦.
-
get_statuses()에서 요청한 JobID가 캐시에 없거나 오래되면 ,-
TTL 만료이거나
force_refresh_on_miss=True이고min_refresh_gap_s이상 경과 시 한 노드만-a를 실행해 새 스냅샷 작성(락으로 단일화). -
그 외에는 다음 폴링 주기에 자연스럽게 반영.
-
-
-
완전한 async 지원 + 다중 제출 제어
-
노드 자체는
SUPPORTS_ASYNC = True, 비동기 제출/대기. -
멀티라인
bsub를 배치 크기 제한(batch_size)으로 병렬 제출.
-
-
출력/로그
-
job_ids(콤마 문자열),results_json(요약 JSON). -
실행 결과는
/tmp/comfyui_lsf_cache/runs/result_*.json에도 저장.
-
코드 (교체/추가 없이 그대로 사용)
ComfyUI/custom_nodes/lsf_submit_and_wait.py
import asyncio
import json
import os
import re
import shlex
import tempfile
import time
from datetime import datetime
# fcntl (POSIX) 잠금: Windows에선 graceful degrade
try:
import fcntl # type: ignore
except Exception:
fcntl = None
# ---------- 상수/유틸 ----------
JOB_ID_REGEX = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
TERMINAL_STATES = {"DONE", "EXIT"}
def iso_now():
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
def default_cache_dir():
return os.environ.get(
"COMFYUI_LSF_CACHE_DIR",
os.path.join(tempfile.gettempdir(), "comfyui_lsf_cache")
)
def ensure_dir(d):
os.makedirs(d, exist_ok=True)
return d
def parse_inline_options_and_commands(commands_text: str):
"""
첫 줄(들)에 '# LSF: key=value key=value ...' 형식의 인라인 옵션을 허용.
반환: (options_dict, command_lines)
"""
options = {}
lines = [ln for ln in commands_text.splitlines()]
clean_lines = []
for ln in lines:
m = re.match(r"^\s*#\s*LSF:\s*(.+)$", ln)
if m:
kvs = shlex.split(m.group(1), posix=True)
for kv in kvs:
if "=" in kv:
k, v = kv.split("=", 1)
if isinstance(v, str) and v.isdigit():
v = int(v)
else:
if isinstance(v, str) and v.lower() in ("true","false"):
v = (v.lower() == "true")
options[k.strip()] = v
continue
clean_lines.append(ln)
clean_lines = [ln for ln in clean_lines if ln.strip()]
return options, clean_lines
async def run_shell(cmd: str, cwd: str | None = None):
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd if cwd else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await proc.communicate()
return proc.returncode, out_b.decode(errors="ignore"), err_b.decode(errors="ignore")
def extract_job_ids_from_bsub_output(stdout: str):
# LSF 표준: "Job <12345> is submitted to queue <...>."
return JOB_ID_REGEX.findall(stdout)
# ---------- 상태 백엔드 (전체 스냅샷) ----------
class StatusBackend:
async def query_full(self) -> dict[str, str]:
raise NotImplementedError
class LSFBjobsFullBackend(StatusBackend):
"""
전체 스냅샷 기반 백엔드.
full_status_cmd_template 기본값:
'bjobs -a -noheader -o "jobid stat"'
"""
def __init__(self, full_status_cmd_template: str | None = None):
if not full_status_cmd_template:
full_status_cmd_template = 'bjobs -a -noheader -o "jobid stat"'
self.template = full_status_cmd_template
async def query_full(self) -> dict[str, str]:
rc, out, err = await run_shell(self.template, cwd=None)
if rc != 0:
return {}
result = {}
# 기대 포맷: "<jobid> <state>"
for ln in out.splitlines():
ln = ln.strip()
if not ln:
continue
parts = ln.split()
if not parts:
continue
jid = parts[0]
state = parts[-1].upper()
result[jid] = state
return result
# ---------- 전역 공유 캐시 (전체 목록 + TTL + 락) ----------
class FullListSharedCache:
"""
파일 하나에 전체 job 상태 스냅샷을 저장/공유.
구조:
{
"jobs": { job_id: {"status": "...", "ts": epoch}, ... },
"last_full_refresh_ts": epoch,
"meta": { "source": "...", ... }
}
정책:
- full_ttl_s: 전체 스냅샷 TTL
- min_refresh_gap_s: -a 재실행 최소 간격
- force_refresh_on_miss: 필요한 job_id가 없으면 TTL 무시하고(단 최소 간격은 존중) 조기 갱신 시도
- SUBMITTED 시드: 제출 직후 캐시에 넣어 코너 케이스 완화
"""
def __init__(self, cache_file: str,
full_ttl_s: int = 10,
min_refresh_gap_s: int = 5,
force_refresh_on_miss: bool = True):
self.cache_file = cache_file
self.full_ttl_s = full_ttl_s
self.min_refresh_gap_s = min_refresh_gap_s
self.force_refresh_on_miss = force_refresh_on_miss
ensure_dir(os.path.dirname(cache_file))
if not os.path.exists(cache_file):
with open(cache_file, "w") as f:
json.dump({"jobs": {}, "last_full_refresh_ts": 0, "meta": {}}, f)
# ---- 락 유틸 ----
def _lock_fd(self, path: str, nonblock: bool = False):
fh = open(path, "a+")
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_EX | (fcntl.LOCK_NB if nonblock else 0))
except BlockingIOError:
fh.close()
return None
return fh
def _unlock_fd(self, fh):
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_UN)
except Exception:
pass
try:
fh.close()
except Exception:
pass
def _read_cache(self) -> dict:
try:
with open(self.cache_file, "r") as f:
return json.load(f)
except Exception:
return {"jobs": {}, "last_full_refresh_ts": 0, "meta": {}}
def _write_cache(self, data: dict):
tmp = self.cache_file + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f)
os.replace(tmp, self.cache_file)
# ---- 공개 API ----
def seed_submitted(self, job_ids: list[str]):
"""제출 직후 SUBMITTED 상태로 캐시에 시드(코너 케이스 완화)."""
now = time.time()
# 쓰기 락
fh = self._lock_fd(self.cache_file)
try:
data = self._read_cache()
jobs = data.get("jobs", {})
for jid in job_ids:
# 이미 있으면 건드리지 않음(경쟁 상태에서 더 최신 값 보호)
if jid not in jobs:
jobs[jid] = {"status": "SUBMITTED", "ts": now}
data["jobs"] = jobs
self._write_cache(data)
finally:
self._unlock_fd(fh)
async def maybe_refresh_full(self, backend: StatusBackend, reason: str = "") -> bool:
"""
- TTL 또는 miss 사유로 전체 스냅샷을 갱신할지 판단/실행
- 동시성: refresh 락 파일을 non-blocking으로 획득한 단 하나의 프로세스만 실제로 -a 실행
"""
now = time.time()
data = self._read_cache()
last_ts = float(data.get("last_full_refresh_ts", 0))
# 최소 간격 체크
if (now - last_ts) < float(self.min_refresh_gap_s):
return False
# refresh 락(별도 파일) 획득 시도(실패하면 다른 노드가 수행 중)
lock_path = self.cache_file + ".refresh.lock"
lock_fh = self._lock_fd(lock_path, nonblock=True)
if lock_fh is None:
return False
try:
# 한 번 더 최신화 검사(경쟁 회피)
data2 = self._read_cache()
last_ts2 = float(data2.get("last_full_refresh_ts", 0))
if (now - last_ts2) < float(self.min_refresh_gap_s):
return False
# 실제 -a 실행
snapshot = await backend.query_full()
if not snapshot:
return False # 실패 시 그대로 유지
now2 = time.time()
# 쓰기 락(캐시 파일)
fh = self._lock_fd(self.cache_file)
try:
base = self._read_cache()
jobs = base.get("jobs", {})
# 새 스냅샷 반영
ts = now2
for jid, st in snapshot.items():
jobs[jid] = {"status": st, "ts": ts}
base["jobs"] = jobs
base["last_full_refresh_ts"] = now2
meta = base.get("meta", {})
meta["reason"] = reason
meta["refreshed_at"] = iso_now()
base["meta"] = meta
self._write_cache(base)
finally:
self._unlock_fd(fh)
return True
finally:
self._unlock_fd(lock_fh)
async def get_statuses(self, job_ids: list[str], backend: StatusBackend) -> dict[str, str]:
"""
- 캐시에서 먼저 조회
- 필요 시 maybe_refresh_full()을 통해 -a 스냅샷 갱신
- 최종적으로 요청 job_ids에 대한 상태 dict 반환(없으면 "")
"""
now = time.time()
data = self._read_cache()
jobs = data.get("jobs", {})
last_ts = float(data.get("last_full_refresh_ts", 0))
def _fresh(jid: str) -> bool:
ent = jobs.get(jid)
if not ent:
return False
# 엔트리가 SUBMITTED 라도 '신선한' 것으로 취급(바로 직전 제출 케이스 완화)
age = now - float(ent.get("ts", 0))
return age <= float(self.full_ttl_s)
need_refresh = False
# 1) TTL 만료 또는 miss 여부 판단
if (now - last_ts) > float(self.full_ttl_s):
need_refresh = True
# miss 검사: 요청한 job이 캐시에 없거나 stale
miss = [jid for jid in job_ids if not _fresh(jid)]
if miss and self.force_refresh_on_miss:
need_refresh = True
# 2) 필요 시 전체 리프레시 시도(한 프로세스만 실행)
if need_refresh:
reason = "ttl_expired" if (now - last_ts) > float(self.full_ttl_s) else "miss"
await self.maybe_refresh_full(backend, reason=reason)
# 갱신 후 최신 데이터 재로딩
data = self._read_cache()
jobs = data.get("jobs", {})
# 3) 결과 구성
out = {}
for jid in job_ids:
ent = jobs.get(jid)
out[jid] = (ent.get("status", "") if ent else "")
return out
# ---------- 제출 & 대기 ----------
async def submit_commands_multiline(commands: list[str], cwd: str, batch_size: int):
"""
각 라인을 독립 커맨드로 제출. bsub 출력에서 Job ID 추출.
동시 제출 개수를 batch_size로 제한.
"""
sem = asyncio.Semaphore(max(1, int(batch_size)))
submissions = []
async def _submit_one(cmd: str):
async with sem:
rc, out, err = await run_shell(cmd, cwd=cwd)
jids = extract_job_ids_from_bsub_output(out)
return {
"cmd": cmd,
"returncode": rc,
"stdout": out,
"stderr": err,
"job_ids": jids,
}
tasks = [asyncio.create_task(_submit_one(c)) for c in commands]
for t in asyncio.as_completed(tasks):
submissions.append(await t)
job_ids = [jid for sub in submissions for jid in sub.get("job_ids", [])]
return job_ids, submissions
async def wait_until_done(job_ids: list[str],
poll_interval_s: int,
full_cache: FullListSharedCache,
backend: StatusBackend,
max_wait_s: int | None = None):
start = time.time()
final = {}
while True:
statuses = await full_cache.get_statuses(job_ids, backend)
final = {jid: statuses.get(jid, "") for jid in job_ids}
if all((st in TERMINAL_STATES) and st for st in final.values()):
break
if max_wait_s is not None and (time.time() - start) >= max_wait_s:
break
await asyncio.sleep(max(1, int(poll_interval_s)))
return final, time.time() - start
# ---------- ComfyUI 노드 ----------
class LSFSubmitAndWait:
"""
입력:
- commands (STRING, multiline): 여러 줄 bsub 커맨드
* 인라인 옵션: '# LSF: batch_size=4 poll_interval_s=10 full_ttl_s=15
full_status_cmd_template="bjobs -a -noheader -o \"jobid stat\""
min_refresh_gap_s=5 force_refresh_on_miss=true max_wait_s=7200'
- cwd (STRING): 실행 디렉토리
출력(OUTPUT 노드):
- job_ids (STRING): 콤마 구분 목록
- results_json (STRING): 요약 JSON 문자열
"""
CATEGORY = "LSF"
FUNCTION = "run"
OUTPUT_NODE = True
SUPPORTS_ASYNC = True
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("job_ids", "results_json")
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"commands": ("STRING", {"multiline": True, "default": "bsub -q short sleep 3"}),
"cwd": ("STRING", {"default": "."}),
}
}
async def run(self, commands: str, cwd: str):
# 0) 인라인 옵션/명령 파싱
opts, lines = parse_inline_options_and_commands(commands)
batch_size = int(opts.get("batch_size", 4))
poll_interval_s = int(opts.get("poll_interval_s", 10))
# backward compat: status_ttl_s -> full_ttl_s
full_ttl_s = int(opts.get("full_ttl_s", opts.get("status_ttl_s", 10)))
min_refresh_gap_s = int(opts.get("min_refresh_gap_s", 5))
force_refresh_on_miss = bool(opts.get("force_refresh_on_miss", True))
full_status_cmd_template = opts.get("full_status_cmd_template",
'bjobs -a -noheader -o "jobid stat"')
max_wait_s = opts.get("max_wait_s", None)
if isinstance(max_wait_s, str) and max_wait_s.isdigit():
max_wait_s = int(max_wait_s)
# 1) 제출
submitted_at = iso_now()
job_ids, submissions = await submit_commands_multiline(lines, cwd=cwd, batch_size=batch_size)
# 2) 전역 캐시/백엔드
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, "status_full_cache.json")
full_cache = FullListSharedCache(
cache_file,
full_ttl_s=full_ttl_s,
min_refresh_gap_s=min_refresh_gap_s,
force_refresh_on_miss=force_refresh_on_miss,
)
backend = LSFBjobsFullBackend(full_status_cmd_template=full_status_cmd_template)
# 2-1) 코너 케이스 완화: 방금 제출한 잡을 캐시에 시드
if job_ids:
full_cache.seed_submitted(job_ids)
# 3) 완료까지 대기
final_statuses, waited_sec = await wait_until_done(
job_ids, poll_interval_s, full_cache, backend, max_wait_s=max_wait_s
)
completed_at = iso_now()
all_done = all((st in TERMINAL_STATES) and st for st in final_statuses.values())
failed_submissions = [
{
"cmd": sub["cmd"],
"returncode": sub["returncode"],
"stderr": sub["stderr"],
"stdout": sub["stdout"],
}
for sub in submissions
if sub.get("returncode", 1) != 0 or not sub.get("job_ids")
]
# 4) 결과 파일 저장
session_tag = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + f"_{os.getpid()}"
results_dir = ensure_dir(os.path.join(cache_dir, "runs"))
results_path = os.path.join(results_dir, f"result_{session_tag}.json")
result_payload = {
"submitted_at": submitted_at,
"completed_at": completed_at,
"duration_sec": round(waited_sec, 2),
"job_ids": job_ids,
"final_status": final_statuses,
"all_done": all_done,
"failed_submissions": failed_submissions,
"cwd": cwd,
"options": {
"batch_size": batch_size,
"poll_interval_s": poll_interval_s,
"full_ttl_s": full_ttl_s,
"min_refresh_gap_s": min_refresh_gap_s,
"force_refresh_on_miss": force_refresh_on_miss,
"full_status_cmd_template": full_status_cmd_template,
"max_wait_s": max_wait_s,
},
"results_file": results_path,
}
try:
with open(results_path, "w") as f:
json.dump(result_payload, f, indent=2)
except Exception:
pass
job_ids_str = ",".join(job_ids)
results_json_str = json.dumps(result_payload, ensure_ascii=False)
return (job_ids_str, results_json_str)
# ComfyUI 노드 등록
NODE_CLASS_MAPPINGS = {
"LSFSubmitAndWait": LSFSubmitAndWait,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"LSFSubmitAndWait": "LSF: Submit & Wait",
}
사용 방법/옵션 예시
vbnet
# LSF: batch_size=4 poll_interval_s=10 full_ttl_s=15 min_refresh_gap_s=5 force_refresh_on_miss=true
# LSF: full_status_cmd_template="bjobs -a -noheader -o \"jobid stat\""
bsub -q short -J test1 sleep 5
bsub -q short -J test2 sleep 8
bsub -q short -J test3 sleep 2
-
full_ttl_s : 전체 스냅샷의 TTL.
-
min_refresh_gap_s :
-a재호출 최소 간격(모든 노드 공통). -
force_refresh_on_miss : 요청 Job이 캐시에 없을 때 TTL 관계없이 조기 갱신 시도(단 최소 간격은 지킴).
-
full_status_cmd_template : 다른 시스템/형식으로 교체 가능(예: 커스텀 스크립트).
코너 케이스가 해결되는 흐름(요약)
-
Node1·Node2가 각각 잡 제출 → 각자 제출 직후 캐시에 SUBMITTED로 시드.
-
폴링 시, 요청한 JobID가 캐시에 이미 존재 →
SUBMITTED라도 신선 으로 간주되어 불필요한 조기-a호출 방지. -
TTL 만료 또는 miss+최소 간격 충족 시, 한 노드만
-a실행(락) → 전체 스냅샷 업데이트. -
모든 노드는 같은 캐시 파일 을 보며 효율적으로 상태 동기화.
원하시면:
-
지수 백오프/폴링 주기 자동 튜닝 ,
-
추가 상태 매핑(예: ZOMBI/PSUSP 등) ,
-
SLURM 등 다른 스케줄러 템플릿 도 바로 붙여드릴게요.
질문 1. 파일 read access를 계속하는거야? 그렇다면 대신에 singleton으로 된 cache 클래스에 저장된 정보를 사용하는 건 어때? 질문 2. job id | 커맨드 | job 상태 | 를 노드에 계속 업데이트하며 보여줄 순 없나? 좀 고급 기능 같기는한데 되면 참 좋을듯
좋은 질문! 핵심만 “실무적으로” 정리하고, 바로 쓸 수 있는 코드 조각(추가/패치용)도 드릴게요.
Q1. “파일 read access를 계속하나요? → 대신 프로세스 내 싱글톤 캐시 쓰면 어때요?”
결론 (step-by-step)
-
맞아요. 전역 공유를 위해 파일 캐시를 쓰면 여러 노드에서 읽기/쓰기 경쟁이 생깁니다.
-
해법은 2-계층 캐시 :
-
프로세스 내 싱글톤 메모리 스냅샷(초저지연·락 포함)
-
프로세스 간 공유 파일 캐시(TTL+락)
-
-
조회 흐름:
-
메모리 스냅샷이 신선(TTL 이내) → 파일 읽기 없이 즉시 반환.
-
신선하지 않으면 파일을 1회 읽어 메모리 갱신.
-
TTL 만료·MISS·최소 간격 충족 시 한 프로세스만
bjobs -a수행해 파일/메모리 동시 갱신.
-
-
제출 직후 SUBMITTED 시드 는 메모리와 파일 모두 에 반영 → Node2 코너 케이스(방금 제출한 잡이 캐시에 아직 없음) 해소.
적용 코드 (기존 FullListSharedCache 위에 얹는 싱글톤 계층)
아래 블록만 추가/치환 하세요. (나머지 로직은 그대로 동작)
# --- 싱글톤 메모리 스냅샷 계층 ---
class _MemSnap:
def __init__(self):
self.jobs = {} # {job_id: {"status": str, "ts": float}, ...}
self.last_full_refresh_ts = 0.0 # float epoch
self.lock = asyncio.Lock()
class FullListSharedCache:
_mem = _MemSnap() # 프로세스 단일 인스턴스
# ... (기존 __init__, _lock_fd 등은 그대로)
def _mem_is_fresh(self, now: float, ttl: float) -> bool:
return (now - float(self._mem.last_full_refresh_ts)) <= float(ttl)
def _mem_update_from_dict(self, data: dict):
self._mem.jobs = data.get("jobs", {})
self._mem.last_full_refresh_ts = float(data.get("last_full_refresh_ts", 0))
def _persist_and_update_mem(self, data: dict):
self._write_cache(data)
# 파일 쓰기와 동시에 메모리 스냅샷 최신화
self._mem_update_from_dict(data)
def seed_submitted(self, job_ids: list[str], meta: dict | None = None):
"""제출 직후 SUBMITTED 상태를 메모리/파일 모두에 시드."""
now = time.time()
# 메모리 업데이트
# (동시에 들어와도 무해; 최신 ts로 덮어써도 OK)
self._mem.jobs = self._mem.jobs or {}
for jid in job_ids:
self._mem.jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
# 파일 업데이트
fh = self._lock_fd(self.cache_file)
try:
data = self._read_cache()
jobs = data.get("jobs", {})
for jid in job_ids:
jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
data["jobs"] = jobs
# (선택) 커맨드 메타 저장
if meta:
jm = data.get("jobs_meta", {})
for jid in job_ids:
if jid in meta:
jm[jid] = meta[jid]
data["jobs_meta"] = jm
self._persist_and_update_mem(data)
finally:
self._unlock_fd(fh)
async def maybe_refresh_full(self, backend: StatusBackend, reason: str = "") -> bool:
now = time.time()
# 최소 간격·락 처리 동일
data = self._read_cache()
last_ts = float(data.get("last_full_refresh_ts", 0))
if (now - last_ts) < float(self.min_refresh_gap_s):
return False
lock_path = self.cache_file + ".refresh.lock"
lock_fh = self._lock_fd(lock_path, nonblock=True)
if lock_fh is None:
return False
try:
data2 = self._read_cache()
last_ts2 = float(data2.get("last_full_refresh_ts", 0))
if (now - last_ts2) < float(self.min_refresh_gap_s):
return False
snapshot = await backend.query_full()
if not snapshot:
return False
now2 = time.time()
fh = self._lock_fd(self.cache_file)
try:
base = self._read_cache()
jobs = base.get("jobs", {})
ts = now2
for jid, st in snapshot.items():
jobs[jid] = {"status": st, "ts": ts}
base["jobs"] = jobs
base["last_full_refresh_ts"] = now2
meta = base.get("meta", {})
meta["reason"] = reason
meta["refreshed_at"] = iso_now()
base["meta"] = meta
self._persist_and_update_mem(base) # 파일과 메모리 동시 갱신
finally:
self._unlock_fd(fh)
return True
finally:
self._unlock_fd(lock_fh)
async def get_statuses(self, job_ids: list[str], backend: StatusBackend) -> dict[str, str]:
now = time.time()
# 1) 메모리 스냅샷 우선
async with self._mem.lock:
mem_fresh = self._mem_is_fresh(now, self.full_ttl_s)
if mem_fresh:
jobs = self._mem.jobs
else:
# 파일 1회 읽어 메모리 최신화
data = self._read_cache()
self._mem_update_from_dict(data)
jobs = self._mem.jobs
def _fresh(jid: str) -> bool:
ent = jobs.get(jid)
if not ent:
return False
return (now - float(ent.get("ts", 0))) <= float(self.full_ttl_s)
need_refresh = False
if not self._mem_is_fresh(now, self.full_ttl_s):
need_refresh = True
miss = [jid for jid in job_ids if not _fresh(jid)]
if miss and self.force_refresh_on_miss:
need_refresh = True
if need_refresh:
await self.maybe_refresh_full(backend, reason="ttl_or_miss")
# 리프레시 후 메모리에서 재조회
jobs = self._mem.jobs
return {jid: (jobs.get(jid, {}).get("status", "")) for jid in job_ids}
효과: 동일 프로세스(=동일 ComfyUI 인스턴스) 에서 여러 노드가 돌아도, 대부분 파일 I/O 없이 메모리만 보게 됩니다. 그래도 다른 프로세스/머신과 공유가 필요하면 파일 스냅샷이 “사실상의 소스 오브 트루스”가 되고, 한 프로세스만
-a를 실행해 폭주 방지 가 유지됩니다.
또한 제출 직후:
# submit 이후
jid_to_cmd = {jid: {"cmd": sub["cmd"], "submitted_at": iso_now()} for sub in submissions for jid in sub.get("job_ids", [])}
full_cache.seed_submitted(job_ids, meta=jid_to_cmd) # 메모리/파일 모두 시드 + 커맨드 메타 저장
Q2. “job id | 커맨드 | 상태”를 노드에 계속 업데이트해서 보여줄 수 있나요?
결론 (step-by-step)
-
ComfyUI 코어는 기본적으로 임의 노드가 실행 중에 UI를 실시간으로 갱신 하는 API를 제공하지 않습니다(최종 출력만 표시).
-
대신 현실적인 두 가지 경로가 있습니다.
A안(프론트 수정 없이, 노드만으로)
-
보조 뷰어 노드 를 하나 더 만들어, 공유 캐시(
status_full_cache.json) +jobs_meta에서 표를 생성(STRING 또는 이미지) 해서 보여줍니다. -
최신 상태를 보고 싶을 때 뷰어 노드를 다시 실행(Queue) 하면 표가 갱신됩니다.
-
자동 리프레시는
Loop/Repeat류 확장 노드를 곁들이면 됩니다.
장점: ComfyUI 변경 없이 즉시 사용.
단점: “지속 스트리밍”은 아니고 수동/주기 재실행.
B안(진짜 실시간 패널)
-
커스텀 노드 파일 안에 가벼운 HTTP GET 엔드포인트 를 추가(ComfyUI 서버에 라우트 등록) → 프론트에 작은 스크립트(또는 별도 플러그인)로 주기 폴링 하여 표를 갱신합니다.
-
사실상 “실시간 대시보드”를 ComfyUI 안쪽 패널에 넣을 수 있습니다.
장점: 진짜 라이브.
단점: 프론트에 소폭 추가 작업 필요.
바로 쓸 수 있는 “뷰어 노드” (A안)
아래 클래스를 같은 파일에 추가 하세요.
# ====== 뷰어 노드: 상태 테이블 출력 ======
class LSFStatusBoard:
"""
입력:
- job_ids (STRING): "123,124,125" (비워두면 캐시에 있는 최근 N개를 보여줌)
- top_k (INT): job_ids 비었을 때 최근 N개
- width_limit (INT): 커맨드 컬럼 자르기 길이
출력:
- table_text (STRING): 모노스페이스 표 텍스트
"""
CATEGORY = "LSF"
FUNCTION = "run"
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("table_text",)
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"job_ids": ("STRING", {"default": ""}),
},
"optional": {
"top_k": ("INT", {"default": 20, "min": 1, "max": 500}),
"width_limit": ("INT", {"default": 80, "min": 20, "max": 200}),
}
}
def _read_cache(self):
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, "status_full_cache.json")
try:
with open(cache_file, "r") as f:
data = json.load(f)
return data
except Exception:
return {"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0}
def _short(self, s, limit):
s = s.strip()
return s if len(s) <= limit else (s[:limit-1] + "…")
def _fmt_table(self, rows):
# 고정 폭 표 생성
jid_w = max(6, max((len(r["job_id"]) for r in rows), default=6))
st_w = max(5, max((len(r["status"]) for r in rows), default=5))
hdr = f"{'JOBID':<{jid_w}} | {'STATUS':<{st_w}} | CMD"
sep = "-"*len(hdr)
body = [hdr, sep]
for r in rows:
body.append(f"{r['job_id']:<{jid_w}} | {r['status']:<{st_w}} | {r['cmd']}")
return "\n".join(body)
def run(self, job_ids: str, top_k: int = 20, width_limit: int = 80):
data = self._read_cache()
jobs = data.get("jobs", {})
meta = data.get("jobs_meta", {})
if job_ids.strip():
jids = [x.strip() for x in job_ids.split(",") if x.strip()]
else:
# 최근 top_k (ts 기준 정렬)
jids = sorted(jobs.keys(), key=lambda j: jobs[j].get("ts", 0), reverse=True)[:top_k]
rows = []
for jid in jids:
st = jobs.get(jid, {}).get("status", "")
cmd = meta.get(jid, {}).get("cmd", "")
rows.append({
"job_id": jid,
"status": st,
"cmd": self._short(cmd, width_limit),
})
table = self._fmt_table(rows)
return (table,)
# 등록
NODE_CLASS_MAPPINGS.update({"LSFStatusBoard": LSFStatusBoard})
NODE_DISPLAY_NAME_MAPPINGS.update({"LSFStatusBoard": "LSF: Status Board"})
제출 노드에서 커맨드 메타 저장(한 줄 추가)
앞서 Q1 코드처럼 제출 직후 커맨드도 함께 시드합니다:
jid_to_cmd = {jid: {"cmd": sub["cmd"], "submitted_at": iso_now()} for sub in submissions for jid in sub.get("job_ids", [])}
full_cache.seed_submitted(job_ids, meta=jid_to_cmd)
이제 뷰어 노드에서 JOBID | STATUS | CMD 를 깔끔히 볼 수 있습니다.
“계속” 갱신하려면 뷰어 노드를 주기적으로 재실행(직접 클릭 또는 반복 실행 노드 사용)하세요.
진짜 실시간 패널 (B안, 선택)
-
가능: 서버 라우트를 하나 노드 파일에 등록해 JSON을 내보내고, 프론트(간단 JS)에서 1~3초 간격 폴링.
-
필요 코드(개요):
server.PromptServer이용해GET /lsf/status?job_ids=...엔드포인트 추가 → 위LSFStatusBoard의 내부 로직을 재사용해 JSON 응답. -
원하시면 이 라우트 + 최소 HTML/JS 패널 코드를 바로 붙여드릴게요.
요약
-
Q1 : 파일 남용 없이 “메모리 싱글톤 + 파일 스냅샷” 2-계층 캐시로 해결. 제출 직후 SUBMITTED 시드 로 Node2 코너 케이스도 해결.
-
Q2 : 코어 UI는 중간 갱신이 어려우므로
-
바로 쓰려면 뷰어 노드(A안) 로 표를 재실행 갱신,
-
진짜 라이브는 간단 라우트(B안) + 소형 패널.
-
더 진한 기능(상태 아이콘/색상, 정렬/필터, EXIT 원인 추적, bjobs -a 추가 컬럼 파싱 등) 원하시면 이어서 확장해 드릴게요.
impact node에서 아래 함수(js)와 PromptServer.instance.send_sync("impact-node-feedback", {"node_id": …, ”widget_name": …}) 이런 함수를 이용해서 런타임에서 위젯 업데이트를 하는 것 같은데 이런 참고해서 markdown table 형식으로 Jod ID, Status, Command 를 String widget에 계속 업데이트 해보자. function nodeFeedbackHandler(event) { let nodes = app.graph._nodes_by_id; let node = nodes[event.detail.node_id]; if(node) { const w = node.widgets.find((w) => event.detail.widget_name === w.name); if(w) { w.value = event.detail.value; } } } api.addEventListener("impact-node-feedback", nodeFeedbackHandler);
좋아—“impact-node-feedback” 패턴을 그대로 활용해서, 런타임에 String 위젯을 계속 갱신(마크다운 테이블 형태)하도록 노드를 확장해볼게. 아래는 step-by-step 로 정리한 구현 가이드 + 바로 붙여넣어 쓸 패치 코드 야.
어떻게 작동하나 (step-by-step)
-
프론트엔드 리스너 등록
네가 준 핸들러 그대로 씀.impact-node-feedback커스텀 메시지를 받으면 해당 노드의 특정 위젯 값을 갱신. 이 방식은 공식 문서의PromptServer.instance.send_sync()사용자 정의 메시지 가이드와 동일한 흐름이야. ComfyUI -
노드에 “표시 전용” String 위젯 추가
예:status_md라는 이름의 멀티라인 STRING 위젯. 노드 실행 중 서버에서 이 위젯의 값을 계속 바꿔치기 함(마크다운 표 텍스트). -
node_id 확보(필수)
ComfyUI는INPUT_TYPES["hidden"]에"UNIQUE_ID"를 넣으면 런타임에서 노드 id 를 파라미터로 받을 수 있음. 이걸 payload에 실어 보내면, 프론트에서 정확한 노드의 해당 위젯만 업데이트 가능. ComfyUI+2 -
서버에서 주기적으로 send_sync 발사
폴링 루프 매 턴마다arduino PromptServer.instance.send_sync( "impact-node-feedback", {"node_id": node_id, "widget_name": "status_md", "value": table_md} )
로 위젯 값을 푸시. (다수 예제들이 같은 패턴으로 상태를 실시간 알림) Hugging Face+2
0) 프론트엔드(웹) 코드 — 리스너 등록
ComfyUI/web/extensions/impact_node_feedback.js (파일명은 임의)로 저장:
js
function nodeFeedbackHandler(event) {
let nodes = app.graph._nodes_by_id;
let node = nodes[event.detail.node_id];
if (node) {
const w = node.widgets.find((w) => event.detail.widget_name === w.name);
if (w) {
w.value = event.detail.value;
// UI 즉시 갱신
if (w.callback) w.callback(w.value);
node.setDirtyCanvas(true, true);
}
}
}
api.addEventListener("impact-node-feedback", nodeFeedbackHandler);
이건 ComfyUI의 커스텀 메시지 이벤트 처리 패턴과 1:1로 맞음. 서버에서
send_sync로"impact-node-feedback"를 보낼 거야. ComfyUI
1) 파이썬 노드 패치 — 위젯·node_id·라이브업데이트 추가
아래 추가/수정 블록만 기존 노드 파일에 반영하면 돼. (이전 답변에서 만든 LSFSubmitAndWait 기준)
# 상단 import에 추가
from server import PromptServer # send_sync를 위해
# ----- 유틸: 마크다운 테이블 생성 -----
def _short(s: str, limit: int) -> str:
s = (s or "").strip()
return s if len(s) <= limit else (s[:limit-1] + "…")
def make_status_markdown(job_rows: list[dict], width_limit: int = 80) -> str:
"""
job_rows: [{"job_id": "123", "status": "RUN", "cmd": "bsub ..."}, ...]
"""
lines = []
lines.append("| JOB ID | STATUS | COMMAND |")
lines.append("|------:|:------:|---------|")
for r in job_rows:
jid = r.get("job_id", "")
st = r.get("status", "")
cmd = _short(r.get("cmd", ""), width_limit)
# 파이프 이스케이프는 단순히 백틱 감싸기로 회피
lines.append(f"| `{jid}` | `{st}` | `{cmd}` |")
return "\n".join(lines)
# ----- LSFSubmitAndWait 클래스 변경 -----
class LSFSubmitAndWait:
CATEGORY = "LSF"
FUNCTION = "run"
OUTPUT_NODE = True
SUPPORTS_ASYNC = True
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("job_ids", "results_json")
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"commands": ("STRING", {"multiline": True, "default": "bsub -q short sleep 3"}),
"cwd": ("STRING", {"default": "."}),
},
"optional": {
# 표시 전용 위젯(라이브 상태 보드)
"status_md": ("STRING", {"multiline": True, "default":
"| JOB ID | STATUS | COMMAND |\n|------:|:------:|---------|\n(실행 중 라이브로 갱신됩니다)"}),
# 줄 길이 제한(명령어 과도하게 길 때)
"cmd_width_limit": ("INT", {"default": 100, "min": 40, "max": 200}),
# 필요 시 끄기
"live_update": ("BOOL", {"default": True}),
},
# node_id를 받기 위한 hidden 입력 (공식 문서 패턴)
"hidden": { "node_id": "UNIQUE_ID" }
}
async def run(self,
commands: str,
cwd: str,
status_md: str = "",
cmd_width_limit: int = 100,
live_update: bool = True,
node_id: str = None):
# ... (기존 run 코드의 "옵션 파싱" 직후에 이어서) ...
# 1) 제출
submitted_at = iso_now()
job_ids, submissions = await submit_commands_multiline(lines, cwd=cwd, batch_size=batch_size)
# 제출한 커맨드 메타 맵 (job_id -> cmd)
jid_to_cmd = {}
for sub in submissions:
for jid in sub.get("job_ids", []):
jid_to_cmd[jid] = sub.get("cmd", "")
# 2) 전역 캐시/백엔드
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, "status_full_cache.json")
full_cache = FullListSharedCache(
cache_file,
full_ttl_s=full_ttl_s,
min_refresh_gap_s=min_refresh_gap_s,
force_refresh_on_miss=force_refresh_on_miss,
)
backend = LSFBjobsFullBackend(full_status_cmd_template=full_status_cmd_template)
# 2-1) 제출 직후 캐시 시드(커맨드 메타 포함)
if job_ids:
# seed_submitted가 meta를 파일/메모리 모두에 저장하도록 앞서 구현해둠
meta = {jid: {"cmd": jid_to_cmd.get(jid, ""), "submitted_at": submitted_at} for jid in job_ids}
full_cache.seed_submitted(job_ids, meta=meta)
# (신규) 초기 UI 업데이트 한번 수행
if live_update and node_id and job_ids:
rows = [{"job_id": jid, "status": "SUBMITTED", "cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
table_md = make_status_markdown(rows, width_limit=cmd_width_limit)
PromptServer.instance.send_sync("impact-node-feedback",
{"node_id": node_id, "widget_name": "status_md", "value": table_md})
# 3) 완료까지 대기 + 주기적 UI 업데이트
start = time.time()
final_statuses = {}
waited_sec = 0.0
while True:
statuses = await full_cache.get_statuses(job_ids, backend)
final_statuses = {jid: statuses.get(jid, "") for jid in job_ids}
# (신규) 라이브 업데이트
if live_update and node_id and job_ids:
rows = [{"job_id": jid,
"status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
table_md = make_status_markdown(rows, width_limit=cmd_width_limit)
PromptServer.instance.send_sync("impact-node-feedback",
{"node_id": node_id, "widget_name": "status_md", "value": table_md})
if all((st in TERMINAL_STATES) and st for st in final_statuses.values()):
break
if max_wait_s is not None and (time.time() - start) >= max_wait_s:
break
await asyncio.sleep(max(1, int(poll_interval_s)))
waited_sec = time.time() - start
completed_at = iso_now()
all_done = all((st in TERMINAL_STATES) and st for st in final_statuses.values())
# (나머지 결과 저장/반환 로직은 기존 그대로)
# ...
포인트
hidden에"UNIQUE_ID"추가 →run(..., node_id=...)로 받을 수 있음. 서버에서 이 값을 payload에 담아 보냄. 프론트는node_id로 해당 노드의status_md위젯만 갱신. ComfyUI메시지 발사는
PromptServer.instance.send_sync(<event>, <payload>)형식. (여러 커스텀 노드들이 이 패턴으로 실시간 상태를 보내는 사례 다수) Hugging Face+1
결과 화면 예시 (String 위젯 내용)
| JOB ID | STATUS | COMMAND |
|------:|:------:|---------|
| `124578` | `RUN` | `bsub -q short -J test1 sleep 5` |
| `124579` | `PEND` | `bsub -q short -J test2 sleep 8` |
| `124580` | `DONE` | `bsub -q short -J test3 sleep 2` |
- 마크다운 테이블을 “렌더링”하지는 않지만, 가독성 높은 텍스트 표 로 잘 보여줘. (String 위젯은 Markdown 해석을 안 하므로, 표 텍스트 형태로 보는 개념)
추가 팁
-
업데이트 빈도(스팸 방지)
poll_interval_s가 너무 작으면 UI도 그만큼 자주 갱신됨. 필요하면 “마지막 송신 시각”을 저장해 1~2초 쓰로틀 을 추가해도 좋음. -
여러 클라이언트 탭 동시 접속
특정 탭만 갱신하고 싶다면send_sync(..., sid=PromptServer.instance.client_id)로 현재 클라이언트에만 보내는 패턴도 있어. (여러 예제 노드가 이 방식 사용) Gitee -
명령어 길이
cmd_width_limit로 긴 커맨드를 줄여 보여줌. 필요하면 툴팁/별도 대시보드로 확장 가능.
요약
-
Q1 : 파일 캐시 + 프로세스 내 싱글톤 메모리 캐시(이미 반영)로 bjobs -a 과다 호출 방지 및 동시 노드 안전.
-
Q2 :
send_sync("impact-node-feedback", {...})+hidden UNIQUE_ID+ 표시용status_md위젯으로, 런타임 라이브 상태판 구현 완료. (공식 문서의 커스텀 메시지/노드 id 전달 패턴 준수) ComfyUI
원하면 진짜 Markdown 렌더(HTML 패널)나 컬러 뱃지/아이콘 , EXIT 원인/에러 로그 컬럼도 붙여줄 수 있어.
전체 코드 정리해서 다시 써줘(파이썬 코드만 써주면 돼). 전체적으로 좀 기능에 비해 코드가 복잡한것 같은데 최적화할 수 있는 부분이 있으면 최적화하면 좋겠어.
# File: custom_nodes/lsf_submit_and_wait.py
# Purpose: ComfyUI Async OUTPUT node to submit LSF jobs (bsub), poll with bjobs -a,
# share a lightweight global cache across nodes/processes, and live-update
# a String widget (markdown-like table) during runtime via PromptServer.
#
# Key features (lean & optimized):
# - Async node (SUPPORTS_ASYNC=True), batch-limited parallel bsub submissions.
# - Single, simplified "full list" cache refreshed by `bjobs -a` only when needed:
# * 2-tier cache: in-process memory snapshot + cross-process JSON file.
# * TTL & minimal refresh gap + non-blocking refresh lock -> prevents bjobs storms.
# * Immediate "SUBMITTED" seeding for newly created job IDs (corner case solved).
# - Runtime UI feedback: PromptServer.instance.send_sync("impact-node-feedback", …)
# * Updates a String widget named "status_md" with a compact markdown table.
# * Throttled updates to avoid UI spam.
#
# Inputs:
# - commands (STRING, multiline): multiple bsub lines. Optional inline directive:
# # LSF: batch_size=4 poll_interval_s=10 full_ttl_s=15 min_refresh_gap_s=5
# # LSF: ui_update_gap_s=1 max_wait_s=7200 full_status_cmd="bjobs -a -noheader -o \"jobid stat\""
# (All keys optional; defaults are sensible.)
# - cwd (STRING): working directory for submissions.
# - status_md (STRING): display widget (auto-updated during run).
# - cmd_width_limit (INT): truncate very long command strings in the table.
# - live_update (BOOL): enable/disable live widget updates.
# - hidden node_id (UNIQUE_ID): used to address the specific node widget.
#
# Outputs (OUTPUT node):
# - job_ids (STRING): comma-separated list
# - results_json (STRING): summary JSON (final status, timing, options, log file path)
#
# Minimal dependencies: Python stdlib + ComfyUI's PromptServer (bundled).
import asyncio
import json
import os
import re
import shlex
import tempfile
import time
from datetime import datetime
# ComfyUI server for live widget updates
from server import PromptServer # noqa
# fcntl (POSIX) for file locks. If unavailable (e.g. Windows), degrade gracefully.
try:
import fcntl # type: ignore
except Exception: # pragma: no cover
fcntl = None
# ---------------------------- Basic utils ----------------------------
JOB_ID_REGEX = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
TERMINAL_STATES = {"DONE", "EXIT"} # Adjust if your cluster uses other terminal labels
def iso_now() -> str:
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
def default_cache_dir() -> str:
return os.environ.get(
"COMFYUI_LSF_CACHE_DIR",
os.path.join(tempfile.gettempdir(), "comfyui_lsf_cache")
)
def ensure_dir(path: str) -> str:
os.makedirs(path, exist_ok=True)
return path
def _short(s: str, limit: int) -> str:
s = (s or "").strip()
return s if len(s) <= limit else (s[: max(0, limit - 1)] + "…")
def parse_inline_options_and_commands(commands_text: str):
"""
Parse optional inline LSF options from leading lines:
# LSF: key=value key=value ...
Returns (options_dict, command_lines)
"""
options = {}
raw_lines = commands_text.splitlines()
cmd_lines = []
for ln in raw_lines:
m = re.match(r"^\s*#\s*LSF:\s*(.+)$", ln)
if m:
for kv in shlex.split(m.group(1), posix=True):
if "=" in kv:
k, v = kv.split("=", 1)
v_strip = v.strip().lower()
if v_strip in ("true", "false"):
options[k.strip()] = (v_strip == "true")
else:
try:
options[k.strip()] = int(v)
except Exception:
options[k.strip()] = v
continue
# keep non-empty command lines, ignore other comments/empties
if ln.strip() and not ln.strip().startswith("# LSF:"):
cmd_lines.append(ln)
return options, cmd_lines
async def run_shell(cmd: str, cwd: str | None = None):
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd if cwd else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await proc.communicate()
return proc.returncode, out_b.decode(errors="ignore"), err_b.decode(errors="ignore")
def extract_job_ids_from_bsub_output(stdout: str):
# LSF standard: "Job <12345> is submitted to queue <...>."
return JOB_ID_REGEX.findall(stdout or "")
# ---------------------------- LSF backend ----------------------------
class LSFBackend:
"""Minimal backend that fetches ONE full snapshot of job statuses via `bjobs -a`."""
def __init__(self, full_status_cmd: str | None = None):
# default format: columns: jobid stat
self.full_status_cmd = full_status_cmd or 'bjobs -a -noheader -o "jobid stat"'
async def full_snapshot(self) -> dict[str, str]:
rc, out, _ = await run_shell(self.full_status_cmd, cwd=None)
if rc != 0 or not out:
return {}
result = {}
for ln in out.splitlines():
ln = ln.strip()
if not ln:
continue
parts = ln.split()
if not parts:
continue
jid = parts[0]
st = parts[-1].upper()
result[jid] = st
return result
# ---------------------------- Global cache (lean, 2-tier) ----------------------------
class LSFCache:
"""
Lean 2-tier cache for job statuses (ALL jobs snapshot):
- In-process memory snapshot (fast-path, avoids file I/O most of the time)
- Cross-process JSON file snapshot (source of truth)
Concurrency control:
- Non-blocking refresh lock file ensures only one process runs `bjobs -a`
- TTL + minimal refresh gap to throttle refreshes
File structure:
{
"jobs": { "<jobid>": {"status": "RUN", "ts": 169...}, ... },
"jobs_meta": { "<jobid>": {"cmd": "...", "submitted_at": "..."}, ... },
"last_full_refresh_ts": 169...,
"meta": { "refreshed_at": "2025-...", "reason": "ttl_or_miss" }
}
"""
_mem_jobs: dict[str, dict] = {}
_mem_last_ts: float = 0.0
_mem_lock = asyncio.Lock() # protect _mem_jobs/_mem_last_ts
def __init__(self, cache_file: str, full_ttl_s: int = 10, min_refresh_gap_s: int = 5):
self.cache_file = cache_file
self.full_ttl_s = float(full_ttl_s)
self.min_refresh_gap_s = float(min_refresh_gap_s)
ensure_dir(os.path.dirname(cache_file))
if not os.path.exists(cache_file):
with open(cache_file, "w") as f:
json.dump({"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0, "meta": {}}, f)
# ---------- file lock helpers ----------
def _lock_fd(self, path: str, nonblock: bool = False):
fh = open(path, "a+")
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_EX | (fcntl.LOCK_NB if nonblock else 0))
except BlockingIOError:
fh.close()
return None
return fh
def _unlock_fd(self, fh):
if not fh:
return
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_UN)
except Exception:
pass
try:
fh.close()
except Exception:
pass
# ---------- file I/O ----------
def _read_file(self) -> dict:
try:
with open(self.cache_file, "r") as f:
return json.load(f)
except Exception:
return {"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0, "meta": {}}
def _write_file_and_update_mem(self, data: dict):
tmp = self.cache_file + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f)
os.replace(tmp, self.cache_file)
# keep memory in sync
type(self)._mem_jobs = data.get("jobs", {})
type(self)._mem_last_ts = float(data.get("last_full_refresh_ts", 0))
# ---------- public API ----------
def seed_submitted(self, job_ids: list[str], meta: dict[str, dict] | None = None):
"""
Immediately make new job IDs visible as SUBMITTED (fresh) in both memory and file.
Useful for the edge case where another node asks before bjobs snapshot contains them.
"""
now = time.time()
# Update memory (fast path)
self._mem_jobs = self._mem_jobs or {}
for jid in job_ids:
self._mem_jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
# Merge into file (single short critical section)
fh = self._lock_fd(self.cache_file)
try:
data = self._read_file()
jobs = data.get("jobs", {})
for jid in job_ids:
jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
data["jobs"] = jobs
if meta:
jm = data.get("jobs_meta", {})
for jid in job_ids:
if jid in meta:
jm[jid] = meta[jid]
data["jobs_meta"] = jm
self._write_file_and_update_mem(data)
finally:
self._unlock_fd(fh)
async def _maybe_refresh(self, backend: LSFBackend, reason: str) -> bool:
"""
Refresh the full snapshot if nobody else is doing it, and minimal gap is respected.
Non-blocking: returns False if someone else just refreshed or lock couldn't be taken.
"""
now = time.time()
data = self._read_file()
last_ts = float(data.get("last_full_refresh_ts", 0))
if (now - last_ts) < self.min_refresh_gap_s:
return False
# Take a non-blocking refresh lock
lock_fh = self._lock_fd(self.cache_file + ".refresh.lock", nonblock=True)
if lock_fh is None:
return False
try:
# Double-check gap again after taking the lock
data2 = self._read_file()
last_ts2 = float(data2.get("last_full_refresh_ts", 0))
if (now - last_ts2) < self.min_refresh_gap_s:
return False
snapshot = await backend.full_snapshot()
if not snapshot:
return False
now2 = time.time()
fh = self._lock_fd(self.cache_file)
try:
base = self._read_file()
jobs = base.get("jobs", {})
ts = now2
for jid, st in snapshot.items():
jobs[jid] = {"status": st, "ts": ts}
base["jobs"] = jobs
base["last_full_refresh_ts"] = now2
meta = base.get("meta", {})
meta["refreshed_at"] = iso_now()
meta["reason"] = reason
base["meta"] = meta
self._write_file_and_update_mem(base)
finally:
self._unlock_fd(fh)
return True
finally:
self._unlock_fd(lock_fh)
async def get_statuses(self, job_ids: list[str], backend: LSFBackend, force_on_miss: bool = True) -> dict[str, str]:
"""
Return statuses for the given job_ids using 2-tier cache:
1) If in-memory snapshot is fresh -> read from memory (no file I/O).
2) Else read file once and update memory.
3) If TTL expired or we miss any job_id (and force_on_miss=True),
try a single non-blocking full refresh (bjobs -a) if the minimal gap elapsed.
"""
now = time.time()
# Stage 1: memory fast-path
async with self._mem_lock:
mem_is_fresh = (now - self._mem_last_ts) <= self.full_ttl_s
if mem_is_fresh:
jobs = self._mem_jobs
else:
data = self._read_file()
type(self)._mem_jobs = data.get("jobs", {})
type(self)._mem_last_ts = float(data.get("last_full_refresh_ts", 0))
jobs = self._mem_jobs
# Helper: check if entry is fresh enough
def fresh(jid: str) -> bool:
ent = jobs.get(jid)
if not ent:
return False
return (now - float(ent.get("ts", 0))) <= self.full_ttl_s
need_refresh = False
if not mem_is_fresh:
need_refresh = True
if force_on_miss:
for jid in job_ids:
if not fresh(jid):
need_refresh = True
break
if need_refresh:
await self._maybe_refresh(backend, reason="ttl_or_miss")
# Read from memory again (already updated if refresh happened)
jobs = self._mem_jobs
return {jid: jobs.get(jid, {}).get("status", "") for jid in job_ids}
# ---------------------------- Submission & waiting ----------------------------
async def submit_commands_multiline(commands: list[str], cwd: str, batch_size: int):
"""
Submit each line (bsub ...) as an independent command. Limit concurrency by batch_size.
Returns (job_ids, submissions_detail)
"""
sem = asyncio.Semaphore(max(1, int(batch_size)))
submissions = []
async def _submit_one(cmd: str):
async with sem:
rc, out, err = await run_shell(cmd, cwd=cwd)
jids = extract_job_ids_from_bsub_output(out)
return {
"cmd": cmd,
"returncode": rc,
"stdout": out,
"stderr": err,
"job_ids": jids,
}
tasks = [asyncio.create_task(_submit_one(c)) for c in commands]
for t in asyncio.as_completed(tasks):
submissions.append(await t)
job_ids = [jid for sub in submissions for jid in sub.get("job_ids", [])]
return job_ids, submissions
def make_status_markdown(job_rows: list[dict], width_limit: int = 100) -> str:
"""
Build a compact markdown-like table: JOB ID | STATUS | COMMAND
(String widget does not render markdown; table is for readability.)
"""
lines = []
lines.append("| JOB ID | STATUS | COMMAND |")
lines.append("|------:|:------:|---------|")
for r in job_rows:
jid = str(r.get("job_id", ""))
st = str(r.get("status", ""))
cmd = _short(str(r.get("cmd", "")), width_limit)
lines.append(f"| `{jid}` | `{st}` | `{cmd}` |")
return "\n".join(lines)
# ---------------------------- ComfyUI Node ----------------------------
class LSFSubmitAndWait:
"""
Async OUTPUT node that:
- Submits multiple bsub jobs (batch-limited parallelism)
- Polls statuses using a shared bjobs -a snapshot cache
- Live-updates a String widget with JOB ID / STATUS / COMMAND
"""
CATEGORY = "LSF"
FUNCTION = "run"
OUTPUT_NODE = True
SUPPORTS_ASYNC = True
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("job_ids", "results_json")
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"commands": ("STRING", {"multiline": True, "default": "bsub -q short -J test sleep 3"}),
"cwd": ("STRING", {"default": "."}),
},
"optional": {
"status_md": ("STRING", {"multiline": True, "default":
"| JOB ID | STATUS | COMMAND |\n|------:|:------:|---------|\n(실행 중에 갱신됩니다)"}),
"cmd_width_limit": ("INT", {"default": 100, "min": 40, "max": 200}),
"live_update": ("BOOL", {"default": True}),
},
"hidden": {
# ComfyUI injects this; we'll use it to target the widget updates
"node_id": "UNIQUE_ID"
}
}
async def run(self,
commands: str,
cwd: str,
status_md: str = "",
cmd_width_limit: int = 100,
live_update: bool = True,
node_id: str = None):
# ---- 1) Parse inline options & clean command lines ----
opts, lines = parse_inline_options_and_commands(commands)
batch_size = int(opts.get("batch_size", 4))
poll_interval_s = int(opts.get("poll_interval_s", 10))
full_ttl_s = int(opts.get("full_ttl_s", 10)) # snapshot TTL
min_refresh_gap_s = int(opts.get("min_refresh_gap_s", 5)) # min gap between -a
full_status_cmd = opts.get("full_status_cmd", 'bjobs -a -noheader -o "jobid stat"')
max_wait_s = opts.get("max_wait_s", None)
ui_update_gap_s = float(opts.get("ui_update_gap_s", 1.0)) # throttle UI pushes
force_on_miss = bool(opts.get("force_refresh_on_miss", True))
if isinstance(max_wait_s, str) and max_wait_s.isdigit():
max_wait_s = int(max_wait_s)
# Filter out accidental empty command lines
lines = [ln for ln in lines if ln.strip()]
# ---- 2) Submit jobs (async batch-limited) ----
submitted_at = iso_now()
job_ids, submissions = await submit_commands_multiline(lines, cwd=cwd, batch_size=batch_size)
# Map job_id -> command (for UI & logs)
jid_to_cmd = {}
for sub in submissions:
for jid in sub.get("job_ids", []):
jid_to_cmd[jid] = sub.get("cmd", "")
# ---- 3) Shared cache & backend ----
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, "status_full_cache.json")
cache = LSFCache(cache_file, full_ttl_s=full_ttl_s, min_refresh_gap_s=min_refresh_gap_s)
backend = LSFBackend(full_status_cmd=full_status_cmd)
# Seed newly submitted jobs to handle "just-submitted not in cache yet" edge
if job_ids:
meta = {jid: {"cmd": jid_to_cmd.get(jid, ""), "submitted_at": submitted_at} for jid in job_ids}
cache.seed_submitted(job_ids, meta=meta)
# ---- 4) Initial live update (SUBMITTED) ----
def push_ui(rows):
if live_update and node_id:
table = make_status_markdown(rows, width_limit=cmd_width_limit)
try:
PromptServer.instance.send_sync(
"impact-node-feedback",
{"node_id": node_id, "widget_name": "status_md", "value": table}
)
except Exception:
pass # UI update failure should not impact job control
if job_ids:
init_rows = [{"job_id": jid, "status": "SUBMITTED", "cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(init_rows)
# ---- 5) Poll loop until all terminal or timeout ----
start = time.time()
last_ui = 0.0
final_statuses = {jid: "SUBMITTED" for jid in job_ids}
while True:
statuses = await cache.get_statuses(job_ids, backend, force_on_miss=force_on_miss)
for jid in job_ids:
st = (statuses.get(jid) or final_statuses.get(jid) or "").upper()
final_statuses[jid] = st if st else final_statuses.get(jid, "")
# Throttled live UI update
now = time.time()
if (now - last_ui) >= ui_update_gap_s:
rows = [{"job_id": jid, "status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(rows)
last_ui = now
# All terminal?
if job_ids and all((final_statuses.get(jid, "") in TERMINAL_STATES) for jid in job_ids):
break
# Timeout?
if max_wait_s is not None and (now - start) >= max_wait_s:
break
await asyncio.sleep(max(1, int(poll_interval_s)))
completed_at = iso_now()
waited_sec = round(time.time() - start, 2)
all_done = (job_ids and all((final_statuses.get(jid, "") in TERMINAL_STATES) for jid in job_ids))
# Final UI update (non-throttled, shows last state)
if job_ids:
rows = [{"job_id": jid, "status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(rows)
# ---- 6) Persist a concise run log (optional, handy for auditing) ----
results_dir = ensure_dir(os.path.join(cache_dir, "runs"))
session_tag = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + f"_{os.getpid()}"
results_path = os.path.join(results_dir, f"result_{session_tag}.json")
failed_submissions = [
{
"cmd": sub["cmd"],
"returncode": sub["returncode"],
"stderr": sub["stderr"],
"stdout": sub["stdout"],
}
for sub in submissions
if sub.get("returncode", 1) != 0 or not sub.get("job_ids")
]
payload = {
"submitted_at": submitted_at,
"completed_at": completed_at,
"duration_sec": waited_sec,
"job_ids": job_ids,
"final_status": final_statuses,
"all_done": bool(all_done),
"failed_submissions": failed_submissions,
"cwd": cwd,
"options": {
"batch_size": batch_size,
"poll_interval_s": poll_interval_s,
"full_ttl_s": full_ttl_s,
"min_refresh_gap_s": min_refresh_gap_s,
"full_status_cmd": full_status_cmd,
"max_wait_s": max_wait_s,
"ui_update_gap_s": ui_update_gap_s,
"force_refresh_on_miss": force_on_miss,
},
"results_file": results_path,
}
try:
with open(results_path, "w") as f:
json.dump(payload, f, indent=2)
except Exception:
pass
# ---- 7) Return outputs ----
job_ids_str = ",".join(job_ids)
results_json_str = json.dumps(payload, ensure_ascii=False)
return (job_ids_str, results_json_str)
# ---------------------------- Node registration ----------------------------
NODE_CLASS_MAPPINGS = {
"LSFSubmitAndWait": LSFSubmitAndWait,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"LSFSubmitAndWait": "LSF: Submit & Wait",
}
근데 # LSF: key=value 이게 뭐야? 이게 official한 동작이야?
짧게 결론부터요:
# LSF: key=value는 제가 만든 “인라인 설정 주석” 규약 이에요.
ComfyUI 공식 기능도 아니고, LSF 공식 문법도 아닙니다.
우리 노드가 그 줄을 읽어 옵션으로 해석한 뒤, 실제 실행 명령에서는 제거 합니다. (쉘에 흘러가지 않음)
step-by-step로 정리하면:
-
왜 넣었나?
-
입력을
commands/cwd두 개로 단순화하면서도 배치 크기, 폴링 주기 같은 세부 옵션을 텍스트만으로 조정하려고 넣은 편의용 입니다. -
워크플로 공유/복붙 시 설정이 같이 따라가서 재현성이 좋아요.
-
-
안전한가?
-
그 줄은 명령으로 실행되지 않습니다. 파서가
# LSF:로 시작하는 줄만 옵션으로 읽고, 실제bsub제출 목록에선 제외합니다. -
혹시 그대로 쉘 스크립트에 붙여도
#는 주석이라 부작용은 없습니다.
-
-
“공식”을 원하면? (권장 대안)
-
ComfyUI 방식 은 각 옵션을 위젯(옵션 입력값) 으로 노출하는 겁니다.
-
이미 제공한 코드엔
status_md,cmd_width_limit,live_update처럼 optional 위젯 을 쓰고 있어요. 원하시면batch_size,poll_interval_s,full_ttl_s,min_refresh_gap_s,ui_update_gap_s,full_status_cmd,max_wait_s,force_refresh_on_miss도 전부 위젯화하면 됩니다. -
우선순위도 깔끔히 정할 수 있어요: 위젯 값 > 인라인 주석 > 디폴트. (혹은 인라인을 완전히 제거)
-
원하면 인라인 파서를 제거하고 위젯만 쓰는 간단 패치 스니펫도 드릴게요.
굳이 이걸 다 옵션으로 하지말고 최적값으로 상단에 CONSTANT로 처리하자.
# File: custom_nodes/lsf_submit_and_wait.py
# Purpose: ComfyUI Async OUTPUT node to submit LSF bsub jobs, poll with `bjobs -a`,
# share a lightweight global cache, and live-update a String widget
# using PromptServer during runtime.
#
# NOTE: 모든 동작 파라미터는 아래 CONSTANTS 로 고정 (옵션/인라인 파싱 없음)
import asyncio
import json
import os
import re
import tempfile
import time
from datetime import datetime
from server import PromptServer # ComfyUI runtime UI updates
# ---------------------------- CONSTANTS (Tweak here) ----------------------------
# Submission / polling behavior
BATCH_SIZE = 4 # 동시에 제출할 bsub 개수(세마포어)
POLL_INTERVAL_S = 10 # 폴링 주기(초)
MAX_WAIT_S = None # 전체 대기 타임아웃(초) - None이면 무제한
# Cache behavior
FULL_TTL_S = 10 # 전체 스냅샷(메모리/파일) TTL(초)
MIN_REFRESH_GAP_S = 5 # bjobs -a 재호출 최소 간격(초)
FORCE_REFRESH_ON_MISS = True # 캐시 미스 시 TTL 무시하고 조기 리프레시 시도(간격은 준수)
# bjobs -a 명령 템플릿 (jobid stat 두 컬럼을 기대)
FULL_STATUS_CMD = 'bjobs -a -noheader -o "jobid stat"'
# Live UI feedback (String 위젯)
UI_UPDATE_GAP_S = 1.0 # UI 업데이트 쓰로틀 간격(초)
CMD_WIDTH_LIMIT = 100 # COMMAND 열 표시 길이 제한
# Terminal states for LSF
TERMINAL_STATES = {"DONE", "EXIT"}
# Cache file name (under /tmp or $COMFYUI_LSF_CACHE_DIR)
CACHE_FILENAME = "status_full_cache.json"
# ---------------------------- Utilities ----------------------------
JOB_ID_REGEX = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def iso_now() -> str:
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
def default_cache_dir() -> str:
return os.environ.get(
"COMFYUI_LSF_CACHE_DIR",
os.path.join(tempfile.gettempdir(), "comfyui_lsf_cache")
)
def ensure_dir(path: str) -> str:
os.makedirs(path, exist_ok=True)
return path
def _short(s: str, limit: int) -> str:
s = (s or "").strip()
return s if len(s) <= limit else (s[: max(0, limit - 1)] + "…")
async def run_shell(cmd: str, cwd: str | None = None):
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd if cwd else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await proc.communicate()
return proc.returncode, out_b.decode(errors="ignore"), err_b.decode(errors="ignore")
def extract_job_ids_from_bsub_output(stdout: str):
# LSF 표준: "Job <12345> is submitted to queue <...>."
return JOB_ID_REGEX.findall(stdout or "")
def make_status_markdown(job_rows: list[dict], width_limit: int = CMD_WIDTH_LIMIT) -> str:
"""JOB ID | STATUS | COMMAND 표 텍스트(가독성 용)."""
lines = []
lines.append("| JOB ID | STATUS | COMMAND |")
lines.append("|------:|:------:|---------|")
for r in job_rows:
jid = str(r.get("job_id", ""))
st = str(r.get("status", ""))
cmd = _short(str(r.get("cmd", "")), width_limit)
lines.append(f"| `{jid}` | `{st}` | `{cmd}` |")
return "\n".join(lines)
# ---------------------------- fcntl lock (POSIX best-effort) ----------------------------
try:
import fcntl # type: ignore
except Exception:
fcntl = None # Windows 등에서는 락 없이도 동작(경쟁 가능성 감수)
def _lock_fd(path: str, nonblock: bool = False):
fh = open(path, "a+")
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_EX | (fcntl.LOCK_NB if nonblock else 0))
except BlockingIOError:
fh.close()
return None
return fh
def _unlock_fd(fh):
if not fh:
return
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_UN)
except Exception:
pass
try:
fh.close()
except Exception:
pass
# ---------------------------- LSF backend ----------------------------
class LSFBackend:
"""`bjobs -a` 전체 스냅샷을 한 번에 가져오는 최소 백엔드."""
def __init__(self, full_status_cmd: str = FULL_STATUS_CMD):
self.full_status_cmd = full_status_cmd
async def full_snapshot(self) -> dict[str, str]:
rc, out, _ = await run_shell(self.full_status_cmd, cwd=None)
if rc != 0 or not out:
return {}
result = {}
for ln in out.splitlines():
ln = ln.strip()
if not ln:
continue
parts = ln.split()
if not parts:
continue
jid = parts[0]
st = parts[-1].upper()
result[jid] = st
return result
# ---------------------------- Lightweight 2-tier cache ----------------------------
class LSFCache:
"""
- In-process memory snapshot (fast-path, 대부분 파일 I/O 회피)
- Cross-process JSON file snapshot (소스 오브 트루스)
- TTL + 최소 간격 + refresh 락으로 bjobs -a 폭주 방지
파일 구조:
{
"jobs": { "<jobid>": {"status": "RUN", "ts": 169...}, ... },
"jobs_meta": { "<jobid>": {"cmd": "...", "submitted_at": "..."}, ... },
"last_full_refresh_ts": 169...,
"meta": { "refreshed_at": "...", "reason": "ttl_or_miss" }
}
"""
_mem_jobs: dict[str, dict] = {}
_mem_last_ts: float = 0.0
_mem_lock = asyncio.Lock()
def __init__(self, cache_file: str):
self.cache_file = cache_file
ensure_dir(os.path.dirname(cache_file))
if not os.path.exists(cache_file):
with open(cache_file, "w") as f:
json.dump({"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0, "meta": {}}, f)
# file I/O helpers
def _read_file(self) -> dict:
try:
with open(self.cache_file, "r") as f:
return json.load(f)
except Exception:
return {"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0, "meta": {}}
def _write_file_and_update_mem(self, data: dict):
tmp = self.cache_file + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f)
os.replace(tmp, self.cache_file)
type(self)._mem_jobs = data.get("jobs", {})
type(self)._mem_last_ts = float(data.get("last_full_refresh_ts", 0))
# public API
def seed_submitted(self, job_ids: list[str], meta: dict[str, dict] | None = None):
"""방금 제출한 잡을 SUBMITTED로 즉시 캐시에 반영(메모리+파일)."""
now = time.time()
# memory
self._mem_jobs = self._mem_jobs or {}
for jid in job_ids:
self._mem_jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
# file
fh = _lock_fd(self.cache_file)
try:
data = self._read_file()
jobs = data.get("jobs", {})
for jid in job_ids:
jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
data["jobs"] = jobs
if meta:
jm = data.get("jobs_meta", {})
for jid in job_ids:
if jid in meta:
jm[jid] = meta[jid]
data["jobs_meta"] = jm
self._write_file_and_update_mem(data)
finally:
_unlock_fd(fh)
async def _maybe_refresh(self, backend: LSFBackend, reason: str) -> bool:
"""TTL 또는 miss로 전체 스냅샷 갱신이 필요할 때, 한 프로세스만 non-blocking 락으로 실제 실행."""
now = time.time()
data = self._read_file()
last_ts = float(data.get("last_full_refresh_ts", 0))
if (now - last_ts) < MIN_REFRESH_GAP_S:
return False
lock_fh = _lock_fd(self.cache_file + ".refresh.lock", nonblock=True)
if lock_fh is None:
return False
try:
data2 = self._read_file()
last_ts2 = float(data2.get("last_full_refresh_ts", 0))
if (now - last_ts2) < MIN_REFRESH_GAP_S:
return False
snapshot = await backend.full_snapshot()
if not snapshot:
return False
now2 = time.time()
fh = _lock_fd(self.cache_file)
try:
base = self._read_file()
jobs = base.get("jobs", {})
for jid, st in snapshot.items():
jobs[jid] = {"status": st, "ts": now2}
base["jobs"] = jobs
base["last_full_refresh_ts"] = now2
meta = base.get("meta", {})
meta["refreshed_at"] = iso_now()
meta["reason"] = reason
base["meta"] = meta
self._write_file_and_update_mem(base)
finally:
_unlock_fd(fh)
return True
finally:
_unlock_fd(lock_fh)
async def get_statuses(self, job_ids: list[str], backend: LSFBackend) -> dict[str, str]:
"""2-계층 캐시 전략으로 요청 job_ids의 상태를 반환."""
now = time.time()
# 1) 메모리 패스트패스
async with self._mem_lock:
mem_is_fresh = (now - self._mem_last_ts) <= FULL_TTL_S
if mem_is_fresh:
jobs = self._mem_jobs
else:
data = self._read_file()
type(self)._mem_jobs = data.get("jobs", {})
type(self)._mem_last_ts = float(data.get("last_full_refresh_ts", 0))
jobs = self._mem_jobs
def fresh(jid: str) -> bool:
ent = jobs.get(jid)
if not ent:
return False
return (now - float(ent.get("ts", 0))) <= FULL_TTL_S
need_refresh = not mem_is_fresh
if FORCE_REFRESH_ON_MISS:
for jid in job_ids:
if not fresh(jid):
need_refresh = True
break
if need_refresh:
await self._maybe_refresh(backend, reason="ttl_or_miss")
jobs = self._mem_jobs # refresh 후 메모리에서 재조회
return {jid: jobs.get(jid, {}).get("status", "") for jid in job_ids}
# ---------------------------- Submission & waiting ----------------------------
async def submit_commands_multiline(commands: list[str], cwd: str):
"""
각 줄을 독립 bsub 명령으로 제출. 동시 제출 개수는 BATCH_SIZE로 제한.
Returns (job_ids, submissions_detail)
"""
sem = asyncio.Semaphore(max(1, int(BATCH_SIZE)))
submissions = []
async def _submit_one(cmd: str):
async with sem:
rc, out, err = await run_shell(cmd, cwd=cwd)
jids = extract_job_ids_from_bsub_output(out)
return {"cmd": cmd, "returncode": rc, "stdout": out, "stderr": err, "job_ids": jids}
tasks = [asyncio.create_task(_submit_one(c)) for c in commands]
for t in asyncio.as_completed(tasks):
submissions.append(await t)
job_ids = [jid for sub in submissions for jid in sub.get("job_ids", [])]
return job_ids, submissions
# ---------------------------- ComfyUI Node ----------------------------
class LSFSubmitAndWait:
"""
Async OUTPUT node:
- 멀티라인 bsub 제출 (배치 제한 병렬)
- bjobs -a 기반 전역 캐시 폴링 (다중 노드/프로세스 공유)
- 실행 중 String 위젯(status_md)에 JOB/STATUS/COMMAND 표를 지속 갱신
"""
CATEGORY = "LSF"
FUNCTION = "run"
OUTPUT_NODE = True
SUPPORTS_ASYNC = True
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("job_ids", "results_json")
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"commands": ("STRING", {"multiline": True, "default": "bsub -q short -J demo sleep 3"}),
"cwd": ("STRING", {"default": "."}),
},
"optional": {
# 표시 전용(값은 런타임에 서버가 갱신)
"status_md": ("STRING", {"multiline": True, "default":
"| JOB ID | STATUS | COMMAND |\n|------:|:------:|---------|\n(실행 중에 갱신됩니다)"}),
},
"hidden": {
# ComfyUI가 주입하는 유니크 노드 id. 위젯 갱신 타겟팅에 사용.
"node_id": "UNIQUE_ID"
}
}
async def run(self, commands: str, cwd: str, status_md: str = "", node_id: str = None):
# 0) 입력 커맨드 정리(빈 줄/주석 제거)
lines = []
for ln in (commands or "").splitlines():
s = ln.strip()
if not s:
continue
if s.startswith("#"): # 사용자가 넣은 일반 주석은 무시
continue
lines.append(ln)
# 1) 제출
submitted_at = iso_now()
job_ids, submissions = await submit_commands_multiline(lines, cwd=cwd)
# 2) job_id -> cmd 매핑 (UI/로그용)
jid_to_cmd = {}
for sub in submissions:
for jid in sub.get("job_ids", []):
jid_to_cmd[jid] = sub.get("cmd", "")
# 3) 전역 캐시 & 백엔드
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, CACHE_FILENAME)
cache = LSFCache(cache_file)
backend = LSFBackend(FULL_STATUS_CMD)
# 3-1) 제출 직후 캐시 시드(코너 케이스 완화)
if job_ids:
meta = {jid: {"cmd": jid_to_cmd.get(jid, ""), "submitted_at": submitted_at} for jid in job_ids}
cache.seed_submitted(job_ids, meta=meta)
# 4) 초기 UI 업데이트 (SUBMITTED)
def push_ui(rows):
if node_id:
table = make_status_markdown(rows)
try:
PromptServer.instance.send_sync(
"impact-node-feedback",
{"node_id": node_id, "widget_name": "status_md", "value": table}
)
except Exception:
pass # UI 실패는 컨트롤 로직에 영향 없음
if job_ids:
init_rows = [{"job_id": jid, "status": "SUBMITTED", "cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(init_rows)
# 5) 완료까지 폴링(주기적 UI 갱신)
start = time.time()
last_ui = 0.0
final_statuses = {jid: "SUBMITTED" for jid in job_ids}
while True:
statuses = await cache.get_statuses(job_ids, backend)
for jid in job_ids:
st = (statuses.get(jid) or final_statuses.get(jid) or "").upper()
final_statuses[jid] = st if st else final_statuses.get(jid, "")
now = time.time()
if (now - last_ui) >= UI_UPDATE_GAP_S:
rows = [{"job_id": jid, "status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(rows)
last_ui = now
if job_ids and all((final_statuses.get(jid, "") in TERMINAL_STATES) for jid in job_ids):
break
if (MAX_WAIT_S is not None) and ((now - start) >= MAX_WAIT_S):
break
await asyncio.sleep(max(1, int(POLL_INTERVAL_S)))
completed_at = iso_now()
duration_sec = round(time.time() - start, 2)
all_done = (job_ids and all((final_statuses.get(jid, "") in TERMINAL_STATES) for jid in job_ids))
# 마지막 UI 갱신(쓰로틀 없이)
if job_ids:
rows = [{"job_id": jid, "status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(rows)
# 6) 실행 결과 요약 저장
results_dir = ensure_dir(os.path.join(cache_dir, "runs"))
session_tag = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + f"_{os.getpid()}"
results_path = os.path.join(results_dir, f"result_{session_tag}.json")
failed_submissions = [
{
"cmd": sub["cmd"],
"returncode": sub["returncode"],
"stderr": sub["stderr"],
"stdout": sub["stdout"],
}
for sub in submissions
if sub.get("returncode", 1) != 0 or not sub.get("job_ids")
]
payload = {
"submitted_at": submitted_at,
"completed_at": completed_at,
"duration_sec": duration_sec,
"job_ids": job_ids,
"final_status": final_statuses,
"all_done": bool(all_done),
"failed_submissions": failed_submissions,
"cwd": cwd,
"options": { # 참고용으로만 기록(런타임 제어 아님)
"BATCH_SIZE": BATCH_SIZE,
"POLL_INTERVAL_S": POLL_INTERVAL_S,
"FULL_TTL_S": FULL_TTL_S,
"MIN_REFRESH_GAP_S": MIN_REFRESH_GAP_S,
"FULL_STATUS_CMD": FULL_STATUS_CMD,
"MAX_WAIT_S": MAX_WAIT_S,
"UI_UPDATE_GAP_S": UI_UPDATE_GAP_S,
"CMD_WIDTH_LIMIT": CMD_WIDTH_LIMIT,
"FORCE_REFRESH_ON_MISS": FORCE_REFRESH_ON_MISS,
},
"results_file": results_path,
}
try:
with open(results_path, "w") as f:
json.dump(payload, f, indent=2)
except Exception:
pass
# 7) 노드 출력
job_ids_str = ",".join(job_ids)
results_json_str = json.dumps(payload, ensure_ascii=False)
return (job_ids_str, results_json_str)
# ---------------------------- Node registration ----------------------------
NODE_CLASS_MAPPINGS = {
"LSFSubmitAndWait": LSFSubmitAndWait,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"LSFSubmitAndWait": "LSF: Submit & Wait",
}
다시 한 번 작성해줘. 1. 주석은 영어로 2. 서버 에러 등으로 bjobs가 실패했을 때 이를 어떻게 다루는지 궁금해. 웬만하면 안죽는 쪽으로 갔으면 좋겠는데
# File: custom_nodes/lsf_submit_and_wait.py
# Purpose: ComfyUI async OUTPUT node to submit LSF jobs (bsub), poll with `bjobs -a`,
# share a lightweight global cache, and live-update a String widget during runtime.
# Notes:
# - All tunables live as CONSTANTS below (no per-run options parsing).
# - Robust against transient server/cluster errors: timeouts, non-zero rc, and
# `bjobs` failures are treated as non-fatal. The node keeps running, uses last
# known statuses, and retries with exponential backoff.
#
# Frontend hook (place in a small web extension):
# api.addEventListener("impact-node-feedback", (event) => {
# const node = app.graph._nodes_by_id[event.detail.node_id];
# if (!node) return;
# const w = node.widgets?.find(w => w.name === event.detail.widget_name);
# if (!w) return;
# w.value = event.detail.value;
# if (w.callback) w.callback(w.value);
# node.setDirtyCanvas(true, true);
# });
import asyncio
import json
import os
import re
import tempfile
import time
from datetime import datetime
from server import PromptServer # ComfyUI runtime UI updates
# ---------------------------- CONSTANTS ----------------------------
# Submission / polling
BATCH_SIZE = 4 # concurrent bsub submissions
POLL_INTERVAL_S = 10 # polling interval (seconds)
MAX_WAIT_S = None # None = wait indefinitely
# Shell timeouts
SUBMIT_TIMEOUT_S = 60 # timeout for each bsub command
STATUS_TIMEOUT_S = 60 # timeout for each bjobs -a call
# Cache behavior
FULL_TTL_S = 10 # freshness window for the snapshot (seconds)
MIN_REFRESH_GAP_S = 5 # min gap between two `bjobs -a` runs (seconds)
FORCE_REFRESH_ON_MISS = True # refresh when a requested job_id is missing/stale
# `bjobs -a` template (expects two columns: jobid, stat)
FULL_STATUS_CMD = 'bjobs -a -noheader -o "jobid stat"'
# Live UI feedback (String widget)
UI_UPDATE_GAP_S = 1.0 # throttle for widget updates (seconds)
CMD_WIDTH_LIMIT = 100 # truncate command string in table
# Error/backoff for `bjobs -a` failures (non-fatal)
BJOBS_BACKOFF_BASE_S = 15 # initial backoff after a failed snapshot attempt
BJOBS_BACKOFF_MAX_S = 120 # maximum backoff cap (exponential up to this)
# Terminal states (adjust if needed)
TERMINAL_STATES = {"DONE", "EXIT"}
# Cache file name under /tmp or $COMFYUI_LSF_CACHE_DIR
CACHE_FILENAME = "status_full_cache.json"
# ---------------------------- Utilities ----------------------------
JOB_ID_REGEX = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def iso_now() -> str:
return datetime.utcnow().isoformat(timespec="seconds") + "Z"
def default_cache_dir() -> str:
return os.environ.get(
"COMFYUI_LSF_CACHE_DIR",
os.path.join(tempfile.gettempdir(), "comfyui_lsf_cache")
)
def ensure_dir(path: str) -> str:
os.makedirs(path, exist_ok=True)
return path
def _short(s: str, limit: int) -> str:
s = (s or "").strip()
return s if len(s) <= limit else (s[: max(0, limit - 1)] + "…")
async def run_shell(cmd: str, cwd: str | None = None, timeout: float | None = None):
"""
Run shell command with optional timeout. Never raises to caller:
returns (rc, stdout, stderr). On exception/timeout: non-zero rc and stderr message.
"""
try:
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd if cwd else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return proc.returncode, out_b.decode(errors="ignore"), err_b.decode(errors="ignore")
except asyncio.TimeoutError:
try:
proc.kill()
except Exception:
pass
try:
await proc.wait()
except Exception:
pass
return 124, "", f"Timeout after {timeout}s for: {cmd}"
except Exception as e:
return -1, "", f"Exception executing shell: {e}"
def extract_job_ids_from_bsub_output(stdout: str):
# LSF standard: "Job <12345> is submitted to queue <...>."
return JOB_ID_REGEX.findall(stdout or "")
def make_status_markdown(job_rows: list[dict], width_limit: int = CMD_WIDTH_LIMIT) -> str:
"""
Build a compact, markdown-like table: JOB ID | STATUS | COMMAND
(String widget is plain text; the table is just for readability.)
"""
lines = []
lines.append("| JOB ID | STATUS | COMMAND |")
lines.append("|------:|:------:|---------|")
for r in job_rows:
jid = str(r.get("job_id", ""))
st = str(r.get("status", ""))
cmd = _short(str(r.get("cmd", "")), width_limit)
lines.append(f"| `{jid}` | `{st}` | `{cmd}` |")
return "\n".join(lines)
# ---------------------------- File lock (best-effort) ----------------------------
try:
import fcntl # type: ignore
except Exception:
fcntl = None # On Windows, locks are skipped (accepting race risk)
def _lock_fd(path: str, nonblock: bool = False):
fh = open(path, "a+")
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_EX | (fcntl.LOCK_NB if nonblock else 0))
except BlockingIOError:
fh.close()
return None
return fh
def _unlock_fd(fh):
if not fh:
return
if fcntl:
try:
fcntl.flock(fh, fcntl.LOCK_UN)
except Exception:
pass
try:
fh.close()
except Exception:
pass
# ---------------------------- LSF backend ----------------------------
class LSFBackend:
"""
Minimal backend: fetch a full snapshot of job statuses via `bjobs -a`.
Returns (snapshot_dict, ok_bool). `ok_bool` is False on rc!=0 or timeout/exception.
"""
def __init__(self, full_status_cmd: str = FULL_STATUS_CMD):
self.full_status_cmd = full_status_cmd
async def full_snapshot(self) -> tuple[dict[str, str], bool]:
rc, out, err = await run_shell(self.full_status_cmd, cwd=None, timeout=STATUS_TIMEOUT_S)
if rc != 0:
# Non-fatal: caller will backoff and keep last known cache
return {}, False
result = {}
for ln in (out or "").splitlines():
ln = ln.strip()
if not ln:
continue
parts = ln.split()
if not parts:
continue
jid = parts[0]
st = parts[-1].upper()
result[jid] = st
return result, True
# ---------------------------- Lightweight 2-tier cache ----------------------------
class LSFCache:
"""
- In-process memory snapshot (fast-path, avoids most file I/O)
- Cross-process JSON file snapshot (source of truth)
- TTL + min refresh gap + refresh lock + error backoff to prevent `bjobs -a` storms
File structure:
{
"jobs": { "<jobid>": {"status": "RUN", "ts": 169...}, ... },
"jobs_meta": { "<jobid>": {"cmd": "...", "submitted_at": "..."}, ... },
"last_full_refresh_ts": 169...,
"meta": { "refreshed_at": "...", "reason": "ttl_or_miss", "last_error": {...} }
}
Error/backoff:
- Consecutive snapshot failures increase an exponential backoff window
capped at BJOBS_BACKOFF_MAX_S. During backoff, refresh attempts are skipped.
"""
_mem_jobs: dict[str, dict] = {}
_mem_last_ts: float = 0.0
_mem_lock = asyncio.Lock()
_error_count: int = 0
_last_error_ts: float = 0.0
def __init__(self, cache_file: str):
self.cache_file = cache_file
ensure_dir(os.path.dirname(cache_file))
if not os.path.exists(cache_file):
with open(cache_file, "w") as f:
json.dump({"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0, "meta": {}}, f)
# file I/O helpers
def _read_file(self) -> dict:
try:
with open(self.cache_file, "r") as f:
return json.load(f)
except Exception:
return {"jobs": {}, "jobs_meta": {}, "last_full_refresh_ts": 0, "meta": {}}
def _write_file_and_update_mem(self, data: dict):
tmp = self.cache_file + ".tmp"
with open(tmp, "w") as f:
json.dump(data, f)
os.replace(tmp, self.cache_file)
type(self)._mem_jobs = data.get("jobs", {})
type(self)._mem_last_ts = float(data.get("last_full_refresh_ts", 0))
# public API
def seed_submitted(self, job_ids: list[str], meta: dict[str, dict] | None = None):
"""Make freshly submitted jobs visible as SUBMITTED in both memory and file."""
now = time.time()
# memory
self._mem_jobs = self._mem_jobs or {}
for jid in job_ids:
self._mem_jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
# file
fh = _lock_fd(self.cache_file)
try:
data = self._read_file()
jobs = data.get("jobs", {})
for jid in job_ids:
jobs.setdefault(jid, {"status": "SUBMITTED", "ts": now})
data["jobs"] = jobs
if meta:
jm = data.get("jobs_meta", {})
for jid in job_ids:
if jid in meta:
jm[jid] = meta[jid]
data["jobs_meta"] = jm
self._write_file_and_update_mem(data)
finally:
_unlock_fd(fh)
def _in_backoff_window(self) -> bool:
if self._error_count <= 0:
return False
elapsed = time.time() - self._last_error_ts
backoff = min(BJOBS_BACKOFF_MAX_S, BJOBS_BACKOFF_BASE_S * (2 ** max(0, self._error_count - 1)))
return elapsed < backoff
async def _maybe_refresh(self, backend: LSFBackend, reason: str) -> bool:
"""
Decide and run a full-snapshot refresh iff:
- Not in error backoff window;
- Min refresh gap is satisfied;
- We acquire the non-blocking refresh lock;
On failure: do not raise, record error (for backoff), keep old cache.
"""
now = time.time()
if self._in_backoff_window():
return False
data = self._read_file()
last_ts = float(data.get("last_full_refresh_ts", 0))
if (now - last_ts) < MIN_REFRESH_GAP_S:
return False
lock_fh = _lock_fd(self.cache_file + ".refresh.lock", nonblock=True)
if lock_fh is None:
return False
try:
# Double-check the min gap after acquiring the lock
data2 = self._read_file()
last_ts2 = float(data2.get("last_full_refresh_ts", 0))
if (now - last_ts2) < MIN_REFRESH_GAP_S:
return False
snapshot, ok = await backend.full_snapshot()
if not ok:
# Record error/backoff and a small meta for debugging
type(self)._error_count += 1
type(self)._last_error_ts = time.time()
fh = _lock_fd(self.cache_file)
try:
base = self._read_file()
meta = base.get("meta", {})
meta["last_error"] = {
"at": iso_now(),
"error_count": self._error_count,
"note": "bjobs snapshot failed; using last known cache"
}
base["meta"] = meta
# Do not change jobs / last_full_refresh_ts
self._write_file_and_update_mem(base)
finally:
_unlock_fd(fh)
return False
# Success: reset backoff
type(self)._error_count = 0
type(self)._last_error_ts = 0.0
now2 = time.time()
fh = _lock_fd(self.cache_file)
try:
base = self._read_file()
jobs = base.get("jobs", {})
for jid, st in snapshot.items():
jobs[jid] = {"status": st, "ts": now2}
base["jobs"] = jobs
base["last_full_refresh_ts"] = now2
meta = base.get("meta", {})
meta["refreshed_at"] = iso_now()
meta["reason"] = reason
base["meta"] = meta
self._write_file_and_update_mem(base)
finally:
_unlock_fd(fh)
return True
finally:
_unlock_fd(lock_fh)
async def get_statuses(self, job_ids: list[str], backend: LSFBackend) -> dict[str, str]:
"""
2-tier retrieval:
1) If memory snapshot is fresh -> use it (no file I/O).
2) Else read file once and update memory.
3) If TTL expired or miss (and FORCE_REFRESH_ON_MISS), try to refresh (subject to backoff & min-gap).
Never raises; returns last known statuses. Missing ones return "".
"""
now = time.time()
# 1) memory fast-path
async with self._mem_lock:
mem_is_fresh = (now - self._mem_last_ts) <= FULL_TTL_S
if mem_is_fresh:
jobs = self._mem_jobs
else:
data = self._read_file()
type(self)._mem_jobs = data.get("jobs", {})
type(self)._mem_last_ts = float(data.get("last_full_refresh_ts", 0))
jobs = self._mem_jobs
def fresh(jid: str) -> bool:
ent = jobs.get(jid)
if not ent:
return False
return (now - float(ent.get("ts", 0))) <= FULL_TTL_S
need_refresh = not mem_is_fresh
if FORCE_REFRESH_ON_MISS:
for jid in job_ids:
if not fresh(jid):
need_refresh = True
break
if need_refresh:
await self._maybe_refresh(backend, reason="ttl_or_miss")
# Read from memory snapshot again (already updated if refresh succeeded)
jobs = self._mem_jobs
return {jid: jobs.get(jid, {}).get("status", "") for jid in job_ids}
# ---------------------------- Submission helpers ----------------------------
async def submit_commands_multiline(commands: list[str], cwd: str):
"""
Submit each non-empty line as an independent `bsub ...` command.
Concurrency is limited by BATCH_SIZE. Never raises; collects failures.
Returns (job_ids, submissions_detail).
"""
sem = asyncio.Semaphore(max(1, int(BATCH_SIZE)))
submissions = []
async def _submit_one(cmd: str):
async with sem:
rc, out, err = await run_shell(cmd, cwd=cwd, timeout=SUBMIT_TIMEOUT_S)
jids = extract_job_ids_from_bsub_output(out)
return {"cmd": cmd, "returncode": rc, "stdout": out, "stderr": err, "job_ids": jids}
tasks = [asyncio.create_task(_submit_one(c)) for c in commands]
for t in asyncio.as_completed(tasks):
submissions.append(await t)
job_ids = [jid for sub in submissions for jid in sub.get("job_ids", [])]
return job_ids, submissions
# ---------------------------- ComfyUI Node ----------------------------
class LSFSubmitAndWait:
"""
Async OUTPUT node:
- Submits multi-line bsub commands (batch-limited parallelism).
- Polls statuses via a shared `bjobs -a` snapshot cache across nodes/processes.
- Live-updates a String widget ("status_md") with JOB/STATUS/COMMAND table.
- Resilient to server errors: timeouts, rc!=0, and `bjobs` failures are non-fatal.
The node keeps last known statuses and retries with backoff.
"""
CATEGORY = "LSF"
FUNCTION = "run"
OUTPUT_NODE = True
SUPPORTS_ASYNC = True
RETURN_TYPES = ("STRING", "STRING")
RETURN_NAMES = ("job_ids", "results_json")
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"commands": ("STRING", {"multiline": True, "default": "bsub -q short -J demo sleep 3"}),
"cwd": ("STRING", {"default": "."}),
},
"optional": {
# Display-only (the server updates this at runtime)
"status_md": ("STRING", {"multiline": True, "default":
"| JOB ID | STATUS | COMMAND |\n|------:|:------:|---------|\n(Updating during run...)"}),
},
"hidden": {
# Injected by ComfyUI; used to target the widget updates
"node_id": "UNIQUE_ID"
}
}
async def run(self, commands: str, cwd: str, status_md: str = "", node_id: str = None):
# 0) sanitize input lines (skip empty / comment-only)
lines = []
for ln in (commands or "").splitlines():
s = ln.strip()
if not s:
continue
if s.startswith("#"):
continue
lines.append(ln)
# 1) submit
submitted_at = iso_now()
job_ids, submissions = await submit_commands_multiline(lines, cwd=cwd)
# 2) job_id -> command map (for UI/log)
jid_to_cmd = {}
for sub in submissions:
for jid in sub.get("job_ids", []):
jid_to_cmd[jid] = sub.get("cmd", "")
# 3) shared cache & backend
cache_dir = ensure_dir(default_cache_dir())
cache_file = os.path.join(cache_dir, CACHE_FILENAME)
cache = LSFCache(cache_file)
backend = LSFBackend(FULL_STATUS_CMD)
# 3-1) seed freshly submitted jobs to handle "just-submitted not in cache yet"
if job_ids:
meta = {jid: {"cmd": jid_to_cmd.get(jid, ""), "submitted_at": submitted_at} for jid in job_ids}
cache.seed_submitted(job_ids, meta=meta)
# 4) initial UI update (SUBMITTED)
def push_ui(rows):
if not node_id:
return
table = make_status_markdown(rows)
try:
PromptServer.instance.send_sync(
"impact-node-feedback",
{"node_id": node_id, "widget_name": "status_md", "value": table}
)
except Exception:
# UI push failures are non-fatal; continue control flow
pass
if job_ids:
init_rows = [{"job_id": jid, "status": "SUBMITTED", "cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(init_rows)
# 5) poll loop until all terminal or timeout
start = time.time()
last_ui = 0.0
final_statuses = {jid: "SUBMITTED" for jid in job_ids}
while True:
statuses = await cache.get_statuses(job_ids, backend)
for jid in job_ids:
st = (statuses.get(jid) or final_statuses.get(jid) or "").upper()
final_statuses[jid] = st if st else final_statuses.get(jid, "")
now = time.time()
if (now - last_ui) >= UI_UPDATE_GAP_S:
rows = [{"job_id": jid, "status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(rows)
last_ui = now
if job_ids and all((final_statuses.get(jid, "") in TERMINAL_STATES) for jid in job_ids):
break
if (MAX_WAIT_S is not None) and ((now - start) >= MAX_WAIT_S):
break
await asyncio.sleep(max(1, int(POLL_INTERVAL_S)))
completed_at = iso_now()
duration_sec = round(time.time() - start, 2)
all_done = (job_ids and all((final_statuses.get(jid, "") in TERMINAL_STATES) for jid in job_ids))
# final UI update (unthrottled)
if job_ids:
rows = [{"job_id": jid, "status": final_statuses.get(jid, "") or "N/A",
"cmd": jid_to_cmd.get(jid, "")} for jid in job_ids]
push_ui(rows)
# 6) persist a concise run log
results_dir = ensure_dir(os.path.join(cache_dir, "runs"))
session_tag = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + f"_{os.getpid()}"
results_path = os.path.join(results_dir, f"result_{session_tag}.json")
failed_submissions = [
{
"cmd": sub["cmd"],
"returncode": sub["returncode"],
"stderr": sub["stderr"],
"stdout": sub["stdout"],
}
for sub in submissions
if sub.get("returncode", 1) != 0 or not sub.get("job_ids")
]
payload = {
"submitted_at": submitted_at,
"completed_at": completed_at,
"duration_sec": duration_sec,
"job_ids": job_ids,
"final_status": final_statuses,
"all_done": bool(all_done),
"failed_submissions": failed_submissions,
"cwd": cwd,
"options": { # for record only (not runtime-tunable)
"BATCH_SIZE": BATCH_SIZE,
"POLL_INTERVAL_S": POLL_INTERVAL_S,
"FULL_TTL_S": FULL_TTL_S,
"MIN_REFRESH_GAP_S": MIN_REFRESH_GAP_S,
"FULL_STATUS_CMD": FULL_STATUS_CMD,
"MAX_WAIT_S": MAX_WAIT_S,
"UI_UPDATE_GAP_S": UI_UPDATE_GAP_S,
"CMD_WIDTH_LIMIT": CMD_WIDTH_LIMIT,
"FORCE_REFRESH_ON_MISS": FORCE_REFRESH_ON_MISS,
"SUBMIT_TIMEOUT_S": SUBMIT_TIMEOUT_S,
"STATUS_TIMEOUT_S": STATUS_TIMEOUT_S,
"BJOBS_BACKOFF_BASE_S": BJOBS_BACKOFF_BASE_S,
"BJOBS_BACKOFF_MAX_S": BJOBS_BACKOFF_MAX_S,
},
"results_file": results_path,
}
try:
with open(results_path, "w") as f:
json.dump(payload, f, indent=2)
except Exception:
pass
# 7) outputs
job_ids_str = ",".join(job_ids)
results_json_str = json.dumps(payload, ensure_ascii=False)
return (job_ids_str, results_json_str)
# ---------------------------- Node registration ----------------------------
NODE_CLASS_MAPPINGS = {
"LSFSubmitAndWait": LSFSubmitAndWait,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"LSFSubmitAndWait": "LSF: Submit & Wait",
}
이제 코드는 건드리지 말고 전체적인 동작을 한글로 설명해줘
알겠어! 아래는 지금 제공된 파이썬 코드 그대로 를 기준으로, 노드의 전체 동작을 “처음부터 끝까지” 한글로 정리한 설명이야. (코드는 건드리지 않음)
무엇을 하는 노드인가?
-
ComfyUI에서 멀티라인
bsub명령을 비동기 제출 하고,bjobs -a로 상태를 지속 폴링 하여 완료까지 기다리는 OUTPUT 노드 예요. -
실행 중, 같은 노드의 String 위젯(status_md)에 **“JOB ID STATUS COMMAND”** 표 형태의 텍스트를 실시간 갱신 해 보여줘요. - 여러 노드/프로세스가 동시에 돌아도 전역 캐시 를 공유하여
bjobs -a호출을 최소화하고, 서버 오류가 나도 죽지 않고 안전하게 계속 동작해요.
구성 요소(큰 그림)
-
상수(CONSTANTS)
- 배치 제출 개수, 폴링 주기, TTL, 최소 재호출 간격, UI 갱신 쓰로틀,
bjobs -a템플릿, 타임아웃/백오프 등 모든 동작 파라미터는 파일 상단의 상수로 고정돼요.
- 배치 제출 개수, 폴링 주기, TTL, 최소 재호출 간격, UI 갱신 쓰로틀,
-
백엔드(LSFBackend)
-
bjobs -a -noheader -o "jobid stat"를 한 번 호출해 클러스터 전체 잡 상태 스냅샷 을 딕셔너리로 만들어 반환해요. -
실패(비정상 종료/타임아웃/예외) 시 오류로 보고하지만 예외를 올리지 않음(상위에서 백오프/재시도).
-
-
전역 캐시(LSFCache, 2계층)
-
프로세스 내 메모리 스냅샷(초고속, 대부분의 조회는 파일 I/O 없이 해결)
-
프로세스 간 공유 파일(JSON) 스냅샷(소스 오브 트루스)
-
TTL + 최소 재호출 간격 + 비차단 락 파일 로
bjobs -a폭주 방지. -
백오프(지수) :
bjobs -a연속 실패 시, 일정 시간 동안 새 스냅샷 시도를 건너뛰고 기존 캐시로 계속 동작. -
제출 직후 시딩(seeding) : 방금 받은 Job ID들을 캐시에 SUBMITTED 상태로 즉시 저장 → 다른 노드에서도 곧바로 “이 잡이 존재한다”는 사실을 알 수 있어요.
-
파일에는
jobs(상태·타임스탬프),jobs_meta(커맨드/제출시각),last_full_refresh_ts,meta(리프레시 사유/오류메모) 등을 보관.
-
-
제출 도우미
-
입력의 각 줄을 독립
bsub로 간주해 비동기 제출 , 동시 제출 개수는 세마포어 로 제한. -
표준 출력에서
"Job <12345>"패턴으로 Job ID 추출.
-
-
ComfyUI 노드 본체
-
SUPPORTS_ASYNC=True비동기 실행. -
입력:
commands(멀티라인),cwd, 표시용status_md(런타임 갱신). -
출력:
job_ids(콤마 구분),results_json(요약 JSON). -
실행 중
PromptServer.instance.send_sync("impact-node-feedback", ...)으로 status_md 위젯을 주기적으로 업데이트.
-
동작 흐름(step-by-step)
-
입력 정리
commands에서 빈 줄/주석 줄 을 제거해 실제 제출할bsub라인들만 추립니다.
-
잡 제출(비동기 & 배치 제한)
-
각 라인을
bsub로 병렬 제출(최대 동시 제출 개수 = 상수BATCH_SIZE). -
제출 타임아웃(
SUBMIT_TIMEOUT_S)을 적용. -
제출 stdout에서 Job ID 들을 수집.
-
-
캐시 초기화 & 시딩
-
전역 캐시 파일 경로(
/tmp또는COMFYUI_LSF_CACHE_DIR)를 준비. -
방금 제출한 Job ID들을 메모리/파일 캐시 모두에
SUBMITTED로 기록(시딩).- 이로써 Node2가 Node1 직후에 폴링하더라도 해당 Job ID가 캐시에 나타나, “없는 잡”으로 오판하지 않아요.
-
-
초기 UI 업데이트
-
status_md위젯에 SUBMITTED 상태 표 를 1회 반영. -
UI 전송 실패는 무시(컨트롤 플로우에 영향 없음).
-
-
폴링 루프(완료 또는 타임아웃까지)
-
주기(
POLL_INTERVAL_S)마다 캐시에서 상태 조회:-
메모리 스냅샷이 신선(TTL 이내) → 파일 읽기 없이 바로 사용.
-
아니면 파일을 1회 읽어 메모리 동기화.
-
-
리프레시 판단 :
-
TTL 만료이거나, 요청 Job ID가 캐시에 없거나 오래됨(옵션
FORCE_REFRESH_ON_MISS) → 전체 스냅샷 리프레시 필요. -
단, 최소 간격(
MIN_REFRESH_GAP_S)을 지키며, refresh 락 을 선점한 한 프로세스만bjobs -a를 실행. -
bjobs -a가 실패하면:-
예외 없이 실패로 기록하고, 연속 실패 횟수 증가 → 지수 백오프(최대
BJOBS_BACKOFF_MAX_S) 진입. -
백오프 동안은 리프레시를 스킵하고 마지막 캐시값 으로 계속 운영.
-
성공 시에는 실패 카운터 초기화 , 새 스냅샷으로 캐시 갱신.
-
-
-
UI 실시간 갱신 :
-
마지막 전송 시각과 비교해 쓰로틀(
UI_UPDATE_GAP_S)을 만족할 때 테이블을 재전송. -
UI 전송 실패는 무시하고 다음 루프 진행.
-
-
종료 조건 :
-
모든 Job ID가 터미널 상태(DONE/EXIT).
-
또는 전체 대기 타임아웃(
MAX_WAIT_S가 None이 아니고 경과).
-
-
-
최종 UI 업데이트 & 정리
-
마지막 상태를 UI에 1회 더 반영(쓰로틀 없이).
-
실행 요약(JSON)을
runs/디렉토리에 파일로 저장(제출·완료 시각, 소요 시간, 최종 상태, 실패 제출, 사용한 상수 기록 등). -
노드 출력으로
job_ids(문자열),results_json(요약 JSON 문자열)을 반환.
-
장애/에러 대응 전략(안 죽는 방향)
-
bsub제출 실패/타임아웃:-
해당 라인만 실패로 기록(
failed_submissions에 보존)하고 나머지는 계속 진행. -
전체 노드는 그대로 동작해 제출된 잡들에 대해 폴링을 계속함.
-
-
bjobs -a실패/타임아웃/예외:-
예외를 던지지 않고 실패로 간주 → 연속 실패 카운터 증가 & 백오프 진입.
-
백오프 동안은 마지막 스냅샷(캐시)를 사용해 폴링 루프를 유지(즉, 노드가 죽지 않음).
-
최소 재호출 간격 과 비차단 락 으로 동시 다발
bjobs -a발생을 억제.
-
-
UI 업데이트 실패 :
- 무시 하고 다음 루프 진행(잡 컨트롤과 무관한 부가 기능이므로).
-
파일 락 미지원(Windows 등) :
- 락 없이도 동작하지만 경쟁 가능성 증가 는 감수. (POSIX 환경이면 fcntl 락이 적용됨)
동시성/성능 포인트
-
2계층 캐시 로 대부분의 조회를 메모리에서 처리 → 파일 I/O 최소화.
-
TTL + 최소 간격 + 락 으로
bjobs -a호출을 최소화. -
지수 백오프 로 장애 시에도 서버/스케줄러에 부담을 주지 않음.
-
배치 제한 제출 로 큐/로그인 노드 부담 분산.
-
UI 쓰로틀 로 프론트 과도한 리렌더 방지.
실사용 팁
-
프론트엔드에는 간단한 커스텀 이벤트 리스너(“impact-node-feedback”)를 넣어야
status_md위젯이 런타임에 갱신돼요. -
상수는 환경/정책에 맞게 파일 상단에서만 조정하면 됩니다. (예: 대형 클러스터라면
MIN_REFRESH_GAP_S/FULL_TTL_S를 늘려bjobs부하를 더 낮출 수 있음) -
캐시 위치는 기본
/tmp/comfyui_lsf_cache이며, 필요하면COMFYUI_LSF_CACHE_DIR로 바꿀 수 있어요. -
results_json파일 로그(runs/)로 사후 분석/감사 가 가능.
필요하면 상수들의 추천값(작업 규모/큐 정책별 프로파일)도 정리해 줄게.