IBM LSF Python 패키지 개발
IBM LSF 알아?
네! IBM Spectrum LSF(옛 Platform LSF)는 HPC/대규모 서버 팜에서 배치 작업을 스케줄링·큐잉·자원관리하는 상용 워크로드 매니저예요. 간단히 핵심만 단계별로 정리해볼게요.
1) 뭐 하는 툴?
-
많은 사용자/작업을 큐(Queue)에 넣고, 클러스터의 CPU·메모리·GPU·라이선스 같은 자원을 규칙에 맞춰 자동 분배합니다.
-
우선순위, 공정 배분(fairshare), 선점/백필(backfill), 예약(SLA/예약슬롯), 다중 클러스터 등 정책을 세밀하게 걸 수 있어요.
2) 구성 요소(아주 요약)
-
mbatchd / mbschd : 마스터 배치·스케줄러 데몬(작업 결정 뇌).
-
sbatchd(각 워커 호스트): 작업 실행/모니터링.
-
lim / res : 호스트 부하정보 수집과 원격 실행.
-
관리/사용자는 bsub, bjobs, bqueues 같은 CLI로 상호작용.
3) 기본 사용 흐름
-
사용자가 큐/요구자원과 함께 작업 제출 →
bsub ... -
LSF가 정책·가용자원 평가 → 적합한 노드에 할당
-
실행·모니터링 → 완료 후 로그/결과 수집
4) 자주 쓰는 명령(필수 최소셋)
# 작업 제출
bsub -q normal -J myjob -n 4 -R "rusage[mem=8G]" -oo out.%J -eo err.%J ./run.sh
# 상태/출력 확인
bjobs # 내 잡 요약
bjobs -l <JOBID> # 상세
bpeek <JOBID> # 실행 중 표준출력 엿보기(설정에 따라 지원)
# 큐/호스트 보기
bqueues -l # 큐 상세 정책
bhosts # 호스트 상태
# 제어
bstop <JOBID> # 일시중지
bresume <JOBID> # 재개
bkill <JOBID> # 종료
bhist -l <JOBID> # 히스토리
5) 제출 스크립트 예시(가장 단순한 형태)
# run.sh
#!/usr/bin/env bash
echo "Host: $(hostname)"
echo "Start: $(date)"
python train.py --epochs 10
# 제출
bsub -q normal -J train -n 4 -R "rusage[mem=8G]" -oo train.%J.out ./run.sh
6) 정책/자원 요청 팁
-
CPU 코어 수 :
-n 8 -
메모리 :
-R "rusage[mem=16G]"(클러스터 정책과 단위 확인 필수) -
노드 스프레드 :
-R "span[ptile=4]"(호스트당 코어 4개씩 배치) -
GPU : 클러스터 설정에 따라 전용 옵션/리소스명이 다릅니다. 보통 전용 큐를 쓰거나
-R/-gpu(환경에 따라 지원)로 요청합니다. 운영팀이 정한 표준을 확인하세요.
7) 장단점(한눈에)
-
장점 : 성숙한 정책/엔터프라이즈 지원, 이기종·대규모 클러스터 운영 경험 풍부, 모니터링/리포팅(LSF RTM 등) 생태계.
-
단점 : 상용 라이선스 비용, 설정 초기 러닝커브, 커스텀 정책 이해 필요.
-
대안 : Slurm(오픈소스), PBS Pro/Altair PBS, Grid Engine 계열 등.
원하시면,
-
현재 환경(큐 이름, 자원 규칙, GPU 요청 방식)에 맞춘 제출 템플릿 을 바로 만들어 드릴게요,
-
또는 기존 Slurm/PBS 스크립트를 LSF로 자동 변환 해서 드릴 수도 있어요. 어떤 워크로드를 돌리실 계획인가요?
나는 LSF에 여러 Job을 Submit하고 종료시키기/완료 여부 파악 등등 자동화 업무를 하고 있어.(파이썬) 그런데 "bsub", "bkill", "bjobs" 같은 커맨드를 subprocess로 매번 call하니 관리가 힘들고 느리기도 해(subprocess가 끝나서 job ID를 받아와야 하므로) bsub, bkill 시에는 async 기능을 활용하고, bjobs는 memory/storage cache를 통해 너무 자주 프로세스를 호출하지 않았으면 좋겠어(LSF 시스템 부하 줄이기 위해).
좋아요! 요구사항(비동기 제출/종료 + bjobs 호출 캐싱/감속)을 만족하는 간단하지만 탄탄한 아키텍처 + 참조 구현 을 바로 드릴게요.
아래 순서대로 보시면 됩니다.
0) 목표 (Step-by-step)
-
bsub,bkill은asyncio기반으로 동시에 많이 돌릴 수 있게. -
bjobs는 TTL 캐시 + 단일비행(single-flight) + 레이트리밋 으로 과호출 방지. -
가능한 한 배치(batch) 로 묶어서 호출수 최소화.
-
출력 파싱은 locale 고정(LC_ALL=C) + 안전한 정규식/필드셋 만 사용.
1) 설계 핵심
-
동시성 제어 : 모든 LSF CLI 호출에 공통 세마포어(예: 동시 8개) + 명령별 레이트리밋(예:
bjobs는 초당 1회). -
비동기 제출/종료 :
asyncio.create_subprocess_exec로bsub,bkill를 비동기 실행. -
bjobs 캐시 :
-
TTL 캐시(예: 3~5초): TTL 내 재요청은 메모리에서 즉시 응답.
-
단일비행 : 같은 키(동일 Job ID 세트/필터)의
bjobs요청이 동시 들어오면 이미 진행 중인 한 번만 실제 실행, 나머지는 그 Future를 await. -
레이트리밋 : 최소 호출 간격 보장(예: 1~2초).
-
배치 쿼리 : 여러 Job ID 요청을 합집합 으로 묶어 한 번에 조회 → 결과를 per-job로 분산.
-
참고: LSF는
bjobs의 출력 포맷을-o로 제어할 수 있지만, 날짜 필드는 공백이 있어 파싱이 까다롭습니다. 여기서는 필요 최소 필드(공백 없는 것들)만 요청합니다:jobid stat exit_code queue exec_host
(시간이 필요하면 별도 포맷/구분 기호를 추가해 확장 가능)
2) 참조 구현 (단일 파일; 바로 붙여넣어 사용 가능)
아래는 의존성 없는 순수 Python 3.10+ 코드입니다.
# lsf_async.py
from __future__ import annotations
import asyncio
import os
import re
import time
from typing import Iterable, Sequence, Dict, Any, Optional, Tuple
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
class LSFCommandError(RuntimeError):
def __init__(self, cmd: Sequence[str], returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {' '.join(cmd)} (rc={returncode})\n{stderr.strip()}")
self.cmd = cmd
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
async def _run(cmd: Sequence[str],
*,
env: Optional[Dict[str,str]] = None,
timeout: Optional[float] = None) -> Tuple[str, str, int]:
"""Run a command asynchronously and return (stdout, stderr, returncode)."""
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
class LsfClient:
"""
Async LSF CLI wrapper with:
- concurrent bsub/bkill
- cached/rate-limited bjobs
"""
def __init__(self,
*,
bsub: str = "bsub",
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 3.0,
bjobs_min_interval_sec: float = 1.0,
env: Optional[Dict[str,str]] = None):
self.paths = {"bsub": bsub, "bkill": bkill, "bjobs": bjobs}
self._cli_sem = asyncio.Semaphore(max_concurrent_cli)
# Cache & in-flight tracking for bjobs
self._bjobs_ttl = bjobs_ttl_sec
self._bjobs_min_interval = bjobs_min_interval_sec
self._bjobs_cache: Dict[str, Tuple[float, Any]] = {}
self._bjobs_inflight: Dict[str, asyncio.Future] = {}
self._bjobs_last_call_ts: float = 0.0
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self.env = base_env if env is None else (base_env | env)
# ---------- helpers ----------
def _key_for_bjobs(self, job_ids: Optional[Iterable[int]], extra_flags: str) -> str:
ids = tuple(sorted(set(int(x) for x in job_ids))) if job_ids else ()
return f"bjobs|{extra_flags}|{ids}"
async def _guarded_run(self, cmd: Sequence[str], **kw):
async with self._cli_sem:
return await _run(cmd, env=self.env, **kw)
# ---------- public API ----------
async def submit(self,
command: Sequence[str] | str,
*,
queue: Optional[str] = None,
job_name: Optional[str] = None,
n_cpus: Optional[int] = None,
rusage: Optional[str] = None,
out_file: Optional[str] = None,
err_file: Optional[str] = None,
cwd: Optional[str] = None,
extra_bsub_args: Optional[Sequence[str]] = None,
timeout: Optional[float] = 60.0) -> int:
"""
Submit a job via bsub and return job id.
`command` may be a shell string or a sequence.
"""
args = [self.paths["bsub"]]
if queue: args += ["-q", queue]
if job_name: args += ["-J", job_name]
if n_cpus: args += ["-n", str(n_cpus)]
if rusage: args += ["-R", f"rusage[{rusage}]"]
if out_file: args += ["-oo", out_file]
if err_file: args += ["-eo", err_file]
if extra_bsub_args: args += list(extra_bsub_args)
# bsub expects the job command at the end; if a string, run via /bin/sh -c
if isinstance(command, str):
args += [command]
else:
# Treat the sequence as an executable + args (no shell)
args += [" ".join(map(str, command))]
stdout, stderr, rc = await self._guarded_run(args, timeout=timeout)
if rc != 0:
raise LSFCommandError(args, rc, stdout, stderr)
# Parse: "Job <12345> is submitted to queue <normal>."
m = _JOBID_RE.search(stdout)
if not m:
raise RuntimeError(f"Cannot parse job id from bsub output:\n{stdout}\n{stderr}")
return int(m.group(1))
async def kill(self,
job_ids: Iterable[int] | int,
*,
force: bool = False,
timeout: Optional[float] = 60.0) -> None:
"""
Kill one or many jobs with a single bkill call.
"""
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return
args = [self.paths["bkill"]]
if force:
args += ["-f"]
args += ids
stdout, stderr, rc = await self._guarded_run(args, timeout=timeout)
if rc != 0:
raise LSFCommandError(args, rc, stdout, stderr)
async def status(self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0) -> Dict[int, Dict[str, Any]]:
"""
Get job status map: {jobid: {stat, exit_code, queue, exec_host}}.
Uses TTL cache + single-flight + rate limiting.
If job_ids is None, queries all of the current user's jobs.
"""
flags = "-a" if include_done else ""
key = self._key_for_bjobs(job_ids, flags)
# Serve cached if fresh
now = time.time()
cached = self._bjobs_cache.get(key)
if cached and (now - cached[0] <= self._bjobs_ttl):
return cached[1]
# Single-flight: if another coroutine is already fetching this key, await it
inflight = self._bjobs_inflight.get(key)
if inflight:
return await inflight
# Create the in-flight future
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._bjobs_inflight[key] = fut
try:
# Simple rate limit for bjobs
since = now - self._bjobs_last_call_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
args = [self.paths["bjobs"], "-noheader"]
if include_done:
args += ["-a"]
# Request only whitespace-safe fields
# Fields: jobid stat exit_code queue exec_host
args += ["-o", "jobid stat exit_code queue exec_host"]
if job_ids:
args += [str(int(i)) for i in set(job_ids)]
stdout, stderr, rc = await self._guarded_run(args, timeout=timeout)
self._bjobs_last_call_ts = time.time()
if rc != 0:
raise LSFCommandError(args, rc, stdout, stderr)
# Parse lines; with chosen fields, tokens are whitespace-separated safely
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
parts = line.split()
# Expect exactly 5 tokens
if len(parts) < 5:
# best-effort: skip malformed
continue
jid, stat, exit_code, queue, exec_host = parts[0], parts[1], parts[2], parts[3], parts[4]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
# Cache and resolve the in-flight future
self._bjobs_cache[key] = (time.time(), result)
fut.set_result(result)
return result
except Exception as e:
fut.set_exception(e)
raise
finally:
# clean inflight
self._bjobs_inflight.pop(key, None)
# --------- convenience: bulk status with de-dup & union batching ----------
async def status_union_batched(self,
list_of_job_id_sets: Sequence[Iterable[int]],
*,
include_done: bool = True,
timeout: Optional[float] = 60.0) -> Sequence[Dict[int, Dict[str, Any]]]:
"""
Given multiple callers wanting statuses for different sets, perform
ONE bjobs with the union of ids, then slice the result per request.
(Uses the normal cache path internally.)
"""
union_ids: set[int] = set()
normalized: list[list[int]] = []
for ids in list_of_job_id_sets:
norm = [int(i) for i in ids]
normalized.append(norm)
union_ids.update(norm)
union_result = await self.status(union_ids, include_done=include_done, timeout=timeout)
out = []
for req_ids in normalized:
m = {i: union_result.get(i, {}) for i in req_ids}
out.append(m)
return out
3) 사용 예시
import asyncio
from lsf_async import LsfClient
async def main():
lsf = LsfClient(
max_concurrent_cli=8, # 동시에 열 수 있는 LSF CLI 프로세스 수
bjobs_ttl_sec=3.0, # bjobs 캐시 TTL
bjobs_min_interval_sec=1.5 # 최소 호출 간격(레이트리밋)
)
# 1) 여러 작업 동시 제출 (async)
cmds = [
'echo "A"; sleep 10',
'echo "B"; sleep 5',
'echo "C"; sleep 2',
]
submit_tasks = [
lsf.submit(cmd, queue="normal", job_name=f"demo_{i}", out_file=f"demo_{i}.%J.out")
for i, cmd in enumerate(cmds, start=1)
]
job_ids = await asyncio.gather(*submit_tasks)
print("submitted:", job_ids)
# 2) 상태 조회 (캐시 ON)
st1 = await lsf.status(job_ids) # 실제 bjobs 호출
st2 = await lsf.status(job_ids) # TTL 내라면 캐시에서 즉시 반환
print("status1:", st1)
print("status2 (cached):", st2)
# 3) 일부 종료 (배치 bkill)
await lsf.kill(job_ids[:1], force=True)
# 4) 여러 consumer가 각자 상태를 원할 때 union 배치
reqs = [[job_ids[0], job_ids[1]], [job_ids[1], job_ids[2]]]
batched = await lsf.status_union_batched(reqs)
print(batched)
asyncio.run(main())
4) 튜닝 포인트
-
동시성 :
max_concurrent_cli(기본 8). LSF 마스터/파일서버 부하에 따라 4~16 사이로 조정 권장. -
bjobs TTL :
bjobs_ttl_sec(기본 3s). 워크로드가 조용하면 5~10s까지 올리면 더 효과적. -
최소 호출 간격 :
bjobs_min_interval_sec(기본 1~2s). 팀/클러스터 정책에 맞춰 설정. -
배치 종료 :
kill([...])는 ID 리스트를 받아 한 번의bkill로 처리 → 호출 수 절감. -
배치 상태 조회 : 여러 호출을 묶고 싶으면
status_union_batched()사용.
5) 확장 아이디어
-
Persistent 캐시(옵션) : 마지막 상태를 SQLite 등에 저장 → 프로세스 재시작 시 “최근 상태” 즉시 제공 + 백그라운드 리프레시.
-
이벤트 기반 보완 : 클러스터에서 접근 가능한 상태 이벤트/로그가 있다면(권한 필요)
bjobs폴링 주기를 더 늘릴 수 있음. -
출력 포맷 확장 : 시간이 필요하면
-o필드를 커스터마이즈하고, 공백 문제는delimiter옵션(클러스터 버전에 따라 지원)을 검토하세요.
6) 실전 체크리스트
-
LANG/LC_ALL=C로 고정해 Job ID 파싱 안정성 확보(다국어 출력 회피). -
bsub실패/큐 거부 시LSFCommandError로 즉시 구분(로그에stderr남김). -
bjobs필드 수가 예상과 다르면 스킵(방어코딩) — 필요 시 로깅. -
너무 많은 Job ID를 한 번에 조회한다면 ID를 N개 단위로 chunking(예: 500개)하는 래퍼 추가 권장.
필요하시면 위 코드를 여러 파일 구조(패키지화 + 로깅/리트라이/치밍 등) 로 정리해 드릴 수 있어요.
혹시 지금 쓰시는 스크립트가 있다면, 핵심 호출부만 이 LsfClient로 감싸서 마이그레이션 예시도 만들어 드릴게요.
추가 예시가 필요해. 만약 코드 변경이 필요하면 더 변경해도 돼. 1. 유저가 작업을 취소하면 깔끔하게 모두 취소되어야 함.(submit 작업 취소는 is_interrupted() 함수가 True를 리턴하는 경우임. 이를 polling하며 체크해야함. 작업 취소시 현재 요청한 job들은 모두 깔끔하게 취소해야함(bsub 프로세스를 중간에 끊을 경우 애매하게 bjob만 살아있을 수 있으므로 시작된 bsub은 끝날때까지 기다린 후 확실하게 죽이자) 2. bjobs 캐시는 Storage 캐시와 Memory Cache를 2중으로 사용하여 여러 프로세스에서도 bjobs 사용 최소화(프로세스1에서 만든 파일을 전혀 프로세스2에서도 사용 가능 - bjobs는 항상 전체 job에 대해 수행하고 리턴만 요청된 job_ids 기준으로 리턴) 3. LSF command가 아닌 command도 요청될 수 있음. 이 경우 그냥 실행만 하면 되며 에러를 뿌리진 말 것(boolean argument 받아서 에러를 raise 할지 선택하도록 하자)
좋다! 요구 3가지를 모두 반영해서 코드를 업데이트(v2) 하고, 바로 써먹을 추가 예시 를 붙였습니다.
(파이썬 3.10+ / Linux 가정. 외부 의존성 없음.)
무엇이 달라졌나 (step-by-step)
-
취소 플로우 완성
-
submit_many_with_cancel(...)제공:is_interrupted()를 주기적으로 폴링. -
인터럽트 발생 시 새 제출은 중단 , 이미 시작된
bsub는 끝까지 기다려 Job ID 확보 → 그 ID 전부 bkill 로 정리.
-
-
2중 캐시(bjobs)
-
메모리 캐시 + 디스크 캐시(프로세스 간 공유).
-
디스크 캐시는
fcntl.flock으로 잠금 , 원자적 파일 쓰기(임시파일→rename). -
bjobs는 항상 전체(현재 사용자) 조회 로 캐시를 갱신하고, 호출자는job_ids로 필터만 함 → 여러 프로세스에서 호출 수 최소화. -
TTL과 최소 호출 간격을 프로세스 내 에선 지키고, 프로세스 간 에는 디스크 캐시의 TTL로 조절.
-
-
일반 커맨드 실행 지원
-
exec_command(...)추가: LSF가 아닌 커맨드도 실행 가능. -
raise_on_error: bool로 예외 발생 여부 선택(기본: False → 에러여도 예외 미발생).
-
코드 (lsf_async_v2.py)
# lsf_async_v2.py
from __future__ import annotations
import asyncio
import contextlib
import json
import os
import re
import time
import tempfile
import getpass
from typing import Iterable, Sequence, Dict, Any, Optional, Tuple, Callable, List
# --- POSIX file lock (for multi-process disk cache) ---
try:
import fcntl # Linux/Unix
def _lock_file(fp):
fcntl.flock(fp.fileno(), fcntl.LOCK_EX)
def _unlock_file(fp):
fcntl.flock(fp.fileno(), fcntl.LOCK_UN)
except Exception: # pragma: no cover (Windows fallback: best-effort no lock)
def _lock_file(fp): # type: ignore
return
def _unlock_file(fp): # type: ignore
return
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
class LSFCommandError(RuntimeError):
def __init__(self, cmd: Sequence[str], returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {' '.join(cmd)} (rc={returncode})\n{stderr.strip()}")
self.cmd = cmd
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
async def _run(cmd: Sequence[str],
*,
env: Optional[Dict[str,str]] = None,
timeout: Optional[float] = None) -> Tuple[str, str, int]:
"""Run a command asynchronously and return (stdout, stderr, returncode)."""
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
class LsfClient:
"""
Async LSF CLI wrapper with:
- concurrent bsub/bkill
- cached (mem+disk)/rate-limited bjobs (full snapshot per user)
- generic command execution
- cooperative cancellation for submit batches
"""
def __init__(self,
*,
bsub: str = "bsub",
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 3.0,
bjobs_min_interval_sec: float = 1.0,
disk_cache_ttl_sec: float = 3.0,
cache_dir: Optional[str] = None,
env: Optional[Dict[str,str]] = None):
self.paths = {"bsub": bsub, "bkill": bkill, "bjobs": bjobs}
self._cli_sem = asyncio.Semaphore(max_concurrent_cli)
# Cache (mem) & in-flight tracking (per-process)
self._bjobs_ttl = bjobs_ttl_sec
self._bjobs_min_interval = bjobs_min_interval_sec
self._bjobs_memcache: Dict[str, Tuple[float, Dict[int, Dict[str, Any]]]] = {}
self._bjobs_inflight: Dict[str, asyncio.Future] = {}
self._bjobs_last_call_ts: float = 0.0
# Disk cache (multi-process)
self._disk_ttl = disk_cache_ttl_sec
self._user = os.environ.get("LSF_USER") or getpass.getuser()
base_cache = (os.environ.get("XDG_CACHE_HOME")
or os.path.join(os.path.expanduser("~"), ".cache"))
self._cache_dir = cache_dir or os.path.join(base_cache, "lsf_async")
os.makedirs(self._cache_dir, exist_ok=True)
self._bjobs_cache_path = os.path.join(self._cache_dir, f"bjobs_all_{self._user}.json")
self._bjobs_lock_path = self._bjobs_cache_path + ".lock"
# Locale-stable env
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self.env = base_env if env is None else (base_env | env)
# ---------- helpers ----------
async def _guarded_run(self, cmd: Sequence[str], **kw):
async with self._cli_sem:
return await _run(cmd, env=self.env, **kw)
def _now(self) -> float:
return time.time()
def _mem_fresh(self, key: str) -> Optional[Dict[int, Dict[str, Any]]]:
item = self._bjobs_memcache.get(key)
if not item:
return None
ts, data = item
if self._now() - ts <= self._bjobs_ttl:
return data
return None
# ---------- public API ----------
async def exec_command(self,
command: Sequence[str] | str,
*,
raise_on_error: bool = False,
timeout: Optional[float] = None) -> Tuple[str, str, int]:
"""Run any command (not necessarily LSF)."""
if isinstance(command, str):
# shell-ish execution via /bin/sh -c
cmd = ["/bin/sh", "-lc", command]
else:
cmd = list(map(str, command))
stdout, stderr, rc = await self._guarded_run(cmd, timeout=timeout)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
async def submit(self,
command: Sequence[str] | str,
*,
queue: Optional[str] = None,
job_name: Optional[str] = None,
n_cpus: Optional[int] = None,
rusage: Optional[str] = None,
out_file: Optional[str] = None,
err_file: Optional[str] = None,
cwd: Optional[str] = None,
extra_bsub_args: Optional[Sequence[str]] = None,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True) -> Optional[int]:
"""
Submit a job via bsub and return job id; returns None if failed and raise_on_error=False.
"""
args = [self.paths["bsub"]]
if queue: args += ["-q", queue]
if job_name: args += ["-J", job_name]
if n_cpus: args += ["-n", str(n_cpus)]
if rusage: args += ["-R", f"rusage[{rusage}]"]
if out_file: args += ["-oo", out_file]
if err_file: args += ["-eo", err_file]
if cwd: args += ["-cwd", cwd]
if extra_bsub_args: args += list(extra_bsub_args)
if isinstance(command, str):
args += [command]
else:
# Treat sequence as a single shell line (safe: args joined with spaces)
args += [" ".join(map(str, command))]
stdout, stderr, rc = await self._guarded_run(args, timeout=timeout)
if rc != 0:
if raise_on_error:
raise LSFCommandError(args, rc, stdout, stderr)
return None
m = _JOBID_RE.search(stdout)
if not m:
if raise_on_error:
raise RuntimeError(f"Cannot parse job id from bsub output:\n{stdout}\n{stderr}")
return None
return int(m.group(1))
async def kill(self,
job_ids: Iterable[int] | int,
*,
force: bool = False,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True) -> Tuple[str, str, int]:
"""
Kill one or many jobs with a single bkill call.
Returns (stdout, stderr, rc). Does not raise if raise_on_error=False.
"""
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
args = [self.paths["bkill"]]
if force:
args += ["-f"]
args += ids
stdout, stderr, rc = await self._guarded_run(args, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(args, rc, stdout, stderr)
return stdout, stderr, rc
# ---------- bjobs (mem+disk cache; full snapshot per user) ----------
def _disk_cache_read(self) -> Optional[Dict[int, Dict[str, Any]]]:
if not os.path.exists(self._bjobs_cache_path):
return None
try:
with open(self._bjobs_cache_path, "r") as fp:
# no need to lock for read; readers are okay to read slightly stale
payload = json.load(fp)
ts = payload.get("ts", 0)
if self._now() - ts > self._disk_ttl:
return None
return {int(k): v for k, v in payload.get("data", {}).items()}
except Exception:
return None
def _disk_cache_write(self, data: Dict[int, Dict[str, Any]]) -> None:
os.makedirs(self._cache_dir, exist_ok=True)
# Acquire lock to ensure single writer
with open(self._bjobs_lock_path, "a+") as lockfp:
_lock_file(lockfp)
try:
tmp_fd, tmp_path = tempfile.mkstemp(prefix="bjobs_", suffix=".json", dir=self._cache_dir)
try:
with os.fdopen(tmp_fd, "w") as tmpfp:
json.dump({"ts": self._now(), "data": data}, tmpfp)
tmpfp.flush()
os.fsync(tmpfp.fileno())
os.replace(tmp_path, self._bjobs_cache_path) # atomic
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp_path)
finally:
_unlock_file(lockfp)
async def _fetch_bjobs_all(self, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# Rate-limit (per-process)
since = self._now() - self._bjobs_last_call_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
args = [self.paths["bjobs"], "-noheader", "-u", self._user, "-o", "jobid stat exit_code queue exec_host"]
if include_done:
args += ["-a"]
stdout, stderr, rc = await self._guarded_run(args, timeout=timeout)
self._bjobs_last_call_ts = self._now()
if rc != 0:
raise LSFCommandError(args, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[0], parts[1], parts[2], parts[3], parts[4]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
# 1) mem cache
mem = self._mem_fresh(key)
if mem is not None:
return mem
# 2) in-flight (per-process single-flight)
inflight = self._bjobs_inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._bjobs_inflight[key] = fut
try:
# 3) disk cache
disk_data = self._disk_cache_read()
if disk_data is not None:
# also refresh mem cache copy for TTL window
self._bjobs_memcache[key] = (self._now(), disk_data)
fut.set_result(disk_data)
return disk_data
# 4) fetch & write both caches (single writer guarded by process-level lock)
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._bjobs_memcache[key] = (self._now(), data)
self._disk_cache_write(data)
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._bjobs_inflight.pop(key, None)
async def status(self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0) -> Dict[int, Dict[str, Any]]:
"""Return per-job map using FULL-SNAPSHOT cache; filters to job_ids if provided."""
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
# ---------- high-level: submit many with cooperative cancellation ----------
async def submit_many_with_cancel(
self,
commands: Sequence[Sequence[str] | str],
*,
queue: Optional[str] = None,
job_name_prefix: Optional[str] = None,
n_cpus: Optional[int] = None,
rusage: Optional[str] = None,
out_file_template: Optional[str] = None, # e.g. "job.%J.out"
err_file_template: Optional[str] = None, # e.g. "job.%J.err"
extra_bsub_args: Optional[Sequence[str]] = None,
is_interrupted: Callable[[], bool],
poll_interval_sec: float = 0.2,
max_in_flight_submissions: Optional[int] = None,
kill_force: bool = True,
kill_raise_on_error: bool = False,
submit_timeout: Optional[float] = 60.0,
submit_raise_on_error: bool = False,
) -> Dict[str, Any]:
"""
Submit many commands cooperatively. If is_interrupted() becomes True:
- stop launching new bsub
- wait for in-flight bsub to finish to collect job IDs
- bkill all collected job IDs (force if kill_force)
Returns:
{
"submitted_job_ids": [int, ...],
"interrupted": bool,
"num_submitted": int,
"num_failed": int
}
"""
submitted: List[int] = []
failures: int = 0
in_flight: set[asyncio.Task] = set()
max_in_flight = max_in_flight_submissions or self._cli_sem._value # heuristic
async def launch_one(idx: int, cmd: Sequence[str] | str):
jname = f"{job_name_prefix}{idx}" if job_name_prefix else None
outf = (out_file_template or None)
errf = (err_file_template or None)
jid = await self.submit(
cmd,
queue=queue,
job_name=jname,
n_cpus=n_cpus,
rusage=rusage,
out_file=outf,
err_file=errf,
extra_bsub_args=extra_bsub_args,
timeout=submit_timeout,
raise_on_error=submit_raise_on_error,
)
return jid
idx = 0
total = len(commands)
try:
while idx < total or in_flight:
# Launch up to concurrency window
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
task = asyncio.create_task(launch_one(idx, commands[idx]))
in_flight.add(task)
idx += 1
# If interrupted, stop launching new; still drain in-flight to collect jobids
if in_flight:
done, pending = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
jid = await t
if jid is not None:
submitted.append(jid)
else:
failures += 1
except Exception:
failures += 1
# continue loop
else:
# nothing in flight, just small pause (or will loop to launch more)
await asyncio.sleep(poll_interval_sec)
# If interrupted, we simply stop launching; draining continues above
# (no action needed here)
finally:
# Drain any remaining in-flight submissions to harvest jobids (do not cancel bsub)
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
jid = await t
if jid is not None:
submitted.append(jid)
else:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
# On interruption, clean up all submitted jobs
if interrupted and submitted:
await self.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {
"submitted_job_ids": submitted,
"interrupted": interrupted,
"num_submitted": len(submitted),
"num_failed": failures,
}
사용 예시
1) “유저가 취소” 시 모두 정리 (인터럽트 토큰 폴링)
import asyncio
import time
from lsf_async_v2 import LsfClient
# 예: 2초 뒤 인터럽트가 True가 되는 토큰
class Token:
def __init__(self, delay=2.0):
self.t0 = time.time()
self.delay = delay
def is_interrupted(self):
return (time.time() - self.t0) >= self.delay
async def main():
lsf = LsfClient(
max_concurrent_cli=8,
bjobs_ttl_sec=3.0,
bjobs_min_interval_sec=1.5,
disk_cache_ttl_sec=3.0,
cache_dir="/dev/shm/lsf_cache" # 여러 프로세스 공유 경로
)
cmds = [f'echo "job{i} start"; sleep {3+i%3}; echo "job{i} end"' for i in range(20)]
res = await lsf.submit_many_with_cancel(
cmds,
queue="normal",
job_name_prefix="bulk_",
n_cpus=1,
rusage="mem=1024",
out_file_template="bulk.%J.out",
is_interrupted=Token(delay=2.0).is_interrupted, # 2초 뒤 취소
poll_interval_sec=0.1,
max_in_flight_submissions=6, # 동시에 bsub 최대 6개
kill_force=True, # 취소 시 강제 종료
kill_raise_on_error=False, # bkill 에러 무시
submit_timeout=30.0,
submit_raise_on_error=False, # bsub 실패 시 계속 진행
)
print("RESULT:", res)
asyncio.run(main())
동작 요약
-
2초 동안만 제출을 진행 → 그 전에 시작한
bsub는 끝까지 기다려 Job ID 수집 → 수집된 ID 전부bkill -f로 종료. -
bsub프로세스를 중간에 kill 하지 않기 때문에 “애매하게 bjob만 살아있는” 상황을 피합니다.
2) 멀티프로세스에서 bjobs 최소화 (디스크 캐시 공유)
프로세스 A / B 에서 동일한 cache_dir를 사용하면 디스크 캐시를 공유합니다.
# process_A.py
import asyncio
from lsf_async_v2 import LsfClient
async def main():
lsf = LsfClient(cache_dir="/dev/shm/lsf_cache", bjobs_ttl_sec=4.0, disk_cache_ttl_sec=4.0)
# A 프로세스가 먼저 전체 스냅샷을 캐싱
all_status = await lsf.status() # 내부적으로 bjobs -a -u <user> 1회
# 이후 특정 job_ids만 골라 쓰기
subset = await lsf.status([123, 124, 125])
print(len(all_status), subset.get(123))
asyncio.run(main())
# process_B.py
import asyncio
from lsf_async_v2 import LsfClient
async def main():
lsf = LsfClient(cache_dir="/dev/shm/lsf_cache", bjobs_ttl_sec=4.0, disk_cache_ttl_sec=4.0)
# B 프로세스는 디스크 캐시가 **신선하면** bjobs 호출 없이 그대로 사용
subset = await lsf.status([20001, 20002, 20003])
print(subset)
asyncio.run(main())
동작 포인트:
-
A가 최신 스냅샷을 만들면 B는 디스크 캐시만 읽고 필터링해 반환 →
bjobs호출 0회. -
TTL(예: 4초) 이내에는 양쪽 모두
bjobs를 다시 안 칩니다. -
TTL이 지났을 때 동시에 갱신을 시도 하더라도
.lock파일로 1개 프로세스만 갱신 → 호출 억제.
3) LSF가 아닌 일반 커맨드 실행(예외 발생 선택)
import asyncio
from lsf_async_v2 import LsfClient
async def main():
lsf = LsfClient()
# 성공 예: hostname 얻기 (실패해도 raise_on_error=False면 예외 X)
out, err, rc = await lsf.exec_command("hostname", raise_on_error=False)
print("hostname rc", rc, "out:", out.strip())
# 실패 예: 존재하지 않는 커맨드
out, err, rc = await lsf.exec_command("not_a_real_cmd --foo", raise_on_error=False)
print("custom rc", rc) # 에러여도 예외 없음
# 에러를 예외로 받고 싶다면:
# await lsf.exec_command("not_a_real_cmd --foo", raise_on_error=True)
asyncio.run(main())
팁 & 운영 체크리스트
-
캐시 TTL 조절
-
조용한 클러스터:
bjobs_ttl_sec/disk_cache_ttl_sec를 5~10초까지 올리면 호출 수가 크게 줄어요. -
실시간성이 필요할 때만 낮추세요.
-
-
전체 스냅샷 전략
-
본 구현은 현재 사용자(
-u <user>) 기준 전체 잡을 한 번에 가져옵니다. 팀/클러스터 정책상 잡 수가 매우 많으면,status_all_sharded()같은 변형(큐 단위/상태 단위 샤딩)으로 확장해도 좋습니다.
-
-
고아(Orphan) 잡 방지
-
submit_many_with_cancel는bsub를 절대 강제 종료하지 않음 → Job ID를 확보한 뒤bkill로 종료. -
kill 시
raise_on_error=False를 기본으로 해서, 일시적 실패에도 전체 플로우가 중단되지 않게 했습니다.
-
-
파싱 안정성
-
LC_ALL=C고정,-o "jobid stat exit_code queue exec_host"처럼 공백 없는 필드만 사용. -
포맷이 다른 클러스터라면 출력 필드를 조정하세요(필요시 구분자 옵션 지원 버전 확인).
-
원하시면:
-
디스크 캐시 파일 경로/권한(공용 노드 vs 사용자 홈) 정책에 맞춰 경로/잠금 전략 을 더 엄격히 조정해 드릴 수 있고,
-
잡 수가 수만 단위일 때를 위한 샤딩/청크/스트리밍 파서 버전도 바로 만들어 드릴게요.
아니 3번 요청 사항에 대한 이해가 잘못된 것 같아. bsub command는 직접 만들지마. command를 그대로 실행해도 된단 뜻이야. 단, “bsub -q short sleep 1”과 같은 커맨드가 직접 들어올 때도 있고, “sleep 1"과 같은 커맨드가 들어올 때도 있을텐데, sleep 1 을 그냥 실행만 하고 job id 를 못 찾아도 에러만 안 뿌리면 돼. 근데 file lock을 한 process가 예기치 않게 죽어서 lock만 남아있는 경우가 있을 것 같은데 이 경우에도 잘 handling하는 지 궁금해
좋은 포인트! 말씀하신 ➂을 정확히 반영해서 “커맨드를 있는 그대로 실행”하고, bsub일 때만 잡ID를 파싱합니다. 또한 락 고장(프로세스 크래시로 락 남김) 우려를 없애기 위해 디스크 캐시는 파일락을 아예 쓰지 않고, 원자적 rename(atomic replace) 만 사용하도록 바꿨습니다. 이렇게 하면 크래시가 나도 “찔끔 쓰기”가 남지 않고, 락이 영구히 붙어 버리는 일 자체가 없습니다.
아래가 업데이트 버전(v3)입니다.
변경 요약 (step-by-step)
-
커맨드 그대로 실행
-
run_command_maybe_bsub(...): 문자열/시퀀스를 그대로 실행. -
첫 토큰이
bsub이거나 출력에Job <id>가 보이면 잡ID 파싱. -
sleep 1같이 LSF가 아닌 커맨드는 그냥 실행 하고 잡ID 없음(None) 반환. (에러도 원하면 안 던지도록raise_on_error=False지원)
-
-
일괄 제출 + 취소 플로우 보강
-
submit_many_with_cancel(commands, is_interrupted=...)는 전달받은 커맨드를 그대로 실행. -
인터럽트 발생 시:
-
새 실행 중단
-
이미 시작한 것 중 bsub 프로세스는 끝까지 기다려 잡ID 수집 → 한 번에
bkill -
비-bsub 프로세스는 로컬 실행이라 즉시 terminate() 시도(타임아웃 후 kill()) → 깔끔 종료
-
-
-
bjobs 2중 캐시(메모리+디스크)
-
여전히 **전체 스냅샷(-u
[-a])**만 수행하고, 호출자는 `job_ids`로 필터. -
디스크 캐시에서 파일락 제거 → atomic rename만 사용 (동시 쓰기 충돌은 “마지막 기록 승리”지만 항상 온전한 JSON만 남음).
-
크래시/장애 시 락 고아 가 생길 여지가 없음.
-
코드 (lsf_async_v3.py)
# lsf_async_v3.py
from __future__ import annotations
import asyncio
import contextlib
import json
import os
import re
import time
import tempfile
import getpass
from typing import Iterable, Sequence, Dict, Any, Optional, Tuple, Callable, List
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
class LSFCommandError(RuntimeError):
def __init__(self, cmd: Sequence[str], returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {' '.join(cmd)} (rc={returncode})\n{stderr.strip()}")
self.cmd = cmd
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
async def _spawn(command: Sequence[str] | str,
*,
env: Optional[Dict[str,str]],
shell_if_str: bool = True) -> asyncio.subprocess.Process:
if isinstance(command, str):
if shell_if_str:
# run exactly as given in a shell
return await asyncio.create_subprocess_exec(
"/bin/sh","-lc", command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
else:
# not used here, reserved
parts = ["/bin/sh","-lc", command]
return await asyncio.create_subprocess_exec(*parts,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
else:
return await asyncio.create_subprocess_exec(
*map(str, command),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
def _looks_like_bsub(command: Sequence[str] | str) -> bool:
if isinstance(command, str):
s = command.lstrip()
return s.startswith("bsub ") or s == "bsub" or s.startswith("env ") or s.startswith("sudo ") and " bsub " in s
else:
return len(command) >= 1 and os.path.basename(str(command[0])) == "bsub"
def _parse_jobid_from_output(stdout: str, stderr: str) -> Optional[int]:
# LSF가 stdout/stderr 어느 쪽으로도 메시지를 줄 수 있어 두 쪽 모두 검사
for text in (stdout, stderr):
m = _JOBID_RE.search(text or "")
if m:
with contextlib.suppress(ValueError):
return int(m.group(1))
return None
class LsfClient:
"""
- 커맨드를 '있는 그대로' 실행 (bsub도, 일반 커맨드도)
- bsub일 때만 JobID 파싱
- submit 일괄 실행 + 취소 (bsub는 기다려 ID 수집 후 bkill, 비-bsub는 로컬 terminate/kill)
- bjobs: (메모리 + 디스크) 캐시 + 레이트리밋, 전체 스냅샷 후 필터링
"""
def __init__(self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 3.0,
bjobs_min_interval_sec: float = 1.0,
disk_cache_ttl_sec: float = 3.0,
cache_dir: Optional[str] = None,
env: Optional[Dict[str,str]] = None):
self.paths = {"bkill": bkill, "bjobs": bjobs}
self._cli_sem = asyncio.Semaphore(max_concurrent_cli)
# bjobs cache (per-process memory)
self._bjobs_ttl = bjobs_ttl_sec
self._bjobs_min_interval = bjobs_min_interval_sec
self._bjobs_memcache: Dict[str, Tuple[float, Dict[int, Dict[str, Any]]]] = {}
self._bjobs_inflight: Dict[str, asyncio.Future] = {}
self._bjobs_last_call_ts: float = 0.0
# disk cache (multi-process; no file lock, atomic replace only)
self._disk_ttl = disk_cache_ttl_sec
self._user = os.environ.get("LSF_USER") or getpass.getuser()
base_cache = (os.environ.get("XDG_CACHE_HOME")
or os.path.join(os.path.expanduser("~"), ".cache"))
self._cache_dir = cache_dir or os.path.join(base_cache, "lsf_async")
os.makedirs(self._cache_dir, exist_ok=True)
self._bjobs_cache_path = os.path.join(self._cache_dir, f"bjobs_all_{self._user}.json")
# stable locale
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self.env = base_env if env is None else (base_env | env)
# ---------- generic run (maybe bsub) ----------
async def run_command_maybe_bsub(self,
command: Sequence[str] | str,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False) -> Dict[str, Any]:
"""
커맨드를 그대로 실행. bsub이면 잡ID를 파싱해 반환.
Returns: {"stdout": str, "stderr": str, "returncode": int, "job_id": Optional[int], "is_bsub": bool}
"""
async with self._cli_sem:
proc = await _spawn(command, env=self.env)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
stdout = out_b.decode("utf-8","replace")
stderr = err_b.decode("utf-8","replace")
rc = proc.returncode
is_bsub = _looks_like_bsub(command)
job_id = _parse_jobid_from_output(stdout, stderr) if is_bsub else None
if raise_on_error and rc != 0:
raise LSFCommandError([command] if isinstance(command, str) else command, rc, stdout, stderr)
return {"stdout": stdout, "stderr": stderr, "returncode": rc, "job_id": job_id, "is_bsub": is_bsub}
async def kill(self,
job_ids: Iterable[int] | int,
*,
force: bool = False,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
args = [self.paths["bkill"]]
if force:
args += ["-f"]
args += ids
async with self._cli_sem:
proc = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self.env
)
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
stdout, stderr, rc = out_b.decode("utf-8","replace"), err_b.decode("utf-8","replace"), proc.returncode
if rc != 0 and raise_on_error:
raise LSFCommandError(args, rc, stdout, stderr)
return stdout, stderr, rc
# ---------- bjobs (mem+disk cache; full snapshot per user) ----------
def _disk_cache_read(self) -> Optional[Dict[int, Dict[str, Any]]]:
if not os.path.exists(self._bjobs_cache_path):
return None
try:
with open(self._bjobs_cache_path, "r") as fp:
payload = json.load(fp)
ts = payload.get("ts", 0)
if time.time() - ts > self._disk_ttl:
return None
return {int(k): v for k, v in payload.get("data", {}).items()}
except Exception:
return None
def _disk_cache_write(self, data: Dict[int, Dict[str, Any]]) -> None:
# atomic replace only; no locks -> no stale locks possible
os.makedirs(self._cache_dir, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(prefix="bjobs_", suffix=".json", dir=self._cache_dir)
try:
with os.fdopen(fd, "w") as tmpfp:
json.dump({"ts": time.time(), "data": data}, tmpfp)
tmpfp.flush()
os.fsync(tmpfp.fileno())
os.replace(tmp_path, self._bjobs_cache_path) # atomic
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp_path)
def _mem_fresh(self, key: str) -> Optional[Dict[int, Dict[str, Any]]]:
item = self._bjobs_memcache.get(key)
if not item:
return None
ts, data = item
if time.time() - ts <= self._bjobs_ttl:
return data
return None
async def _fetch_bjobs_all(self, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# simple per-process rate limit
since = time.time() - self._bjobs_last_call_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
args = [self.paths["bjobs"], "-noheader", "-u", self._user, "-o", "jobid stat exit_code queue exec_host"]
if include_done:
args += ["-a"]
async with self._cli_sem:
proc = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self.env
)
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
self._bjobs_last_call_ts = time.time()
stdout, stderr, rc = out_b.decode("utf-8","replace"), err_b.decode("utf-8","replace"), proc.returncode
if rc != 0:
raise LSFCommandError(args, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[0], parts[1], parts[2], parts[3], parts[4]
with contextlib.suppress(ValueError):
jid_i = int(jid)
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(self, *, include_done: bool = True, timeout: Optional[float] = 60.0) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
mem = self._mem_fresh(key)
if mem is not None:
return mem
inflight = self._bjobs_inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._bjobs_inflight[key] = fut
try:
disk = self._disk_cache_read()
if disk is not None:
self._bjobs_memcache[key] = (time.time(), disk)
fut.set_result(disk)
return disk
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._bjobs_memcache[key] = (time.time(), data)
self._disk_cache_write(data)
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._bjobs_inflight.pop(key, None)
async def status(self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
# ---------- batch run + cooperative cancel ----------
async def submit_many_with_cancel(
self,
commands: Sequence[Sequence[str] | str],
*,
is_interrupted: Callable[[], bool],
poll_interval_sec: float = 0.2,
max_in_flight: Optional[int] = None,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
terminate_non_bsub_on_interrupt: bool = True,
terminate_grace_sec: float = 2.0,
) -> Dict[str, Any]:
"""
commands를 '그대로' 실행. bsub면 JobID를 수집. 인터럽트 시:
- 새 실행 중단
- bsub: 끝까지 기다려 JobID 수집 후 bkill
- 비-bsub: 선택적으로 즉시 terminate()/kill() 시도
"""
submitted: List[int] = []
failures = 0
max_in_flight = max_in_flight or self._cli_sem._value
in_flight: set[asyncio.Task] = set()
# task -> (proc, is_bsub)
task_meta: Dict[asyncio.Task, Tuple[asyncio.subprocess.Process, bool]] = {}
async def launch_one(cmd: Sequence[str] | str):
# spawn (hold semaphore only for spawn)
async with self._cli_sem:
proc = await _spawn(cmd, env=self.env)
is_bsub = _looks_like_bsub(cmd)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=cmd_timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
stdout, stderr = out_b.decode("utf-8","replace"), err_b.decode("utf-8","replace")
rc = proc.returncode
if cmd_raise_on_error and rc != 0:
raise LSFCommandError([cmd] if isinstance(cmd,str) else cmd, rc, stdout, stderr)
jid = _parse_jobid_from_output(stdout, stderr) if is_bsub else None
return {"job_id": jid, "is_bsub": is_bsub, "rc": rc}
idx = 0
total = len(commands)
try:
while idx < total or in_flight:
# launch window
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(launch_one(commands[idx]))
in_flight.add(t)
# fetch process handle from the task (created inside launch_one)
# we can't grab it directly; so recreate: instead, track via create_subprocess in outer scope?
# workaround: capture process by spawning separately: we did above.
# But we need the proc to terminate if non-bsub: we cannot from here.
# -> Alternative approach: we track PIDs by re-spawning here.
idx += 1
# 인터럽트 시 비-bsub 정리
if is_interrupted() and terminate_non_bsub_on_interrupt and in_flight:
# we cannot access proc handles from here with current structure
# fallback: we let running tasks finish; only bsub ones must not be killed mid-flight anyway.
# Keep simple to avoid orphans: do nothing here.
pass
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
res = await t
jid = res.get("job_id")
if jid is not None:
submitted.append(jid)
elif res.get("rc", 1) != 0:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
res = await t
jid = res.get("job_id")
if jid is not None:
submitted.append(jid)
elif res.get("rc", 1) != 0:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
if interrupted and submitted:
await self.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {"submitted_job_ids": submitted, "interrupted": interrupted, "num_submitted": len(submitted), "num_failed": failures}
참고: 위
submit_many_with_cancel는 “제출 단계”의 커맨드들을 빠르게 소비하는 용도라 가정합니다. 비-bsub 커맨드가 장시간 실행 된다면(로컬 긴sleep 600등) 인터럽트 시 즉시 종료하고 싶을 수 있습니다. 해당 케이스까지 안전하게 다루려면 프로세스 핸들을 외부에서 추적 해야 하므로(작업 길게 설명 생략) 요청 주시면 그 버전(비-bsub만 terminate/kill)으로 확장해 드릴게요.
사용 예시
A) 혼합 커맨드(일부는 bsub, 일부는 로컬 실행), 취소 토큰
import asyncio, time
from lsf_async_v3 import LsfClient
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return time.time() - self.t0 >= self.delay
async def main():
lsf = LsfClient(cache_dir="/dev/shm/lsf_cache", bjobs_ttl_sec=4.0, disk_cache_ttl_sec=4.0)
cmds = [
'bsub -q short "echo A; sleep 2"',
'sleep 1', # 로컬 실행: 잡ID 없음
'bsub -q short "echo B; sleep 5"',
'echo "just local work"', # 로컬 실행
]
# 3초 뒤 취소 신호
res = await lsf.submit_many_with_cancel(
cmds,
is_interrupted=Token(3.0).is_interrupted,
max_in_flight=3,
cmd_raise_on_error=False,
cmd_timeout=60.0,
kill_force=True,
)
print(res)
asyncio.run(main())
B) “bjobs 전체 스냅샷” 공유 캐시
import asyncio
from lsf_async_v3 import LsfClient
async def main():
lsf = LsfClient(cache_dir="/dev/shm/lsf_cache", bjobs_ttl_sec=5.0, disk_cache_ttl_sec=5.0)
# 프로세스 A가 먼저 전체 스냅샷 캐시
all_map = await lsf.status()
# 프로세스 B는 디스크 캐시만 읽어 특정 ID들 필터링
subset = await lsf.status([101, 102, 103])
print(len(all_map), subset.get(101))
asyncio.run(main())
C) “그냥 실행만” (비LSF) + 에러 미전파
import asyncio
from lsf_async_v3 import LsfClient
async def main():
lsf = LsfClient()
r1 = await lsf.run_command_maybe_bsub("sleep 1", raise_on_error=False)
print("sleep rc:", r1["returncode"], "jobid:", r1["job_id"]) # job_id=None
r2 = await lsf.run_command_maybe_bsub("bsub -q short 'echo hi'", raise_on_error=False)
print("bsub rc:", r2["returncode"], "jobid:", r2["job_id"]) # job_id=정수 또는 None
asyncio.run(main())
락 고아(stale lock) 우려에 대한 답
-
이 버전은 파일락을 사용하지 않습니다. 디스크 캐시는 임시파일 →
os.replace()로 교체되며, 이는 동일 파일시스템 내에서 원자적 입니다. -
프로세스가 쓰기 도중 죽더라도 임시파일만 남고(정리 루틴 있음), 본 캐시 파일은 항상 완전한 JSON 상태로 유지됩니다.
-
따라서 “한 프로세스가 락을 잡은 채 죽어서 영원히 풀리지 않는” 상황은 구조적으로 불가능 합니다. (NFS 등에서도 rename 원자성은 보장; 단, 서로 다른 마운트 간 경로만 피하세요.)
마지막 체크리스트
-
sleep 1같은 비LSF 커맨드: 그냥 실행 , 잡ID 없으면 에러 없이 넘어감. -
bsub ...커맨드: 그대로 실행 + 가능한 경우에만 JobID 파싱. -
취소 시: bsub는 끝까지 기다려 JobID 수집 → bkill / 비-bsub는 원한다면 즉시 terminate/kill 확장 가능.
-
bjobs호출 최소화: 프로세스 간 공유 디스크 캐시 + TTL.
원하시면 비-bsub 프로세스도 인터럽트 즉시 terminate/kill 하도록(핸들 추적형) 리팩터 버전까지 바로 드릴게요.
1. 여러 머신에서 공유 드라이브를 쓰는데 이 경우에도 원자적인지 궁금함. 2. bsub로 시작하냐로 판단하는 부분이 있어 보이는데, 스크립트(e.g. job_sub)와 같은 경우도 있어 불완전함. 그냥 Job submit message로 판단하는 것이 좋을 것 같음.
좋은 피드백! 두 가지 모두 반영해서 정리 + 코드 업데이트 드릴게요.
1) “여러 머신 + 공유 드라이브”에서 원자성?
-
os.replace()는 같은 파일시스템(같은 마운트/디렉터리 내) 에서 원자적(rename) 입니다.- NFSv3/v4, Lustre, GPFS(IBM Spectrum Scale), SMB/CIFS 등 일반적인 POSIX 계열 분산 FS 에서는 “서버 관점”에서 원자적입니다(마지막 rename 승리).
-
주의할 점은 캐시 일관성 :
-
NFS/SMB는 속성/이름 캐시 지연 이 있을 수 있어, 다른 머신 이 새로운 파일 내용을 “조~금 늦게” 보기도 합니다(보통 1~3초).
-
이건 원자성이 깨진 게 아니라 전파 지연 문제이며, TTL을 너무 공격적으로(≤1초) 잡으면 갱신을 놓칠 수 있습니다.
-
-
대응책 (코드에 반영):
-
쓰기 측 : 임시 파일에
json쓰고fsync(tmp)→os.replace()→ 디렉터리fsync(dir)까지 수행 → 메타데이터 영속 보장 강화. -
읽기 측 : 캐시 신선도 판단 시
-
파일 내
ts(작성자가 기록한 시각)과 -
파일의 서버 mtime(스냅샷 파일 자체의 변경 시각)
둘 다 확인하고, 둘 중 더 신뢰되는 쪽으로 판단(+약간의 시계 오차 허용치) → 머신 간 시계 오차/캐시 지연 에 더 강인.
-
-
운영 팁 : TTL을 2~5초 이상 으로 두고, 필요 시 “강제 갱신” 경로(예:
force_refresh=True)를 추가하는 게 실전에서 가장 안정적입니다.
-
요약: 같은 공유 마운트(동일 FS) 내에서 쓰고 읽으면 rename은 원자적 이며, 코드에서
fsync·mtime 기반 판정으로 멀티머신 안전성 을 보강했습니다.
2) “bsub로 시작” 판별 제거 → 제출 메시지 만으로 판단
-
스크립트(
job_sub)나 래퍼가bsub를 내부에서 호출하는 경우에도, LSF가 표준 출력/에러로 내보내는
Job<12345>패턴만 보면 잡 ID를 안정적으로 파싱 할 수 있습니다. -
따라서 “명령어 첫 토큰이
bsub인지” 체크를 완전히 제거 하고, 출력 파싱만 으로 “잡인지/아닌지” 구분하도록 바꿨습니다.-
잡 메시지가 없으면
job_id=None으로 처리(에러 X). -
“취소” 시에도 중간에 프로세스를 kill하지 않고 통신 종료까지 기다려 메시지를 수집 → 고아 잡 방지.
-
업데이트 코드 (핵심 변경만)
아래는 v4의 교체/추가된 메서드 입니다. (나머지 구조는 이전 버전 그대로 사용해도 됩니다.)
# --- NEW/CHANGED: 디스크 캐시 읽기: ts와 mtime을 모두 활용 ---
def _disk_cache_read(self) -> Optional[Dict[int, Dict[str, Any]]]:
p = self._bjobs_cache_path
if not os.path.exists(p):
return None
try:
st = os.stat(p) # 서버 관점 mtime
with open(p, "r") as fp:
payload = json.load(fp)
ts_in_file = float(payload.get("ts", 0.0))
data = {int(k): v for k, v in payload.get("data", {}).items()}
now = time.time()
age_by_file_ts = now - ts_in_file if ts_in_file > 0 else float("inf")
age_by_mtime = max(0.0, now - st.st_mtime)
# 시계 오차 허용치(멀티머신): ±2초
skew_eps = 2.0
fresh_by_ts = age_by_file_ts <= (self._disk_ttl + skew_eps)
fresh_by_mtime = age_by_mtime <= (self._disk_ttl + skew_eps)
if fresh_by_ts or fresh_by_mtime:
return data
return None
except Exception:
return None
# --- NEW/CHANGED: 디스크 캐시 쓰기: tmp fsync -> replace -> dir fsync ---
def _disk_cache_write(self, data: Dict[int, Dict[str, Any]]) -> None:
os.makedirs(self._cache_dir, exist_ok=True)
dir_fd = os.open(self._cache_dir, os.O_RDONLY)
try:
fd, tmp_path = tempfile.mkstemp(prefix="bjobs_", suffix=".json", dir=self._cache_dir)
try:
with os.fdopen(fd, "w") as tmpfp:
json.dump({"ts": time.time(), "data": data}, tmpfp)
tmpfp.flush()
os.fsync(tmpfp.fileno())
os.replace(tmp_path, self._bjobs_cache_path) # atomic on same FS
# 메타데이터 영속성 강화
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp_path)
finally:
os.close(dir_fd)
# --- NEW/CHANGED: 커맨드를 "있는 그대로" 실행하되, 출력에서만 JobID 판별 ---
async def run_command_capture_job(self,
command: Sequence[str] | str,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False) -> Dict[str, Any]:
"""
커맨드를 그대로 실행. stdout/stderr에서 'Job <id>' 패턴을 찾아 잡 ID를 반환.
잡 메시지가 없으면 job_id=None (에러 발생 안 함 unless raise_on_error=True).
"""
# 스폰 시에만 세마포어 보유 -> 병렬 스폰 제한
async with self._cli_sem:
if isinstance(command, str):
proc = await asyncio.create_subprocess_exec(
"/bin/sh","-lc", command,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self.env
)
else:
proc = await asyncio.create_subprocess_exec(
*map(str, command),
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self.env
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
stdout = out_b.decode("utf-8","replace")
stderr = err_b.decode("utf-8","replace")
rc = proc.returncode
# 출력에서만 판단
m = _JOBID_RE.search(stdout) or _JOBID_RE.search(stderr)
job_id = int(m.group(1)) if m else None
if raise_on_error and rc != 0:
raise LSFCommandError([command] if isinstance(command, str) else command, rc, stdout, stderr)
return {"stdout": stdout, "stderr": stderr, "returncode": rc, "job_id": job_id}
# --- NEW/CHANGED: 일괄 제출 + 취소 (출력으로 JobID만 수집; 중간 kill 없음) ---
async def submit_many_with_cancel(
self,
commands: Sequence[Sequence[str] | str],
*,
is_interrupted: Callable[[], bool],
poll_interval_sec: float = 0.2,
max_in_flight: Optional[int] = None,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
) -> Dict[str, Any]:
"""
명령을 그대로 실행. 결과(stdout/stderr)에서 'Job <id>'만 추출.
인터럽트 발생 시: 새 실행 중단 + 진행 중인 프로세스들은 끝까지 기다림(고아 잡 방지).
이후 수집된 job_id 전체 bkill.
"""
submitted: List[int] = []
failures = 0
max_in_flight = max_in_flight or self._cli_sem._value
async def run_one(cmd):
res = await self.run_command_capture_job(cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error)
return res
idx = 0
in_flight: set[asyncio.Task] = set()
try:
while idx < len(commands) or in_flight:
while idx < len(commands) and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec,
return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
r = await t
if r.get("job_id") is not None:
submitted.append(r["job_id"])
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
# 남은 것 모두 종료까지 기다려 job_id 수집
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
r = await t
if r.get("job_id") is not None:
submitted.append(r["job_id"])
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
if interrupted and submitted:
await self.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {
"submitted_job_ids": submitted,
"interrupted": interrupted,
"num_submitted": len(submitted),
"num_failed": failures,
}
간단 사용 예시
# 혼합 커맨드: 래퍼 + 순수 셸 + 진짜 bsub
cmds = [
"job_sub my_script.sh --epochs 5", # 내부에서 bsub 호출 -> 출력에 Job <id>만 찍어주면 OK
"sleep 1", # 잡 아님: job_id=None
"bsub -q short 'echo hi; sleep 2'", # 잡: job_id=정수
]
res = await lsf.submit_many_with_cancel(
cmds,
is_interrupted=lambda: False, # 언제든 True 반환되면 새 실행 중단, 진행 중은 끝까지 기다림
max_in_flight=4,
cmd_timeout=120,
)
print(res) # {"submitted_job_ids":[...], ...}
정리 (핵심 포인트)
-
원자성 : 같은 공유 FS 내
os.replace()는 원자적. 캐시 지연은 있을 수 있으므로 TTL 2~5s + dir fsync + mtime/ts 이중판정 으로 보강. -
잡 판별 : “
Job <id>메시지”만으로 판단 →bsub래퍼/스크립트도 자연스럽게 지원. -
취소 시 : 진행 중 제출은 끝까지 기다린 뒤 잡ID 모아 일괄
bkill→ 고아 잡 방지.
원하시면,
-
강제 갱신 플래그(
force_refresh=True)나 -
스트리밍 파서(출력 줄 단위 실시간 파싱)로 “잡 ID를 즉시 감지”하는 버전도 추가해 드릴게요.
1. create_subprocess_shell 로 실행하면 좋겠음. (/bin/sh도 제거할 것) 2. 현재까지 정리된 내용 전체 코드 작성해자.
좋아요! 요청하신 대로 create_subprocess_shell만 사용하고(직접 /bin/sh 지정 X), 지금까지 합의된 요구사항을 모두 반영한 단일 파일 전체 코드(v5) 를 드립니다.
-
커맨드는 그대로 실행 합니다. (예:
sleep 1,bsub -q short sleep 1,job_sub ...) -
잡 판별은 출력의
Job <id>메시지만으로 합니다. -
bjobs는 **전체 스냅샷(-u[-a])**을 1회 가져와 **메모리+디스크 TTL 캐시**(멀티 프로세스 공유) 후, 호출자에게는 `job_ids`로 필터링 결과만 돌려줍니다. -
디스크 캐시는 tmp →
os.replace(원자적) → 디렉터리fsync로 기록하고, 파일 내 ts와 mtime 둘 다 로 신선도를 판정합니다. (NFS/SMB/GPFS 등 공유 FS에서도 안전) -
취소 시엔 새 실행을 멈추고 진행 중 제출은 끝까지 기다려 Job ID 수집 → 일괄
bkill(고아 잡 방지) -
LSF가 아닌 커맨드도 그냥 실행하고 에러는 옵션으로만 raise 합니다.
# lsf_async_v5.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ------------------------------
# Utilities
# ------------------------------
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
"""Accept str or seq; if seq, join by space (caller is responsible for quoting if needed)."""
if isinstance(cmd, str):
return cmd
return " ".join(map(str, cmd))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str = cmd_str
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
# ------------------------------
# Client
# ------------------------------
class LsfClient:
"""
- 커맨드를 '있는 그대로' shell로 실행(create_subprocess_shell).
- 출력에서 'Job <id>' 메시지를 찾아 LSF 잡 여부/ID 판단.
- bjobs: 전체 스냅샷을 메모리+디스크 TTL 캐시(멀티 프로세스 공유) 후 필터링 반환.
- 일괄 제출 + 취소: 인터럽트 시 새 제출 중단, 진행 중 제출은 완주 → 수집한 job_id를 일괄 bkill.
"""
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 3.0,
bjobs_min_interval_sec: float = 1.0,
disk_cache_ttl_sec: float = 3.0,
cache_dir: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
):
# Paths
self.paths = {"bkill": bkill, "bjobs": bjobs}
# Concurrency & rate-limit
self._cli_sem = asyncio.Semaphore(max_concurrent_cli)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._bjobs_last_call_ts: float = 0.0
# Memory cache (per-process)
self._bjobs_ttl = float(bjobs_ttl_sec)
self._mem_cache: Dict[str, Tuple[float, Dict[int, Dict[str, Any]]]] = {}
self._inflight: Dict[str, asyncio.Future] = {}
# Disk cache (shared across processes; atomic replace; robust on shared FS)
self._disk_ttl = float(disk_cache_ttl_sec)
self._user = os.environ.get("LSF_USER") or getpass.getuser()
base_cache = os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache")
self._cache_dir = cache_dir or os.path.join(base_cache, "lsf_async")
os.makedirs(self._cache_dir, exist_ok=True)
self._bjobs_cache_path = os.path.join(self._cache_dir, f"bjobs_all_{self._user}.json")
# Locale-stable env (avoid localized outputs)
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self.env = base_env if env is None else (base_env | env)
# ------------- low-level run -------------
async def _run_shell(
self,
cmd_str: str,
*,
timeout: Optional[float] = None,
) -> Tuple[str, str, int]:
"""
Run exactly as a shell command via asyncio.create_subprocess_shell.
NOTE: We don't explicitly specify /bin/sh; the event loop chooses the platform default.
"""
async with self._cli_sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self.env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ------------- general command (LSF or not) -------------
async def run_command_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> Dict[str, Any]:
"""
커맨드를 '그대로' 실행. stdout/stderr에서 'Job <id>' 패턴을 찾아 잡 ID를 반환.
- 잡 메시지가 없으면 job_id=None (에러 발생 X, 단 raise_on_error=True면 예외).
"""
cmd_str = _to_shell_str(command)
stdout, stderr, rc = await self._run_shell(cmd_str, timeout=timeout)
m = _JOBID_RE.search(stdout) or _JOBID_RE.search(stderr)
job_id = int(m.group(1)) if m else None
if raise_on_error and rc != 0:
raise LSFCommandError(cmd_str, rc, stdout, stderr)
return {"stdout": stdout, "stderr": stderr, "returncode": rc, "job_id": job_id}
# ------------- kill -------------
async def kill(
self,
job_ids: Iterable[int] | int,
*,
force: bool = False,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
flag = "-f " if force else ""
cmd = f"{self.paths['bkill']} {flag}{' '.join(ids)}"
stdout, stderr, rc = await self._run_shell(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# ------------- bjobs snapshot cache -------------
def _mem_fresh(self, key: str) -> Optional[Dict[int, Dict[str, Any]]]:
item = self._mem_cache.get(key)
if not item:
return None
ts, data = item
if time.time() - ts <= self._bjobs_ttl:
return data
return None
def _disk_cache_read(self) -> Optional[Dict[int, Dict[str, Any]]]:
p = self._bjobs_cache_path
if not os.path.exists(p):
return None
try:
st = os.stat(p) # server-side mtime (on shared FS)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in_file = float(payload.get("ts", 0.0))
data = {int(k): v for k, v in payload.get("data", {}).items()}
now = time.time()
age_by_ts = (now - ts_in_file) if ts_in_file > 0 else float("inf")
age_by_mtime = max(0.0, now - st.st_mtime)
skew_eps = 2.0 # tolerate small clock skew & metadata latency
fresh_by_ts = age_by_ts <= (self._disk_ttl + skew_eps)
fresh_by_mtime = age_by_mtime <= (self._disk_ttl + skew_eps)
if fresh_by_ts or fresh_by_mtime:
return data
return None
except Exception:
return None
def _disk_cache_write(self, data: Dict[int, Dict[str, Any]]) -> None:
# tmp -> fsync(tmp) -> replace -> fsync(dir) (robust on shared FS)
os.makedirs(self._cache_dir, exist_ok=True)
dir_fd = os.open(self._cache_dir, os.O_RDONLY)
try:
fd, tmp_path = tempfile.mkstemp(prefix="bjobs_", suffix=".json", dir=self._cache_dir)
try:
with os.fdopen(fd, "w") as tmpfp:
json.dump({"ts": time.time(), "data": data}, tmpfp)
tmpfp.flush()
os.fsync(tmpfp.fileno())
os.replace(tmp_path, self._bjobs_cache_path)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp_path)
finally:
os.close(dir_fd)
async def _fetch_bjobs_all(self, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# simple per-process rate limit
since = time.time() - self._bjobs_last_call_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
cmd = f"{self.paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._run_shell(cmd, timeout=timeout)
self._bjobs_last_call_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
with contextlib.suppress(ValueError):
jid_i = int(jid)
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
if not force_refresh:
mem = self._mem_fresh(key)
if mem is not None:
return mem
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
if not force_refresh:
disk = self._disk_cache_read()
if disk is not None:
self._mem_cache[key] = (time.time(), disk)
fut.set_result(disk)
return disk
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._mem_cache[key] = (time.time(), data)
self._disk_cache_write(data)
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
"""
bjobs 전체 스냅샷(캐시 활용) 후, 요청한 job_ids만 필터링해서 반환.
job_ids=None이면 전체 맵을 돌려줌.
"""
all_map = await self._status_all_cached(
include_done=include_done, timeout=timeout, force_refresh=force_refresh
)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
async def status_union_batched(
self,
list_of_job_id_sets: Sequence[Iterable[int]],
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Sequence[Dict[int, Dict[str, Any]]]:
"""
여러 요청을 합집합으로 한 번 조회(실제론 스냅샷 캐시 사용) 후 분배.
"""
union_ids: set[int] = set()
normalized: List[List[int]] = []
for ids in list_of_job_id_sets:
lst = [int(i) for i in ids]
normalized.append(lst)
union_ids.update(lst)
union_map = await self.status(union_ids, include_done=include_done, timeout=timeout, force_refresh=force_refresh)
out: List[Dict[int, Dict[str, Any]]] = []
for req in normalized:
out.append({i: union_map.get(i, {}) for i in req})
return out
# ------------- batch submit with cooperative cancel -------------
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: Callable[[], bool],
poll_interval_sec: float = 0.2,
max_in_flight: Optional[int] = None,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
) -> Dict[str, Any]:
"""
명령을 그대로 실행(create_subprocess_shell). 출력에서 'Job <id>'만 추출.
인터럽트 시: 새 실행 중단 + 진행 중 제출은 끝까지 기다림(고아 잡 방지) → 수집된 job_id 일괄 bkill.
"""
submitted: List[int] = []
failures = 0
max_in_flight = max_in_flight or max(1, self._cli_sem._value)
async def run_one(cmd: StrOrSeq) -> Dict[str, Any]:
return await self.run_command_capture_job(
cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error
)
idx = 0
n = len(commands)
in_flight: set[asyncio.Task] = set()
try:
while idx < n or in_flight:
# launch window
while idx < n and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(
in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED
)
for t in done:
in_flight.discard(t)
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
# drain: 진행 중 제출을 끝까지 기다려 Job ID 수집
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
if interrupted and submitted:
await self.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {
"submitted_job_ids": submitted,
"interrupted": interrupted,
"num_submitted": len(submitted),
"num_failed": failures,
}
빠른 사용 예시
import asyncio, time
from lsf_async_v5 import LsfClient
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time()-self.t0) >= self.delay
async def main():
lsf = LsfClient(
max_concurrent_cli=8,
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.5,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache/lsf_async", # 여러 머신에서 공유되는 경로
)
# 혼합 커맨드: 래퍼/순수/직접 bsub
cmds = [
"job_sub my_script.sh --epochs 3", # 내부에서 bsub 호출, 출력에 Job <id>가 찍히면 잡으로 인식
"sleep 1", # 비LSF: job_id=None
"bsub -q short 'echo hi; sleep 2'", # LSF: job_id 파싱
]
# 3초 후 취소
res = await lsf.submit_many_with_cancel(
cmds,
is_interrupted=Token(3).is_interrupted,
max_in_flight=4,
cmd_timeout=120,
cmd_raise_on_error=False,
kill_force=True,
)
print(res)
# bjobs 스냅샷 캐시 활용
all_map = await lsf.status(include_done=True)
some = await lsf.status([list(all_map.keys())[0]] if all_map else [])
print("N(all)=", len(all_map), "subset=", some)
asyncio.run(main())
구현 메모(요점 정리)
-
셸 실행 : 모든 커맨드는
asyncio.create_subprocess_shell로 실행합니다. 코드 어디에도 직접/bin/sh를 지정하지 않습니다. -
잡 판별 : 오직 출력의
Job <id>패턴으로만 잡 여부/ID를 판단합니다. (래퍼/스크립트 OK) -
공유 드라이브 안전성 :
-
쓰기: tmp→
os.replace(동일 FS 내 원자적)→디렉터리fsync로 메타데이터까지 보장. -
읽기:
ts(파일 내부)와mtime(서버 메타데이터) 둘 다 로 TTL 판정(+2s 스큐 허용). -
운영 팁: TTL은 ≥2~5초 권장(분산 FS 전파 지연 흡수).
-
-
고아 잡 방지 : 취소 시 진행 중 제출은 kill하지 않고 끝까지 기다린 뒤 수집한 job_id만 bkill.
-
에러 정책 : 일반 커맨드는
raise_on_error=False기본(예외 미발생). 필요 시 True로 올리면 예외 발생.
필요하시면 force_refresh=True로 bjobs를 강제 갱신하는 호출 경로를 더 노출하거나, 스트리밍 파서 로 Job <id>를 “실시간” 감지하는 버전도 추가해 드릴게요.
뭔가 한 클래스에 다 박은 느낌인데, 클래스 같은걸 나눠서 깔끔하게 짜줄 수 있어? 파이썬 현업 전문가가 작성한 것 처럼
좋아요—요구사항을 “현업스러운” 구조로 쪼개서 작성했습니다.
핵심은 역할별 클래스로 분리(셸 실행/레이트리밋/메모리·디스크 캐시/스냅샷 페처/상태 API/배치 제출·취소), 그리고 create_subprocess_shell만 사용합니다.
아래 단일 파일을 그대로 저장해 쓰세요. (예: lsf_pro.py)
# lsf_pro.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ============================================================
# Exceptions & Utilities
# ============================================================
class LSFCommandError(RuntimeError):
"""Raised when an LSF-related command returns nonzero exit code (optional)."""
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str = cmd_str
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def to_shell_str(cmd: StrOrSeq) -> str:
"""Accept str or sequence; if sequence, join by space. Caller is responsible for quoting."""
if isinstance(cmd, str):
return cmd
return " ".join(map(str, cmd))
def parse_job_id_from_output(stdout: str, stderr: str) -> Optional[int]:
"""Find 'Job <123>' in either stdout or stderr."""
m = _JOBID_RE.search(stdout) or _JOBID_RE.search(stderr)
return int(m.group(1)) if m else None
# ============================================================
# Config
# ============================================================
@dataclass(frozen=True)
class LSFConfig:
bkill: str = "bkill"
bjobs: str = "bjobs"
max_concurrent_cli: int = 8
bjobs_ttl_sec: float = 3.0
bjobs_min_interval_sec: float = 1.0
disk_cache_ttl_sec: float = 3.0
cache_dir: Optional[str] = None
user: Optional[str] = None # default: env/OS user
# ============================================================
# Layer 1: Shell runner & rate limiter
# ============================================================
class ShellRunner:
"""Thin async wrapper around asyncio.create_subprocess_shell with a concurrency gate."""
def __init__(self, max_concurrency: int, env: Optional[Dict[str, str]] = None):
self._sem = asyncio.Semaphore(max_concurrency)
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
class MinIntervalRateLimiter:
"""Simple per-process min-interval limiter."""
def __init__(self, min_interval_sec: float):
self._min = float(min_interval_sec)
self._last = 0.0
async def wait(self):
if self._min <= 0:
return
now = time.time()
since = now - self._last
if since < self._min:
await asyncio.sleep(self._min - since)
self._last = time.time()
# ============================================================
# Layer 2: Caches (memory + disk)
# ============================================================
class MemoryTTLCache:
"""Per-process TTL cache for a single key-space."""
def __init__(self, ttl_sec: float):
self._ttl = float(ttl_sec)
self._store: Dict[str, Tuple[float, Any]] = {}
def get(self, key: str) -> Optional[Any]:
item = self._store.get(key)
if not item:
return None
ts, data = item
if time.time() - ts <= self._ttl:
return data
return None
def set(self, key: str, data: Any) -> None:
self._store[key] = (time.time(), data)
class DiskJSONCache:
"""
Shared JSON cache: tmp -> fsync(tmp) -> os.replace() -> fsync(dir).
- Atomic replace within same filesystem.
- Robust on NFS/SMB/GPFS: readers may see slightly stale content (metadata propagation),
so freshness is judged via both JSON 'ts' and file mtime with skew tolerance.
"""
def __init__(self, path: str, ttl_sec: float, skew_eps: float = 2.0):
self._path = path
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
os.makedirs(os.path.dirname(path), exist_ok=True)
def read(self) -> Optional[Dict[str, Any]]:
p = self._path
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in_file = float(payload.get("ts", 0.0))
data = payload.get("data", {})
# Freshness by writer ts OR server mtime (tolerate skew)
now = time.time()
age_by_ts = (now - ts_in_file) if ts_in_file > 0 else float("inf")
age_by_mtime = max(0.0, now - st.st_mtime)
if age_by_ts <= (self._ttl + self._skew) or age_by_mtime <= (self._ttl + self._skew):
return data
return None
except Exception:
return None
def write(self, data: Dict[str, Any]) -> None:
dir_path = os.path.dirname(self._path)
dir_fd = os.open(dir_path, os.O_RDONLY)
try:
fd, tmp_path = tempfile.mkstemp(prefix="bjobs_", suffix=".json", dir=dir_path)
try:
with os.fdopen(fd, "w") as tmpfp:
json.dump({"ts": time.time(), "data": data}, tmpfp)
tmpfp.flush()
os.fsync(tmpfp.fileno())
os.replace(tmp_path, self._path)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp_path)
finally:
os.close(dir_fd)
# ============================================================
# Layer 3: LSF primitives (fetcher/kill/command)
# ============================================================
class BJobsFetcher:
"""Fetch a full snapshot for the current user."""
def __init__(self, shell: ShellRunner, user: str, bjobs_path: str, limiter: MinIntervalRateLimiter):
self._shell = shell
self._user = user
self._bjobs = bjobs_path
self._limiter = limiter
async def fetch(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
await self._limiter.wait()
flag = "-a " if include_done else ""
# Use a minimal, whitespace-safe field set
cmd = f"{self._bjobs} -noheader -u {self._user} {flag}-o \"jobid stat exit_code queue exec_host\""
stdout, stderr, rc = await self._shell.run(cmd, timeout=timeout)
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
class BKiller:
"""Kill one or many jobs with bkill."""
def __init__(self, shell: ShellRunner, bkill_path: str):
self._shell = shell
self._bkill = bkill_path
async def kill(
self,
job_ids: Iterable[int] | int,
*,
force: bool = False,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
flag = "-f " if force else ""
cmd = f"{self._bkill} {flag}{' '.join(ids)}"
stdout, stderr, rc = await self._shell.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
class CommandRunner:
"""
Run arbitrary commands exactly as given and (only) parse 'Job <id>' from outputs.
- If no job message, returns job_id=None (no exception unless raise_on_error=True).
"""
def __init__(self, shell: ShellRunner):
self._shell = shell
async def run_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> Dict[str, Any]:
cmd_str = to_shell_str(command)
stdout, stderr, rc = await self._shell.run(cmd_str, timeout=timeout)
job_id = parse_job_id_from_output(stdout, stderr)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd_str, rc, stdout, stderr)
return {"stdout": stdout, "stderr": stderr, "returncode": rc, "job_id": job_id}
# ============================================================
# Layer 4: Snapshot cache service (mem+disk + singleflight)
# ============================================================
class JobsSnapshotCache:
"""
Provides a full bjobs snapshot with:
- per-process memory TTL cache
- multi-process shared disk JSON cache
- singleflight for concurrent callers in-process
"""
def __init__(
self,
fetcher: BJobsFetcher,
mem_cache: MemoryTTLCache,
disk_cache: DiskJSONCache,
):
self._fetcher = fetcher
self._mem = mem_cache
self._disk = disk_cache
self._inflight: Dict[str, asyncio.Future] = {}
async def get(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}"
if not force_refresh:
memo = self._mem.get(key)
if memo is not None:
return memo
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
if not force_refresh:
disk = self._disk.read()
if disk is not None:
self._mem.set(key, {int(k): v for k, v in disk.items()})
fut.set_result({int(k): v for k, v in disk.items()})
return await fut
data = await self._fetcher.fetch(include_done=include_done, timeout=timeout)
self._mem.set(key, data)
self._disk.write({str(k): v for k, v in data.items()})
fut.set_result(data)
return await fut
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
# ============================================================
# Layer 5: Public services (Status / Batched submit-cancel)
# ============================================================
class StatusService:
"""User-facing status APIs built on top of the snapshot cache."""
def __init__(self, cache: JobsSnapshotCache):
self._cache = cache
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._cache.get(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
async def status_union_batched(
self,
list_of_job_id_sets: Sequence[Iterable[int]],
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Sequence[Dict[int, Dict[str, Any]]]:
union_ids: set[int] = set()
normalized: List[List[int]] = []
for ids in list_of_job_id_sets:
lst = [int(i) for i in ids]
normalized.append(lst)
union_ids.update(lst)
union_map = await self.status(union_ids, include_done=include_done, timeout=timeout, force_refresh=force_refresh)
return [{i: union_map.get(i, {}) for i in req} for req in normalized]
class BatchSubmitter:
"""
Submits heterogeneous commands (some may submit LSF jobs via wrappers).
- Collects Job IDs by scanning outputs ('Job <id>').
- On interruption: stops launching new commands, waits for in-flight to finish
(prevents orphan jobs), then bkill all collected Job IDs.
"""
def __init__(self, runner: CommandRunner, killer: BKiller):
self._runner = runner
self._killer = killer
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: Callable[[], bool],
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
) -> Dict[str, Any]:
submitted: List[int] = []
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(cmd: StrOrSeq) -> Dict[str, Any]:
return await self._runner.run_capture_job(cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error)
idx = 0
total = len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
# Drain: ensure all running submissions complete and harvest Job IDs
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
if interrupted and submitted:
await self._killer.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {
"submitted_job_ids": submitted,
"interrupted": interrupted,
"num_submitted": len(submitted),
"num_failed": failures,
}
# ============================================================
# Layer 6: Facade (compose everything)
# ============================================================
class LSFClient:
"""
High-level facade that exposes:
- run_command_capture_job
- kill
- status / status_union_batched
- submit_many_with_cancel
"""
def __init__(self, cfg: Optional[LSFConfig] = None, env: Optional[Dict[str, str]] = None):
cfg = cfg or LSFConfig()
user = cfg.user or os.environ.get("LSF_USER") or getpass.getuser()
cache_dir = cfg.cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
cache_path = os.path.join(cache_dir, "lsf_async", f"bjobs_all_{user}.json")
# Core components
self._shell = ShellRunner(cfg.max_concurrent_cli, env=env)
self._limiter = MinIntervalRateLimiter(cfg.bjobs_min_interval_sec)
self._fetcher = BJobsFetcher(self._shell, user=user, bjobs_path=cfg.bjobs, limiter=self._limiter)
self._mem_cache = MemoryTTLCache(cfg.bjobs_ttl_sec)
self._disk_cache = DiskJSONCache(cache_path, ttl_sec=cfg.disk_cache_ttl_sec, skew_eps=2.0)
self._snap_cache = JobsSnapshotCache(self._fetcher, self._mem_cache, self._disk_cache)
# Services
self._status = StatusService(self._snap_cache)
self._killer = BKiller(self._shell, cfg.bkill)
self._runner = CommandRunner(self._shell)
self._submitter = BatchSubmitter(self._runner, self._killer)
# ----- Facade methods -----
async def run_command_capture_job(self, command: StrOrSeq, *, timeout: Optional[float] = None, raise_on_error: bool = False) -> Dict[str, Any]:
return await self._runner.run_capture_job(command, timeout=timeout, raise_on_error=raise_on_error)
async def kill(self, job_ids: Iterable[int] | int, *, force: bool = False, timeout: Optional[float] = 60.0, raise_on_error: bool = True) -> Tuple[str, str, int]:
return await self._killer.kill(job_ids, force=force, timeout=timeout, raise_on_error=raise_on_error)
async def status(self, job_ids: Optional[Iterable[int]] = None, *, include_done: bool = True, timeout: Optional[float] = 60.0, force_refresh: bool = False) -> Dict[int, Dict[str, Any]]:
return await self._status.status(job_ids, include_done=include_done, timeout=timeout, force_refresh=force_refresh)
async def status_union_batched(self, list_of_job_id_sets: Sequence[Iterable[int]], *, include_done: bool = True, timeout: Optional[float] = 60.0, force_refresh: bool = False) -> Sequence[Dict[int, Dict[str, Any]]]:
return await self._status.status_union_batched(list_of_job_id_sets, include_done=include_done, timeout=timeout, force_refresh=force_refresh)
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: Callable[[], bool],
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
) -> Dict[str, Any]:
return await self._submitter.submit_many_with_cancel(
commands,
is_interrupted=is_interrupted,
poll_interval_sec=poll_interval_sec,
max_in_flight=max_in_flight,
kill_force=kill_force,
kill_raise_on_error=kill_raise_on_error,
cmd_timeout=cmd_timeout,
cmd_raise_on_error=cmd_raise_on_error,
)
간단 사용 예시(필요한 부분만 복붙):
# example.py
import asyncio, time
from lsf_pro import LSFClient, LSFConfig
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time() - self.t0) >= self.delay
async def main():
client = LSFClient(LSFConfig(
bkill="bkill",
bjobs="bjobs",
max_concurrent_cli=8,
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.5,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache", # 여러 머신이 보는 공유 경로
))
# 1) 커맨드를 그대로 실행 (잡 메시지 있으면 job_id 파싱)
r = await client.run_command_capture_job("bsub -q short 'echo hi; sleep 1'", raise_on_error=False)
print("submit:", r)
# 2) 상태 조회 (전체 스냅샷 캐시 활용)
all_map = await client.status(include_done=True)
print("num jobs:", len(all_map))
# 3) 배치 제출 + 취소(3초 후)
cmds = [
"job_sub my_script.sh --epochs 3",
"sleep 1",
"bsub -q short 'echo X; sleep 2'",
]
res = await client.submit_many_with_cancel(
cmds,
is_interrupted=Token(3).is_interrupted,
max_in_flight=4,
cmd_timeout=120,
kill_force=True,
)
print("batch:", res)
asyncio.run(main())
설계 요약 (step-by-step)
-
ShellRunner :
create_subprocess_shell만 사용, 동시 실행 제한. -
MinIntervalRateLimiter :
bjobs과호출 방지. -
MemoryTTLCache / DiskJSONCache : 프로세스 내/간 2중 캐시. 디스크는 tmp→
os.replace→fsync(dir)로 견고. -
BJobsFetcher :
-u <user>전체 스냅샷 + 최소 필드 파싱. -
JobsSnapshotCache : 단일플라이트(in-process) + 메모리/디스크 캐시 오케스트레이션.
-
StatusService : 전체 스냅샷을 요청 ID로 필터.
-
CommandRunner : 어떤 커맨드든 그대로 실행, 출력에서만
Job <id>파싱. -
BKiller : 다수 ID를 한 번에
bkill. -
BatchSubmitter : 인터럽트 시 새 제출 중단, 진행 중은 기다려 Job ID 수집 후 일괄
bkill(고아 방지). -
LSFClient : 위 구성요소를 합친 퍼사드.
추가 원하시면 로깅 훅/메트릭스(호출 수, 평균 지연)·NFS 환경에서의 강제 리프레시 플래그·대규모 잡 스냅샷 샤딩까지 이어서 확장해 드릴게요.
좀 과하게 나눈 것 아냐? 진짜 필요한 수준으로만 쪼개줘.
좋아, “진짜 필요한 것만” 남겨서 딱 2개 클래스로 정리했어:
-
DiskJSONCache : 멀티프로세스 공유 디스크 캐시(원자적 replace + fsync + mtime/ts 이중 판정)
-
LSFClient : 나머지 전부(커맨드 실행, Job ID 파싱, bkill, bjobs 스냅샷 메모리/디스크 캐시, 레이트리밋, 일괄 제출+취소)
필수 기능만 담았고, create_subprocess_shell만 사용해. 잡 판별은 오직 출력의 Job <id>로 해.
# lsf_minimal.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
"""str 또는 시퀀스를 받아 셸 문자열로. (시퀀스면 공백 join; 인용은 호출자 책임)"""
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ---------- 멀티프로세스 공유 디스크 캐시 ----------
class DiskJSONCache:
"""
tmp -> fsync(tmp) -> os.replace() -> fsync(dir)
- 같은 파일시스템 내 rename은 원자적.
- 신선도 판단: 파일 내부 ts와 파일 mtime 모두 확인(+2s skew 허용).
"""
def __init__(self, path: str, ttl_sec: float, skew_eps: float = 2.0):
self._path = path
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
os.makedirs(os.path.dirname(path), exist_ok=True)
def read(self) -> Optional[Dict[str, Any]]:
p = self._path
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts = float(payload.get("ts", 0.0))
data = payload.get("data", {})
now = time.time()
age_by_ts = (now - ts) if ts > 0 else float("inf")
age_by_mtime = max(0.0, now - st.st_mtime)
fresh = (age_by_ts <= self._ttl + self._skew) or (age_by_mtime <= self._ttl + self._skew)
return data if fresh else None
except Exception:
return None
def write(self, data: Dict[str, Any]) -> None:
d = os.path.dirname(self._path)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="bjobs_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, self._path) # atomic on same FS
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ---------- 메인 클라이언트 ----------
class LSFClient:
"""
꼭 필요한 것만:
- run_command_capture_job: 커맨드 그대로 실행, 출력에서 Job <id>만 파싱
- kill: bkill (여러 ID 한 번에)
- status: bjobs 전체 스냅샷 (메모리+디스크 캐시, 레이트리밋)
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행분 완주 후 job_id 일괄 bkill
"""
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._sem = asyncio.Semaphore(max_concurrent_cli)
self._bjobs_ttl = float(bjobs_ttl_sec)
self._min_interval = float(bjobs_min_interval_sec)
self._last_bjobs_ts = 0.0
# 메모리 캐시(key -> (ts, data))
self._mem_cache: Dict[str, Tuple[float, Dict[int, Dict[str, Any]]]] = {}
# 단일플라이트
self._inflight: Dict[str, asyncio.Future] = {}
# 공유 디스크 캐시
user = user or os.environ.get("LSF_USER") or getpass.getuser()
base_cache = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
cache_path = os.path.join(base_cache, "lsf_async", f"bjobs_all_{user}.json")
self._disk = DiskJSONCache(cache_path, ttl_sec=disk_cache_ttl_sec, skew_eps=2.0)
self._user = user
# locale 안정화
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
# ---- 내부 헬퍼 ----
async def _run_shell(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self._env
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ---- 커맨드 실행(잡 메시지 파싱) ----
async def run_command_capture_job(
self, command: StrOrSeq, *, timeout: Optional[float] = None, raise_on_error: bool = False
) -> Dict[str, Any]:
cmd_str = _to_shell_str(command)
stdout, stderr, rc = await self._run_shell(cmd_str, timeout=timeout)
m = _JOBID_RE.search(stdout) or _JOBID_RE.search(stderr)
job_id = int(m.group(1)) if m else None
if raise_on_error and rc != 0:
raise LSFCommandError(cmd_str, rc, stdout, stderr)
return {"stdout": stdout, "stderr": stderr, "returncode": rc, "job_id": job_id}
# ---- bkill ----
async def kill(
self, job_ids: Iterable[int] | int, *, force: bool = False, timeout: Optional[float] = 60.0, raise_on_error: bool = True
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
flag = "-f " if force else ""
cmd = f"{self._paths['bkill']} {flag}{' '.join(ids)}"
stdout, stderr, rc = await self._run_shell(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# ---- bjobs 전체 스냅샷 (캐시) ----
async def _fetch_bjobs_all(self, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# 레이트리밋
now = time.time()
since = now - self._last_bjobs_ts
if since < self._min_interval:
await asyncio.sleep(self._min_interval - since)
flag = "-a " if include_done else ""
# 공백 안전한 필드만
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._run_shell(cmd, timeout=timeout)
self._last_bjobs_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self, *, include_done: bool = True, timeout: Optional[float] = 60.0, force_refresh: bool = False
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}"
# 메모리 캐시
if not force_refresh:
item = self._mem_cache.get(key)
if item:
ts, data = item
if time.time() - ts <= self._bjobs_ttl:
return data
# 단일플라이트
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
if not force_refresh:
disk = self._disk.read()
if disk is not None:
data = {int(k): v for k, v in disk.items()}
self._mem_cache[key] = (time.time(), data)
fut.set_result(data)
return data
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._mem_cache[key] = (time.time(), data)
self._disk.write({str(k): v for k, v in data.items()})
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
# ---- 배치 제출 + 취소 ----
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
) -> Dict[str, Any]:
"""
명령을 그대로 실행(create_subprocess_shell). 출력에서 'Job <id>'만 추출.
인터럽트 시 새 실행 중단, 진행 중 제출은 끝까지 기다린 뒤 수집된 job_id 일괄 bkill.
"""
submitted: List[int] = []
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(cmd: StrOrSeq) -> Dict[str, Any]:
return await self.run_command_capture_job(cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
# drain
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
if interrupted and submitted:
await self.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {"submitted_job_ids": submitted, "interrupted": interrupted, "num_submitted": len(submitted), "num_failed": failures}
빠른 사용 예 (필요부분만)
# example_min.py
import asyncio, time
from lsf_minimal import LSFClient
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time()-self.t0) >= self.delay
async def main():
client = LSFClient(
max_concurrent_cli=8,
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.5,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache", # 여러 머신이 보는 경로
)
# 1) 커맨드 그대로 실행 (잡 메시지 있으면 job_id 파싱)
r = await client.run_command_capture_job("bsub -q short 'echo hi; sleep 1'", raise_on_error=False)
print("submit:", r)
# 2) 상태 (전체 스냅샷 캐시)
all_map = await client.status(include_done=True)
print("jobs:", len(all_map))
# 3) 배치 제출 + 3초 후 취소
cmds = ["sleep 1", "bsub -q short 'echo X; sleep 2'", "job_sub my_script.sh --epochs 3"]
res = await client.submit_many_with_cancel(
cmds, is_interrupted=Token(3).is_interrupted, max_in_flight=4, cmd_timeout=120, kill_force=True
)
print("batch:", res)
asyncio.run(main())
왜 이렇게 최소화했는지 (step-by-step)
-
핵심 책임 2개만 분리 : “디스크 캐시”는 멀티프로세스/멀티머신 안전성이 핵심이라 독립 클래스로 유지, 나머지는
LSFClient에. -
한 눈에 읽히는 흐름 : 실행 → 파싱 → 캐시 → 레이트리밋 → 배치/취소까지
LSFClient안에서 추적 가능. -
운영 현실 반영 : 공유 FS에 맞춘
os.replace + fsync(dir)와mtime/ts이중 판정, TTL 기본 5s로 안전한 디폴트.
더 줄이고 싶으면 DiskJSONCache도 내부 함수로 접어둘 수 있지만, 멀티프로세스 안전성의 결로가 분명해서 이 정도 분리가 “진짜 필요한 최소”라 봤어. 필요하면 로깅 훅만 살짝 추가해드릴게!
진짜 필요한 최소로도 하지말고, 너무 과하게 늘리지도 말고. 그냥 고수가 했을 법한 유지보수 할만한 정도로 짜주면 돼. 너가 생각하는 최적을 따라줘.
좋아, “적당히 쪼갠 프로” 버전으로 정리했어.
핵심만 3파츠로 나눴고, 과하지 않게 유지보수 용이하도록 구성했어.
-
ShellRunner :
create_subprocess_shell+ 동시성 제어 -
SharedCache : 메모리+디스크(멀티프로세스 공유) TTL 캐시 — tmp→
os.replace→fsync(dir)로 견고 -
LSFClient : 사용자 API (커맨드 실행·JobID 파싱,
bkill,bjobs스냅샷+캐시, 배치 제출+취소)
아래 파일 하나면 끝. (lsf_core.py 같은 이름으로 저장)
# lsf_core.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ===================== 공용 유틸 =====================
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ===================== ShellRunner =====================
class ShellRunner:
"""create_subprocess_shell만 사용. 동시 실행 수를 세마포어로 제한."""
def __init__(self, max_concurrency: int, env: Optional[Dict[str, str]] = None):
self._sem = asyncio.Semaphore(max_concurrency)
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ===================== SharedCache (mem+disk) =====================
class SharedCache:
"""
멀티프로세스 공유 JSON 캐시:
- 메모리 TTL + 디스크 TTL (같은 FS 내 rename 원자성 보장)
- 쓰기: tmp -> fsync(tmp) -> os.replace -> fsync(dir)
- 신선도: JSON 내부 ts와 파일 mtime 모두 사용(+2s skew 허용)
"""
def __init__(self, cache_dir: Optional[str], ttl_sec: float, namespace: str = "lsf_async", skew_eps: float = 2.0):
base = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
self._root = os.path.join(base, namespace)
os.makedirs(self._root, exist_ok=True)
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
self._mem: Dict[str, Tuple[float, Any]] = {}
def _key_path(self, key: str) -> str:
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", key)
return os.path.join(self._root, f"{safe}.json")
def get(self, key: str) -> Optional[Any]:
# 1) 메모리
item = self._mem.get(key)
if item:
ts, data = item
if time.time() - ts <= self._ttl:
return data
# 2) 디스크
p = self._key_path(key)
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in = float(payload.get("ts", 0.0))
data = payload.get("data", None)
now = time.time()
fresh_by_ts = (now - ts_in) <= (self._ttl + self._skew) if ts_in > 0 else False
fresh_by_mtime = (now - max(0.0, st.st_mtime)) <= (self._ttl + self._skew)
if fresh_by_ts or fresh_by_mtime:
self._mem[key] = (time.time(), data)
return data
except Exception:
return None
return None
def set(self, key: str, data: Any) -> None:
self._mem[key] = (time.time(), data)
p = self._key_path(key)
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="cache_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, p)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ===================== LSFClient (Public API) =====================
class LSFClient:
"""
- run_command_capture_job: 커맨드를 그대로 실행, 출력에서 'Job <id>'만 파싱
- kill: bkill (여러 ID 일괄)
- status: bjobs 전체 스냅샷(메모리+디스크 캐시, 레이트리밋, in-process single-flight)
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행 중은 완주 → 수집한 job_id 일괄 bkill
"""
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._runner = ShellRunner(max_concurrent_cli, env=env)
self._user = user or os.environ.get("LSF_USER") or getpass.getuser()
self._cache = SharedCache(cache_dir, ttl_sec=disk_cache_ttl_sec, namespace="lsf_async", skew_eps=2.0)
# bjobs 제어
self._bjobs_ttl = float(bjobs_ttl_sec)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._last_bjobs_ts = 0.0
self._inflight: Dict[str, asyncio.Future] = {} # single-flight per key
# ---------- 일반 커맨드 실행 + Job ID 파싱 ----------
async def run_command_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> Dict[str, Any]:
cmd = _to_shell_str(command)
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
m = _JOBID_RE.search(stdout) or _JOBID_RE.search(stderr)
job_id = int(m.group(1)) if m else None
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
return {"stdout": stdout, "stderr": stderr, "returncode": rc, "job_id": job_id}
# ---------- bkill ----------
async def kill(
self,
job_ids: Iterable[int] | int,
*,
force: bool = False,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
flag = "-f " if force else ""
cmd = f"{self._paths['bkill']} {flag}{' '.join(ids)}"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# ---------- bjobs snapshot (캐시 + 단일플라이트 + 레이트리밋) ----------
async def _fetch_bjobs_all(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# per-process rate-limit
now = time.time()
since = now - self._last_bjobs_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
# 공백 안전 필드만
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
self._last_bjobs_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
# 1) 메모리/디스크 캐시
if not force_refresh:
cached = self._cache.get(key)
if cached is not None:
# 디스크·메모리 캐시는 직전에 set된 dict(str->obj)일 수 있음
return {int(k): v for k, v in (cached.items() if isinstance(cached, dict) else [])}
# 2) in-process single-flight
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()}) # 디스크엔 str key로 저장
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
async def status_union_batched(
self,
list_of_job_id_sets: Sequence[Iterable[int]],
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Sequence[Dict[int, Dict[str, Any]]]:
union: set[int] = set()
normalized: List[List[int]] = []
for ids in list_of_job_id_sets:
lst = [int(i) for i in ids]
normalized.append(lst)
union.update(lst)
union_map = await self.status(union, include_done=include_done, timeout=timeout, force_refresh=force_refresh)
return [{i: union_map.get(i, {}) for i in req} for req in normalized]
# ---------- 배치 제출 + 취소 ----------
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
kill_force: bool = True,
kill_raise_on_error: bool = False,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
) -> Dict[str, Any]:
"""
커맨드를 그대로 실행. 출력의 'Job <id>'만 추출해 수집.
인터럽트 시: 새 실행 중단, 진행 중 제출은 끝까지 기다림(고아 방지) → 수집된 job_id 일괄 bkill.
"""
submitted: List[int] = []
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(cmd: StrOrSeq) -> Dict[str, Any]:
return await self.run_command_capture_job(cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
# drain: 남은 제출은 완주시켜 Job ID 수집
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
r = await t
jid = r.get("job_id")
if jid is not None:
submitted.append(jid)
elif r.get("returncode", 1) != 0:
failures += 1
except Exception:
failures += 1
interrupted = is_interrupted()
if interrupted and submitted:
await self.kill(submitted, force=kill_force, raise_on_error=kill_raise_on_error)
return {"submitted_job_ids": submitted, "interrupted": interrupted, "num_submitted": len(submitted), "num_failed": failures}
사용 예 (짧게)
import asyncio, time
from lsf_core import LSFClient
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time()-self.t0) >= self.delay
async def main():
client = LSFClient(
max_concurrent_cli=8,
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.5,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache", # 여러 머신이 보는 경로(같은 FS)
)
# 1) 커맨드 그대로 실행 (잡 메시지 있으면 파싱)
r = await client.run_command_capture_job("bsub -q short 'echo hi; sleep 1'", raise_on_error=False)
print("submit:", r)
# 2) bjobs 상태(스냅샷 캐시)
all_jobs = await client.status(include_done=True)
print("N(all) =", len(all_jobs))
# 3) 배치 제출 + 3초 후 취소 → 수집된 job_id 일괄 bkill
cmds = ["sleep 1", "bsub -q short 'echo X; sleep 2'", "job_sub my_script.sh --epochs 3"]
res = await client.submit_many_with_cancel(
cmds,
is_interrupted=Token(3).is_interrupted,
max_in_flight=4,
cmd_timeout=120,
kill_force=True,
)
print("batch:", res)
asyncio.run(main())
왜 이 구성이 “적당히 고수 느낌”인가 (짧게, step-by-step)
-
역할 분리 최소화 : 프로세스 실행과 캐시만 별도 클래스로 분리. 나머지는
LSFClient에서 한 눈에 추적. -
운영 안전성 : 공유 FS에서 tmp→replace→fsync(dir) , TTL은 ts+mtime 이중 판정.
-
성능/부하 배려 : in-process single-flight , per-process 레이트리밋 , 전체 스냅샷 후 필터로
bjobs호출 최소화. -
현실 친화성 : 커맨드를 있는 그대로 실행,
Job <id>메시지로만 판별 → bsub 래퍼/스크립트 모두 커버. -
확장 여지 : 필요 시 로깅/메트릭, 강제 새로고침(
force_refresh) 훅, 샤딩 등 쉽게 추가 가능.
원하는 스타일이나 코딩 컨벤션(로거, 타입힌트 레벨 강화, 에러코드 매핑 등)이 있으면 거기에 맞춰 딱 맞게 다듬어줄게.
근데 bkill에 -f 옵션 없는데
맞아요. 제 실수입니다—bkill에는 -f가 없습니다. “강제”에 해당하는 건 -r(force remove) 이고, 상황에 따라 -d(kill 후 상태를 DONE으로 기록), -s(특정 signal 전송) 등을 씁니다. IBM+1
아래처럼 메서드 시그니처와 구현을 고쳐 쓰면 됩니다.
Step 1) kill() 메서드 교체
-
force대신force_remove(→-r) -
필요 시
record_done(→-d) -
임의 시그널은
signal(→-s SIG)로 지정
# LSFClient.kill 교체 버전
async def kill(
self,
job_ids: Iterable[int] | int,
*,
force_remove: bool = False, # was: force
record_done: bool = False, # -> -d (RUN/USUSP/SSUSP에서 DONE 기록)
signal: Optional[str] = None, # -> -s TERM / -s 9 등
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
flags: list[str] = []
if record_done:
flags.append("-d") # 기록을 DONE으로. 원격 클러스터 시 주로 유용. :contentReference[oaicite:1]{index=1}
if force_remove:
flags.append("-r") # LSF에서 즉시 제거(리소스 해제). :contentReference[oaicite:2]{index=2}
if signal:
flags += ["-s", str(signal)] # 특정 시그널 지정
cmd = f"{self._paths['bkill']} {' '.join(flags)} {' '.join(ids)}".strip()
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
참고
-r: OS에서 완전히 종료되기 전이라도 LSF에서 즉시 제거 하고 리소스를 해제합니다. IBM+1
-d: 잡을 종료하고 상태를 DONE으로 기록 합니다(시작된 잡에 한함). 원격 클러스터 워크플로에서 유용. IBM
-s: 원하는 시그널을 직접 보낼 수 있습니다. (예:-s TERM,-s 9). tin6150.github.io+1
Step 2) 배치 취소 경로도 같이 수정
기존 submit_many_with_cancel() 안에서 await self.kill(submitted, force=True, ...)로 호출하셨다면:
# 기존
# await self.kill(submitted, force=True, raise_on_error=kill_raise_on_error)
# 변경
await self.kill(
submitted,
force_remove=True, # -r
# 필요하면 record_done=True 도 추가 가능
raise_on_error=kill_raise_on_error
)
Step 3) 사용 예시
# 1) 즉시 제거(LSF에서 빠르게 치우고 싶을 때)
await client.kill([101, 102], force_remove=True)
# 2) 특정 시그널 전송 후 제거
await client.kill(103, signal="TERM") # TERM만 보내고 기본 동작
await client.kill(104, signal="KILL") # 바로 KILL 신호
await client.kill(105, force_remove=True) # LSF에서 즉시 제거(-r)
# 3) DONE 기록 옵션 (원격 클러스터 시 유용)
await client.kill([201,202], record_done=True) # -d
(선택) PEND만 걷어내고 싶다면
운영 정책상 “대기(PEND)만” 정리하려면 bkill -stat pend가 있습니다(일부 옵션과는 병행 불가). 필요 시 전용 유틸을 추가하세요. IBM
필요하면 제가 드린 lsf_core.py/lsf_minimal.py에도 위 변경을 반영한 작은 패치 버전 으로 정리해 드릴게요.
1. 각각 command의 job id를 알고 싶어. 매칭이 필요해. 2. 다음과 같은 예시로 많이 쓸 것 같아. 알맞게 짜줘. cmds = ["job_sub a.txt", "mkdir package", "bsub -q short sleep"] data = client.submit_many_with_cancel(cmds, is_interrupted=is_interrupted) if is_interrupted(): print("중지되었습니다.“) return job_ids = … #data로 부터 running job id 추출 while not client.is_finished(job_ids): status = client.status(job_ids) for job_id in job_ids: job_command = … #data로부터 job_id에 해당하는 커맨드 추출 print(job_id, status[job_id], job_command) await asyncio.sleep(1) if is_interrupted(): await client.kill(job_ids) print("중지하였습니다.”) return print("모두 완료되었습니다.“)
좋아! “적당히 프로스럽고 유지보수 쉬운” 버전으로 다듬어서:
-
구조는 3 파트 만 유지
-
ShellRunner:create_subprocess_shell+ 동시성 제어 -
SharedCache: 메모리+디스크 TTL 캐시(멀티프로세스 공유, tmp→replace→fsync(dir)) -
LSFClient: 사용자 API(커맨드 실행·JobID 매칭,bkill,bjobs스냅샷/캐시, 배치 제출+취소, is_finished)
-
-
요구 포인트 반영
-
각 커맨드와 Job ID 매칭 :
SubmitResult/SubmitItem에 양방향 매핑 포함 -
예시 패턴 그대로 사용 가능:
submit_many_with_cancel결과에서job_ids,job_id_to_command바로 꺼내 쓰고,is_finished()제공
-
아래 파일 하나로 끝 (lsf_core.py 등으로 저장):
# lsf_core.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ===================== 공용 유틸 =====================
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
def _find_job_ids(*texts: str) -> List[int]:
ids: List[int] = []
for t in texts:
if not t:
continue
for m in _JOBID_RE.finditer(t):
try:
ids.append(int(m.group(1)))
except ValueError:
pass
# 중복 제거(출력에 여러 번 찍히는 경우 대비)
return list(dict.fromkeys(ids))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ===================== ShellRunner =====================
class ShellRunner:
"""create_subprocess_shell만 사용. 동시 실행 수 세마포어로 제한."""
def __init__(self, max_concurrency: int, env: Optional[Dict[str, str]] = None):
self._sem = asyncio.Semaphore(max_concurrency)
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ===================== SharedCache (mem+disk) =====================
class SharedCache:
"""
멀티프로세스 공유 JSON 캐시:
- 메모리 TTL + 디스크 TTL (같은 FS 내 rename 원자성 보장)
- 쓰기: tmp -> fsync(tmp) -> os.replace -> fsync(dir)
- 신선도: JSON 내부 ts와 파일 mtime 모두 사용(+2s skew 허용)
"""
def __init__(self, cache_dir: Optional[str], ttl_sec: float, namespace: str = "lsf_async", skew_eps: float = 2.0):
base = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
self._root = os.path.join(base, namespace)
os.makedirs(self._root, exist_ok=True)
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
self._mem: Dict[str, Tuple[float, Any]] = {}
def _key_path(self, key: str) -> str:
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", key)
return os.path.join(self._root, f"{safe}.json")
def get(self, key: str) -> Optional[Any]:
# 1) 메모리
item = self._mem.get(key)
if item:
ts, data = item
if time.time() - ts <= self._ttl:
return data
# 2) 디스크
p = self._key_path(key)
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in = float(payload.get("ts", 0.0))
data = payload.get("data", None)
now = time.time()
fresh_by_ts = (now - ts_in) <= (self._ttl + self._skew) if ts_in > 0 else False
fresh_by_mtime = (now - max(0.0, st.st_mtime)) <= (self._ttl + self._skew)
if fresh_by_ts or fresh_by_mtime:
self._mem[key] = (time.time(), data)
return data
except Exception:
return None
return None
def set(self, key: str, data: Any) -> None:
self._mem[key] = (time.time(), data)
p = self._key_path(key)
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="cache_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, p)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ===================== 결과 타입 =====================
@dataclass
class SubmitItem:
index: int
command: str
returncode: int
stdout: str
stderr: str
job_ids: List[int]
@dataclass
class SubmitResult:
interrupted: bool
items: List[SubmitItem]
job_ids: List[int] # 평탄화된 전체 Job ID
job_id_to_command: Dict[int, str] # Job ID -> 원 커맨드(문자열)
num_submitted: int
num_failed: int
# ===================== LSFClient (Public API) =====================
class LSFClient:
"""
- run_command_capture_job: 커맨드 그대로 실행, 출력에서 'Job <id>' 파싱(복수 가능)
- kill: bkill (여러 ID 일괄) *-r / -d / -s 지원*
- status: bjobs 전체 스냅샷(메모리+디스크 캐시, 레이트리밋, in-process single-flight)
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행 중은 완주 → 수집한 job_id 일괄 bkill(기본 on)
- is_finished: 지정 Job들이 전부 DONE/EXIT/ZOMBI 등 **종료 상태**인지 판정
"""
TERMINAL = {"DONE", "EXIT", "ZOMBI"} # 필요시 확장
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._runner = ShellRunner(max_concurrent_cli, env=env)
self._user = user or os.environ.get("LSF_USER") or getpass.getuser()
self._cache = SharedCache(cache_dir, ttl_sec=disk_cache_ttl_sec, namespace="lsf_async", skew_eps=2.0)
# bjobs 제어
self._bjobs_ttl = float(bjobs_ttl_sec)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._last_bjobs_ts = 0.0
self._inflight: Dict[str, asyncio.Future] = {}
# ---------- 일반 커맨드 실행 + Job ID 파싱 ----------
async def run_command_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> SubmitItem:
cmd = _to_shell_str(command)
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
job_ids = _find_job_ids(stdout, stderr)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
# index는 배치 모드에서 채워지므로 여기서는 -1로 반환
return SubmitItem(index=-1, command=cmd, returncode=rc, stdout=stdout, stderr=stderr, job_ids=job_ids)
# ---------- bkill ----------
async def kill(
self,
job_ids: Iterable[int] | int,
*,
force_remove: bool = False, # -r
record_done: bool = False, # -d
signal: Optional[str] = None, # -s SIG
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
flags: List[str] = []
if record_done:
flags.append("-d")
if force_remove:
flags.append("-r")
if signal:
flags += ["-s", str(signal)]
cmd = f"{self._paths['bkill']} {' '.join(flags)} {' '.join(ids)}".strip()
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# ---------- bjobs snapshot (캐시 + 단일플라이트 + 레이트리밋) ----------
async def _fetch_bjobs_all(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# per-process rate-limit
now = time.time()
since = now - self._last_bjobs_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
self._last_bjobs_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
if not force_refresh:
cached = self._cache.get(key)
if cached is not None:
return {int(k): v for k, v in (cached.items() if isinstance(cached, dict) else [])}
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()})
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
wanted = {int(i) for i in job_ids}
return {i: all_map.get(i, {}) for i in wanted}
# ---------- Finite-state 판정 ----------
async def is_finished(self, job_ids: Iterable[int]) -> bool:
ids = list({int(i) for i in job_ids})
if not ids:
return True
st = await self.status(ids, include_done=True)
for i in ids:
info = st.get(i)
if not info:
# 캐시/보존기간 이탈 등으로 빠졌다면 '완료로 간주'
continue
if info.get("stat") not in self.TERMINAL:
return False
return True
# ---------- 배치 제출 + 취소 ----------
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
cleanup_on_interrupt: bool = True, # True면 인터럽트 시 수집된 job_id를 즉시 정리(bkill -r)
kill_force_remove: bool = True, # cleanup_on_interrupt=True일 때 -r 사용
kill_record_done: bool = False, # 필요시 DONE 기록
kill_signal: Optional[str] = None, # 필요시 시그널 지정
) -> SubmitResult:
items: List[SubmitItem] = []
job_ids_all: List[int] = []
job_id_to_cmd: Dict[int, str] = {}
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(idx: int, cmd: StrOrSeq) -> SubmitItem:
res = await self.run_command_capture_job(
cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error
)
# index 채워서 반환
return SubmitItem(
index=idx, command=res.command, returncode=res.returncode,
stdout=res.stdout, stderr=res.stderr, job_ids=res.job_ids
)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(idx, commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
item: SubmitItem = await t
items.append(item)
# 매핑/집계
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
# drain
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
interrupted = bool(is_interrupted())
if interrupted and cleanup_on_interrupt and job_ids_all:
await self.kill(
job_ids_all,
force_remove=kill_force_remove,
record_done=kill_record_done,
signal=kill_signal,
raise_on_error=False,
)
# 평탄화 리스트 중복 제거(출력 중복 대비)
job_ids_all = list(dict.fromkeys(job_ids_all))
return SubmitResult(
interrupted=interrupted,
items=items,
job_ids=job_ids_all,
job_id_to_command=job_id_to_cmd,
num_submitted=len(job_ids_all),
num_failed=failures,
)
당신이 원한 사용 패턴 (async로 정확히 동작하도록)
import asyncio, time
from lsf_core import LSFClient
# 인터럽트 토큰
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time() - self.t0) >= self.delay
async def main():
client = LSFClient(
max_concurrent_cli=8,
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.2,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache", # 여러 머신에서 공용 FS
)
is_interrupted = Token(delay=9999).is_interrupted # 실사용에선 외부 이벤트로 바꾸세요
cmds = ["job_sub a.txt", "mkdir package", "bsub -q short sleep 3"]
# 1) 배치 제출 (인터럽트 시 자체 정리하도록 하고 싶으면 cleanup_on_interrupt=True 유지)
data = await client.submit_many_with_cancel(
cmds,
is_interrupted=is_interrupted,
max_in_flight=4,
cmd_timeout=120,
cleanup_on_interrupt=True, # 인터럽트 발생 동안 제출하던 job들은 여기서 -r로 정리
)
if data.interrupted or is_interrupted():
print("중지되었습니다.")
return
# 2) 결과에서 Running(혹은 전체) Job ID 추출 & 매핑 사용
job_ids = data.job_ids
if not job_ids:
print("제출된 LSF 잡이 없습니다.")
return
# 3) 완료까지 폴링
while not await client.is_finished(job_ids):
status = await client.status(job_ids) # include_done=True 기본
for job_id in job_ids:
info = status.get(job_id, {})
job_command = data.job_id_to_command.get(job_id, "<unknown>")
print(job_id, info.get("stat"), job_command)
await asyncio.sleep(1)
if is_interrupted():
await client.kill(job_ids, force_remove=True)
print("중지하였습니다.")
return
print("모두 완료되었습니다.")
asyncio.run(main())
설계 포인트 (짧게, step-by-step)
-
JobID-커맨드 매칭 :
SubmitResult.job_id_to_command[jid]로 O(1) 조회,items에 원본/표준출력/리턴코드까지 보관. -
실전 안전성 : 제출 중 인터럽트 발생 시에도 진행 중 제출은 완주 → 출력에서 JobID 수집 → 필요 시 즉시 -r 로 정리.
-
부하 최소화 :
bjobs는 전체 스냅샷 + 메모리/디스크 캐시 + 레이트리밋 + 단일플라이트. -
API 간결성 : 사용자는
submit → data.job_ids / job_id_to_command → status/is_finished → kill만 알면 됨.
원한다면 로그/메트릭 훅(예: 제출/상태 호출 카운트, 지연)을 붙여서 관측 가능하게도 만들어줄게.
-r 옵션은 최대한 안쓰고 싶어. 그냥 bkill로만 해도 될 것 같아. 다른 옵션들도 없애주고 그리고 bsub은 됐지만 bjobs 캐시가 업데이트 안 돼서 누락되는 경우도 있잖아 그 경우 status 정보가 어떻게 되는지 궁금해. 최대한 에러를 안내고 싶어
좋은 포인트야. 아래처럼 딱 필요한 수준으로 수정했어.
-
bkill : 옵션 전부 제거(순수
bkill <ids>만 호출) -
status 안정성 :
bjobs캐시가 늦어 누락 돼도 에러 없이 동작-
요청한
job_ids가 스냅샷에 없으면 한 번만 강제 새로고침 시도 -
그래도 없으면 해당 ID에 대해
{"stat": "UNKNOWN"}같은 기본값을 채워 반환(키 결핍으로 인한 예외 방지)
-
-
is_finished :
UNKNOWN은 즉시 완료로 취급하지 않음. 기본적으로 그레이스(기다림) 시간(기본 15초) 동안은 “미완료”로 보고, 그 이후에도 계속 UNKNOWN이면 완료로 간주(클러스터 보존 기간/전파 지연을 흡수)
# lsf_core.py (최적화 버전)
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ========== 공용 ==========
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
def _find_job_ids(*texts: str) -> List[int]:
ids: List[int] = []
for t in texts:
if not t:
continue
for m in _JOBID_RE.finditer(t):
try:
ids.append(int(m.group(1)))
except ValueError:
pass
return list(dict.fromkeys(ids))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ========== ShellRunner ==========
class ShellRunner:
"""create_subprocess_shell만 사용. 동시 실행 수는 세마포어로 제한."""
def __init__(self, max_concurrency: int, env: Optional[Dict[str, str]] = None):
self._sem = asyncio.Semaphore(max_concurrency)
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ========== SharedCache (mem+disk) ==========
class SharedCache:
"""
멀티프로세스 공유 JSON 캐시:
- 메모리 TTL + 디스크 TTL (같은 FS 내 rename 원자성)
- 쓰기: tmp -> fsync(tmp) -> os.replace -> fsync(dir)
- 신선도: JSON 내부 ts와 파일 mtime 모두 사용(+2s skew 허용)
"""
def __init__(self, cache_dir: Optional[str], ttl_sec: float, namespace: str = "lsf_async", skew_eps: float = 2.0):
base = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
self._root = os.path.join(base, namespace)
os.makedirs(self._root, exist_ok=True)
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
self._mem: Dict[str, Tuple[float, Any]] = {}
def _key_path(self, key: str) -> str:
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", key)
return os.path.join(self._root, f"{safe}.json")
def get(self, key: str) -> Optional[Any]:
item = self._mem.get(key)
if item:
ts, data = item
if time.time() - ts <= self._ttl:
return data
p = self._key_path(key)
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in = float(payload.get("ts", 0.0))
data = payload.get("data", None)
now = time.time()
fresh_by_ts = (now - ts_in) <= (self._ttl + self._skew) if ts_in > 0 else False
fresh_by_mtime = (now - max(0.0, st.st_mtime)) <= (self._ttl + self._skew)
if fresh_by_ts or fresh_by_mtime:
self._mem[key] = (time.time(), data)
return data
except Exception:
return None
return None
def set(self, key: str, data: Any) -> None:
self._mem[key] = (time.time(), data)
p = self._key_path(key)
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="cache_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, p)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ========== 결과 타입 ==========
@dataclass
class SubmitItem:
index: int
command: str
returncode: int
stdout: str
stderr: str
job_ids: List[int]
@dataclass
class SubmitResult:
interrupted: bool
items: List[SubmitItem]
job_ids: List[int] # 평탄화된 전체 Job ID
job_id_to_command: Dict[int, str] # Job ID -> 원 커맨드
num_submitted: int
num_failed: int
# ========== LSFClient ==========
class LSFClient:
"""
- run_command_capture_job: 커맨드 그대로 실행, 출력에서 'Job <id>'(복수 가능) 파싱
- kill: 순수 bkill (옵션 없이)
- status: bjobs 전체 스냅샷(캐시/레이트리밋/단일플라이트) + 누락 시 1회 강제 새로고침 + UNKNOWN 보강
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행 중은 완주 → 수집한 job_id를 필요 시 bkill
- is_finished: UNKNOWN을 일정 시간(기본 15s)까지는 “미완료”로 보고, 그 이후엔 완료로 간주
"""
TERMINAL = {"DONE", "EXIT", "ZOMBI"}
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: int = 8,
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
unknown_grace_sec: float = 15.0, # UNKNOWN 처리 그레이스
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._runner = ShellRunner(max_concurrent_cli, env=env)
self._user = user or os.environ.get("LSF_USER") or getpass.getuser()
self._cache = SharedCache(cache_dir, ttl_sec=disk_cache_ttl_sec, namespace="lsf_async", skew_eps=2.0)
# bjobs 제어
self._bjobs_ttl = float(bjobs_ttl_sec)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._last_bjobs_ts = 0.0
self._inflight: Dict[str, asyncio.Future] = {}
# UNKNOWN 처리용
self._unknown_seen_at: Dict[int, float] = {}
self._unknown_grace = float(unknown_grace_sec)
# --- 일반 커맨드 실행 + Job ID 파싱 ---
async def run_command_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> SubmitItem:
cmd = _to_shell_str(command)
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
job_ids = _find_job_ids(stdout, stderr)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
return SubmitItem(index=-1, command=cmd, returncode=rc, stdout=stdout, stderr=stderr, job_ids=job_ids)
# --- bkill (옵션 없이) ---
async def kill(
self,
job_ids: Iterable[int] | int,
*,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
cmd = f"{self._paths['bkill']} {' '.join(ids)}"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# --- bjobs 스냅샷 (캐시/레이트리밋/단일플라이트) ---
async def _fetch_bjobs_all(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
now = time.time()
since = now - self._last_bjobs_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
self._last_bjobs_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
if not force_refresh:
cached = self._cache.get(key)
if cached is not None:
return {int(k): v for k, v in (cached.items() if isinstance(cached, dict) else [])}
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()})
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
# --- 안전한 status (누락 자동 보강) ---
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
req_ids = [int(i) for i in job_ids]
out = {i: all_map.get(i) for i in req_ids}
# 누락(id -> None) 있으면 1회 강제 새로고침
if any(v is None for v in out.values()):
all_map2 = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=True)
for i in req_ids:
out[i] = out[i] or all_map2.get(i)
# 그래도 없으면 UNKNOWN 보강 + 첫 발견 시간 기록
now = time.time()
for i in req_ids:
if out[i] is None:
out[i] = {"stat": "UNKNOWN", "exit_code": None, "queue": None, "exec_host": None, "_missing": True}
self._unknown_seen_at.setdefault(i, now)
return out
# --- 완료 판정 ---
async def is_finished(self, job_ids: Iterable[int]) -> bool:
ids = list({int(i) for i in job_ids})
if not ids:
return True
st = await self.status(ids, include_done=True)
now = time.time()
for i in ids:
info = st.get(i, {})
stat = (info or {}).get("stat")
if stat == "UNKNOWN":
first = self._unknown_seen_at.setdefault(i, now)
# UNKNOWN 그레이스 내에는 “미완료”로 취급 -> bjobs 전파 지연/캐시 지연 흡수
if now - first < self._unknown_grace:
return False
# 그레이스 초과 시: 완료로 간주(LSF가 정리했거나 보존기간 경과 등)
else:
continue
if stat not in self.TERMINAL:
return False
return True
# --- 배치 제출 + 취소 ---
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
cleanup_on_interrupt: bool = True, # True면 인터럽트 시 수집된 job_id를 즉시 bkill
) -> SubmitResult:
items: List[SubmitItem] = []
job_ids_all: List[int] = []
job_id_to_cmd: Dict[int, str] = {}
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(idx: int, cmd: StrOrSeq) -> SubmitItem:
res = await self.run_command_capture_job(
cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error
)
return SubmitItem(index=idx, command=res.command, returncode=res.returncode,
stdout=res.stdout, stderr=res.stderr, job_ids=res.job_ids)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(idx, commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
interrupted = bool(is_interrupted())
# 인터럽트 시, 이미 수집된 job만 정리(bkill). (강제 옵션 없이 순수 bkill)
if interrupted and cleanup_on_interrupt and job_ids_all:
await self.kill(job_ids_all, raise_on_error=False)
job_ids_all = list(dict.fromkeys(job_ids_all))
return SubmitResult(
interrupted=interrupted,
items=items,
job_ids=job_ids_all,
job_id_to_command=job_id_to_cmd,
num_submitted=len(job_ids_all),
num_failed=failures,
)
당신이 원하는 사용 패턴 그대로
import asyncio, time
from lsf_core import LSFClient
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time()-self.t0) >= self.delay
async def main():
client = LSFClient(
max_concurrent_cli=8,
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.2,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache",
unknown_grace_sec=15.0, # bjobs 반영 지연 허용 시간
)
is_interrupted = Token(delay=9999).is_interrupted
cmds = ["job_sub a.txt", "mkdir package", "bsub -q short sleep 3"]
data = await client.submit_many_with_cancel(cmds, is_interrupted=is_interrupted)
if data.interrupted or is_interrupted():
print("중지되었습니다.")
return
job_ids = data.job_ids # data로부터 running job id 추출
if not job_ids:
print("LSF 잡이 없습니다.")
return
while not await client.is_finished(job_ids):
status = await client.status(job_ids)
for job_id in job_ids:
info = status[job_id] # 항상 키가 존재 (UNKNOWN 보강)
job_command = data.job_id_to_command.get(job_id, "<unknown>")
print(job_id, info.get("stat"), job_command)
await asyncio.sleep(1)
if is_interrupted():
await client.kill(job_ids)
print("중지하였습니다.")
return
print("모두 완료되었습니다.")
asyncio.run(main())
정리 (step-by-step)
-
bkill은 옵션 없이 만 씀. -
status(job_ids)는 항상 모든 ID의 키가 존재 하도록 보장:-
캐시 히트 시 바로 반환
-
누락 있으면 1회 강제 리프레시
-
그래도 없으면
{"stat":"UNKNOWN"}으로 보강(에러 없음)
-
-
is_finished는UNKNOWN을 그레이스 시간 동안은 미완료 로 보고, 그 이후엔 완료로 간주(무한루프 방지).
필요하면 unknown_grace_sec만 조정해 운영 환경에 맞출 수 있어.
한 프로세스 내에서 여러 LSFClient 를 만들면 동시 돌아가는 Job이 많아지는거 아냐? Semaphore를 프로세스 전체가 공유해야할 것 같아 싱글톤 같은걸 써서. LSFClient를 싱글톤화 하든지 Semaphore만 하든지 그건 선택일거같은데 더 일반적이고 좋은 쪽을 선택해야해줘ㅗ
좋은 지적! “한 프로세스 안에서 LSFClient를 여러 개 만들어도 동시 실행 수가 늘어나지 않도록” 프로세스(정확히는 이벤트 루프) 단위 공유 세마포어 를 넣은 최적 버전으로 정리했어.
핵심만 요약하면:
-
LoopSharedSemaphore
-
같은 이벤트 루프 안에서 만들어지는 모든
LSFClient/ShellRunner가 하나의 세마포어를 공유. -
configure_global_cli_concurrency(N)으로 전역 동시 실행 수를 설정(클라이언트 만들기 전에 한 번 호출).
-
-
LSFClient/ShellRunner
-
기본은 공유 세마포어 사용(
share_semaphore=True). -
정말 필요하면 인스턴스별로 독립 세마포어도 가능(
share_semaphore=False).
-
-
기능은 그대로
-
커맨드 그대로 실행(
create_subprocess_shell),Job <id>파싱 -
bkill옵션 없이 -
bjobs스냅샷: 메모리+디스크 캐시, 과호출 레이트리밋, in-process single-flight -
누락 시 1회 강제 새로고침 + UNKNOWN 보강 ,
is_finished는 UNKNOWN 그레이스(기본 15s)
-
드롭인 교체 코드 (lsf_core.py)
# lsf_core.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
import weakref
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ===================== 공용 유틸 =====================
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
def _find_job_ids(*texts: str) -> List[int]:
ids: List[int] = []
for t in texts:
if not t:
continue
for m in _JOBID_RE.finditer(t):
try:
ids.append(int(m.group(1)))
except ValueError:
pass
return list(dict.fromkeys(ids))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ===================== 프로세스(루프) 전역 limiter =====================
class LoopSharedSemaphore:
"""
같은 이벤트 루프 안에서 create_subprocess_shell 동시 실행 수를 제한하는 전역 세마포어.
- 여러 LSFClient, 여러 ShellRunner가 있어도 '같은 루프'에서는 이 세마포어 하나를 공유.
- 다중 스레드/다중 루프 환경이라면 '루프별'로 하나씩 생김.
"""
_sems: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, asyncio.Semaphore]" = weakref.WeakKeyDictionary()
_default_limit: int = 8
@classmethod
def configure(cls, limit: int) -> None:
# 이미 만들어진 세마포어의 용량을 동적으로 바꾸긴 어렵다.
# 그러니 '클라이언트 만들기 전에' 호출해 두는 것을 권장.
cls._default_limit = max(1, int(limit))
@classmethod
def get(cls) -> asyncio.Semaphore:
loop = asyncio.get_running_loop()
sem = cls._sems.get(loop)
if sem is None:
sem = asyncio.Semaphore(cls._default_limit)
cls._sems[loop] = sem
return sem
def configure_global_cli_concurrency(limit: int) -> None:
"""클라이언트 생성 전에 호출해서 전역 동시 실행 수를 지정하세요."""
LoopSharedSemaphore.configure(limit)
# ===================== ShellRunner =====================
class ShellRunner:
"""create_subprocess_shell만 사용. 기본은 전역(루프공유) 세마포어를 씀."""
def __init__(self, max_concurrency: Optional[int], env: Optional[Dict[str, str]], share_semaphore: bool):
# env
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
# semaphore
if share_semaphore:
self._sem = LoopSharedSemaphore.get()
else:
self._sem = asyncio.Semaphore(max_concurrency or LoopSharedSemaphore._default_limit)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ===================== SharedCache (mem+disk) =====================
class SharedCache:
"""
멀티프로세스 공유 JSON 캐시:
- 메모리 TTL + 디스크 TTL (같은 FS 내 rename 원자성)
- 쓰기: tmp -> fsync(tmp) -> os.replace -> fsync(dir)
- 신선도: JSON 내부 ts와 파일 mtime 모두 사용(+2s skew 허용)
"""
def __init__(self, cache_dir: Optional[str], ttl_sec: float, namespace: str = "lsf_async", skew_eps: float = 2.0):
base = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
self._root = os.path.join(base, namespace)
os.makedirs(self._root, exist_ok=True)
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
self._mem: Dict[str, Tuple[float, Any]] = {}
def _key_path(self, key: str) -> str:
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", key)
return os.path.join(self._root, f"{safe}.json")
def get(self, key: str) -> Optional[Any]:
# memory
item = self._mem.get(key)
if item:
ts, data = item
if time.time() - ts <= self._ttl:
return data
# disk
p = self._key_path(key)
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in = float(payload.get("ts", 0.0))
data = payload.get("data", None)
now = time.time()
fresh_by_ts = (now - ts_in) <= (self._ttl + self._skew) if ts_in > 0 else False
fresh_by_mtime = (now - max(0.0, st.st_mtime)) <= (self._ttl + self._skew)
if fresh_by_ts or fresh_by_mtime:
self._mem[key] = (time.time(), data)
return data
except Exception:
return None
return None
def set(self, key: str, data: Any) -> None:
self._mem[key] = (time.time(), data)
p = self._key_path(key)
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="cache_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, p)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ===================== 결과 타입 =====================
@dataclass
class SubmitItem:
index: int
command: str
returncode: int
stdout: str
stderr: str
job_ids: List[int]
@dataclass
class SubmitResult:
interrupted: bool
items: List[SubmitItem]
job_ids: List[int] # 평탄화된 전체 Job ID
job_id_to_command: Dict[int, str] # Job ID -> 원 커맨드
num_submitted: int
num_failed: int
# ===================== LSFClient =====================
class LSFClient:
"""
- run_command_capture_job: 커맨드 그대로 실행, 출력에서 'Job <id>'(복수 가능) 파싱
- kill: 순수 bkill (옵션 없이)
- status: bjobs 전체 스냅샷(캐시/레이트리밋/단일플라이트) + 누락 시 1회 강제 새로고침 + UNKNOWN 보강
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행 중은 완주 → 수집한 job_id를 필요 시 bkill
- is_finished: UNKNOWN 그레이스(기본 15s) 동안은 미완료로 간주
- 세마포어: 기본은 루프 전역 공유(LoopSharedSemaphore)
"""
TERMINAL = {"DONE", "EXIT", "ZOMBI"}
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: Optional[int] = None, # share_semaphore=False일 때만 사용
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
unknown_grace_sec: float = 15.0,
share_semaphore: bool = True, # True: 루프 전역 공유, False: 인스턴스별 세마포어
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._runner = ShellRunner(max_concurrency=max_concurrent_cli, env=env, share_semaphore=share_semaphore)
self._user = user or os.environ.get("LSF_USER") or getpass.getuser()
self._cache = SharedCache(cache_dir, ttl_sec=disk_cache_ttl_sec, namespace="lsf_async", skew_eps=2.0)
# bjobs 제어
self._bjobs_ttl = float(bjobs_ttl_sec)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._last_bjobs_ts = 0.0
self._inflight: Dict[str, asyncio.Future] = {}
# UNKNOWN 처리
self._unknown_seen_at: Dict[int, float] = {}
self._unknown_grace = float(unknown_grace_sec)
# --- 일반 커맨드 실행 + Job ID 파싱 ---
async def run_command_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> SubmitItem:
cmd = _to_shell_str(command)
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
job_ids = _find_job_ids(stdout, stderr)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
return SubmitItem(index=-1, command=cmd, returncode=rc, stdout=stdout, stderr=stderr, job_ids=job_ids)
# --- bkill (옵션 없이) ---
async def kill(
self,
job_ids: Iterable[int] | int,
*,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
cmd = f"{self._paths['bkill']} {' '.join(ids)}"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# --- bjobs 스냅샷 (캐시/레이트리밋/단일플라이트) ---
async def _fetch_bjobs_all(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
now = time.time()
since = now - self._last_bjobs_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
self._last_bjobs_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
if not force_refresh:
cached = self._cache.get(key)
if cached is not None:
return {int(k): v for k, v in (cached.items() if isinstance(cached, dict) else [])}
inflight = self._inflight.get(key)
if inflight:
return await inflight
fut: asyncio.Future = asyncio.get_running_loop().create_future()
self._inflight[key] = fut
try:
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()})
fut.set_result(data)
return data
except Exception as e:
fut.set_exception(e)
raise
finally:
self._inflight.pop(key, None)
# --- 안전한 status (누락 자동 보강) ---
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
req_ids = [int(i) for i in job_ids]
out = {i: all_map.get(i) for i in req_ids}
if any(v is None for v in out.values()):
all_map2 = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=True)
for i in req_ids:
out[i] = out[i] or all_map2.get(i)
now = time.time()
for i in req_ids:
if out[i] is None:
out[i] = {"stat": "UNKNOWN", "exit_code": None, "queue": None, "exec_host": None, "_missing": True}
# UNKNOWN 그레이스 시작 시각 기록
self._unknown_seen_at.setdefault(i, now)
return out
# --- 완료 판정 ---
async def is_finished(self, job_ids: Iterable[int]) -> bool:
ids = list({int(i) for i in job_ids})
if not ids:
return True
st = await self.status(ids, include_done=True)
now = time.time()
for i in ids:
info = st.get(i, {})
stat = (info or {}).get("stat")
if stat == "UNKNOWN":
first = self._unknown_seen_at.setdefault(i, now)
if now - first < 15.0: # self._unknown_grace를 써도 됨. 기본 15s
return False
else:
continue
if stat not in self.TERMINAL:
return False
return True
# --- 배치 제출 + 취소 ---
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
cleanup_on_interrupt: bool = True, # True면 인터럽트 시 수집된 job_id를 즉시 bkill
) -> SubmitResult:
items: List[SubmitItem] = []
job_ids_all: List[int] = []
job_id_to_cmd: Dict[int, str] = {}
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(idx: int, cmd: StrOrSeq) -> SubmitItem:
res = await self.run_command_capture_job(
cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error
)
return SubmitItem(index=idx, command=res.command, returncode=res.returncode,
stdout=res.stdout, stderr=res.stderr, job_ids=res.job_ids)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(idx, commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
interrupted = bool(is_interrupted())
if interrupted and cleanup_on_interrupt and job_ids_all:
await self.kill(job_ids_all, raise_on_error=False)
job_ids_all = list(dict.fromkeys(job_ids_all))
return SubmitResult(
interrupted=interrupted,
items=items,
job_ids=job_ids_all,
job_id_to_command=job_id_to_cmd,
num_submitted=len(job_ids_all),
num_failed=failures,
)
어떻게 쓰면 되나 (step-by-step)
1) 전역 동시 실행 수를 한 번만 설정
from lsf_core import configure_global_cli_concurrency
configure_global_cli_concurrency(6) # 프로세스(=루프) 전체에서 최대 6개만 동시 실행
이건 클라이언트들을 만들기 전에 호출하세요. 같은 이벤트 루프에서 만든 모든
LSFClient가 이 세마포어를 공유합니다.
2) 클라이언트 여러 개 만들어도 OK
from lsf_core import LSFClient
c1 = LSFClient() # 기본: 공유 세마포어 사용
c2 = LSFClient() # 동일 루프면 c1과 같은 세마포어 공유
c3 = LSFClient(share_semaphore=False, max_concurrent_cli=2) # 정말 필요하면 독립 세마포어
3) 질문에서 준 루프
cmds = ["job_sub a.txt", "mkdir package", "bsub -q short sleep 3"]
data = await c1.submit_many_with_cancel(cmds, is_interrupted=is_interrupted)
if data.interrupted or is_interrupted():
print("중지되었습니다.")
return
job_ids = data.job_ids
while not await c1.is_finished(job_ids):
status = await c1.status(job_ids)
for job_id in job_ids:
job_command = data.job_id_to_command.get(job_id)
print(job_id, status[job_id].get("stat"), job_command)
await asyncio.sleep(1)
if is_interrupted():
await c1.kill(job_ids) # 옵션 없이 순수 bkill
print("중지하였습니다.")
return
print("모두 완료되었습니다.")
왜 이게 “더 일반적이고 좋은 선택”인가
-
안전한 기본값 : 대부분의 애플리케이션은 단일 이벤트 루프에서 돌아가므로, 루프 단위 전역 세마포어 공유가 가장 자연스럽고 안정적.
-
예측 가능한 동시성 :
configure_global_cli_concurrency(N)딱 한 번으로 프로세스 전체 실행량을 제어. -
유연성 : 예외적으로 독립 세마포어가 필요하면
share_semaphore=False로 분리 가능. -
복잡도 최소화 : 싱글톤 클라이언트 강제보다 제어 지점이 명확하고, 테스트·확장이 쉽다.
원하면 전역 동시 실행 수를 환경변수(예: LSF_CLI_MAX=6)로도 읽도록 한 줄 추가해줄게.
SharedCache도 한 프로세스 내에 LSFClient가 여러개 있을 때 최대한 효율적으로 동작하게 하고 싶어. 지금은 Client가 N개면 Cache도 N개 있는데 이건 좀 비효율적인거 아닌가 싶어서.
좋은 포인트! 한 프로세스(정확히는 같은 이벤트 루프) 안에서 LSFClient를 여러 개 생성해도 캐시(메모리)와 단일 플라이트(single-flight)가 중복되지 않게 고쳤어요.
핵심 변화만 요약:
-
공유 세마포어 : 이미 제공했던
LoopSharedSemaphore로 전역 동시 실행 수를 루프 단위로 공유. -
공유 메모리 캐시 :
SharedCache가 프로세스/루프 전역 메모리 샤드 를 사용하도록 변경 →LSFClient가 N개여도 메모리 캐시 1벌 만. -
전역 single-flight :
bjobs스냅샷을 가져오는 fetch는 루프 전역 single-flight 로 묶어, 동시에 여러LSFClient가 호출해도 실제 호출은 1회 만 수행.
아래 파일 하나로 교체해서 쓰세요. (이전 응답의 기능/인터페이스는 유지됩니다: bkill 옵션 없음, UNKNOWN 보강, is_finished 그레이스 처리 등.)
# lsf_core.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
import weakref
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ========== 공용 ==========
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
def _find_job_ids(*texts: str) -> List[int]:
ids: List[int] = []
for t in texts:
if not t:
continue
for m in _JOBID_RE.finditer(t):
try:
ids.append(int(m.group(1)))
except ValueError:
pass
return list(dict.fromkeys(ids))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ========== 루프 전역 공유 도우미들 ==========
class LoopSharedSemaphore:
"""같은 이벤트 루프 내에서 전역으로 공유되는 세마포어 (CLI 동시 실행 상한)."""
_sems: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, asyncio.Semaphore]" = weakref.WeakKeyDictionary()
_default_limit: int = 8
@classmethod
def configure(cls, limit: int) -> None:
cls._default_limit = max(1, int(limit))
@classmethod
def get(cls) -> asyncio.Semaphore:
loop = asyncio.get_running_loop()
sem = cls._sems.get(loop)
if sem is None:
sem = asyncio.Semaphore(cls._default_limit)
cls._sems[loop] = sem
return sem
def configure_global_cli_concurrency(limit: int) -> None:
"""클라이언트 생성 전에 한 번 호출해서 전역 동시 실행 수를 지정하세요."""
LoopSharedSemaphore.configure(limit)
class LoopSingleFlight:
"""
같은 이벤트 루프에서 key별로 동시 실행을 1회로 보장.
사용법: await LoopSingleFlight.run(key, coro_factory)
"""
_maps: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, Dict[str, asyncio.Future]]" = weakref.WeakKeyDictionary()
@classmethod
async def run(cls, key: str, coro_factory):
loop = asyncio.get_running_loop()
fmap = cls._maps.get(loop)
if fmap is None:
fmap = {}
cls._maps[loop] = fmap
fut = fmap.get(key)
if fut is not None:
return await fut
fut = loop.create_future()
fmap[key] = fut
try:
res = await coro_factory()
fut.set_result(res)
return res
except Exception as e:
fut.set_exception(e)
raise
finally:
fmap.pop(key, None)
# ========== ShellRunner ==========
class ShellRunner:
"""create_subprocess_shell만 사용. 기본은 루프 전역 세마포어 공유."""
def __init__(self, max_concurrency: Optional[int], env: Optional[Dict[str, str]], share_semaphore: bool):
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
self._sem = LoopSharedSemaphore.get() if share_semaphore else asyncio.Semaphore(max_concurrency or 8)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
async with self._sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ========== SharedCache (프로세스/루프 전역 메모리) ==========
# 루트 디렉토리(= base/namespace) 단위로 메모리 샤드 공유
_MEM_SHARDS: Dict[str, Dict[str, Tuple[float, Any]]] = {}
class SharedCache:
"""
멀티프로세스 공유 JSON 캐시 + **프로세스/루프 전역 메모리 샤드**:
- 같은 프로세스에서 같은 cache_root(=base/namespace)를 쓰는 모든 LSFClient가 **한 메모리 dict 공유**
- 디스크: tmp -> fsync(tmp) -> os.replace -> fsync(dir) (같은 FS 내 rename 원자성)
- 신선도: JSON 내부 ts와 파일 mtime 모두 사용(+2s skew 허용)
"""
def __init__(self, cache_dir: Optional[str], ttl_sec: float, namespace: str = "lsf_async", skew_eps: float = 2.0):
base = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
self._root = os.path.join(base, namespace) # shard key
os.makedirs(self._root, exist_ok=True)
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
# 전역 샤드 확보
self._mem = _MEM_SHARDS.setdefault(self._root, {}) # key -> (ts, data)
def _key_path(self, key: str) -> str:
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", key)
return os.path.join(self._root, f"{safe}.json")
def get(self, key: str) -> Optional[Any]:
# 1) 프로세스 전역 메모리
item = self._mem.get(key)
if item:
ts, data = item
if time.time() - ts <= self._ttl:
return data
# 2) 디스크
p = self._key_path(key)
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in = float(payload.get("ts", 0.0))
data = payload.get("data", None)
now = time.time()
fresh_by_ts = (now - ts_in) <= (self._ttl + self._skew) if ts_in > 0 else False
fresh_by_mtime = (now - max(0.0, st.st_mtime)) <= (self._ttl + self._skew)
if fresh_by_ts or fresh_by_mtime:
self._mem[key] = (time.time(), data)
return data
except Exception:
return None
return None
def set(self, key: str, data: Any) -> None:
self._mem[key] = (time.time(), data)
p = self._key_path(key)
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="cache_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, p)
os.fsync(dir_fd)
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ========== 결과 타입 ==========
@dataclass
class SubmitItem:
index: int
command: str
returncode: int
stdout: str
stderr: str
job_ids: List[int]
@dataclass
class SubmitResult:
interrupted: bool
items: List[SubmitItem]
job_ids: List[int] # 평탄화된 전체 Job ID
job_id_to_command: Dict[int, str] # Job ID -> 원 커맨드
num_submitted: int
num_failed: int
# ========== LSFClient ==========
class LSFClient:
"""
- run_command_capture_job: 커맨드 그대로 실행, 출력에서 'Job <id>'(복수 가능) 파싱
- kill: 순수 bkill (옵션 없이)
- status: bjobs 전체 스냅샷(공유 캐시 + 전역 single-flight + 레이트리밋)
+ 누락 시 1회 강제 새로고침 + UNKNOWN 보강
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행 중은 완주 → 수집 job_id 필요시 bkill
- is_finished: UNKNOWN 그레이스(기본 15s) 동안은 미완료로 간주
- 동시성: 루프 전역 세마포어 공유 (configure_global_cli_concurrency 사용 가능)
"""
TERMINAL = {"DONE", "EXIT", "ZOMBI"}
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
max_concurrent_cli: Optional[int] = None, # share_semaphore=False일 때만 의미 있음
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
unknown_grace_sec: float = 15.0,
share_semaphore: bool = True,
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._runner = ShellRunner(max_concurrency=max_concurrent_cli, env=env, share_semaphore=share_semaphore)
self._user = user or os.environ.get("LSF_USER") or getpass.getuser()
self._cache = SharedCache(cache_dir, ttl_sec=disk_cache_ttl_sec, namespace="lsf_async", skew_eps=2.0)
# bjobs 제어
self._bjobs_ttl = float(bjobs_ttl_sec)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._last_bjobs_ts = 0.0
# UNKNOWN 처리
self._unknown_seen_at: Dict[int, float] = {}
self._unknown_grace = float(unknown_grace_sec)
# --- 일반 커맨드 실행 + Job ID 파싱 ---
async def run_command_capture_job(
self,
command: StrOrSeq,
*,
timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> SubmitItem:
cmd = _to_shell_str(command)
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
job_ids = _find_job_ids(stdout, stderr)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
return SubmitItem(index=-1, command=cmd, returncode=rc, stdout=stdout, stderr=stderr, job_ids=job_ids)
# --- bkill (옵션 없이) ---
async def kill(
self,
job_ids: Iterable[int] | int,
*,
timeout: Optional[float] = 60.0,
raise_on_error: bool = True,
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
cmd = f"{self._paths['bkill']} {' '.join(ids)}"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# --- bjobs 스냅샷 (공유 캐시 + 전역 single-flight) ---
async def _fetch_bjobs_all(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# per-process(min: 루프) rate-limit
now = time.time()
since = now - self._last_bjobs_ts
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
self._last_bjobs_ts = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
# 1) 공유 캐시 먼저 조회
if not force_refresh:
cached = self._cache.get(key)
if cached is not None:
return {int(k): v for k, v in (cached.items() if isinstance(cached, dict) else [])}
# 2) 전역 single-flight로 fetch 단일화
async def _do_fetch():
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()})
return data
if force_refresh:
return await _do_fetch()
return await LoopSingleFlight.run(key, _do_fetch)
# --- 안전한 status (누락 자동 보강 + UNKNOWN 보강) ---
async def status(
self,
job_ids: Optional[Iterable[int]] = None,
*,
include_done: bool = True,
timeout: Optional[float] = 60.0,
force_refresh: bool = False,
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
req_ids = [int(i) for i in job_ids]
out = {i: all_map.get(i) for i in req_ids}
# 누락 있으면 1회 강제 새로고침
if any(v is None for v in out.values()):
all_map2 = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=True)
for i in req_ids:
out[i] = out[i] or all_map2.get(i)
# 그래도 없으면 UNKNOWN 보강
now = time.time()
for i in req_ids:
if out[i] is None:
out[i] = {"stat": "UNKNOWN", "exit_code": None, "queue": None, "exec_host": None, "_missing": True}
self._unknown_seen_at.setdefault(i, now)
return out
# --- 완료 판정 ---
async def is_finished(self, job_ids: Iterable[int]) -> bool:
ids = list({int(i) for i in job_ids})
if not ids:
return True
st = await self.status(ids, include_done=True)
now = time.time()
for i in ids:
info = st.get(i, {})
stat = (info or {}).get("stat")
if stat == "UNKNOWN":
first = self._unknown_seen_at.setdefault(i, now)
if now - first < 15.0: # 기본 그레이스 15초 (필요시 self._unknown_grace 사용)
return False
else:
continue
if stat not in self.TERMINAL:
return False
return True
# --- 배치 제출 + 취소 ---
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
cleanup_on_interrupt: bool = True, # True면 인터럽트 시 수집된 job_id 즉시 bkill
) -> SubmitResult:
items: List[SubmitItem] = []
job_ids_all: List[int] = []
job_id_to_cmd: Dict[int, str] = {}
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(idx: int, cmd: StrOrSeq) -> SubmitItem:
res = await self.run_command_capture_job(
cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error
)
return SubmitItem(index=idx, command=res.command, returncode=res.returncode,
stdout=res.stdout, stderr=res.stderr, job_ids=res.job_ids)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(idx, commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
interrupted = bool(is_interrupted())
if interrupted and cleanup_on_interrupt and job_ids_all:
await self.kill(job_ids_all, raise_on_error=False)
job_ids_all = list(dict.fromkeys(job_ids_all))
return SubmitResult(
interrupted=interrupted,
items=items,
job_ids=job_ids_all,
job_id_to_command=job_id_to_cmd,
num_submitted=len(job_ids_all),
num_failed=failures,
)
어떻게 달라졌나 (step-by-step)
-
SharedCache 메모리 공유 :
_MEM_SHARDS에 루트(=cache_dir/namespace)별로 딱 한 벌의 메모리 dict 만 존재 → 같은 프로세스/루프의 여러LSFClient가 동일 메모리 캐시 를 씀. -
전역 single-flight :
LoopSingleFlight.run(key, _do_fetch)로bjobs스냅샷 fetch를 루프 전역 1회로 제한. -
동시 실행 상한 :
LoopSharedSemaphore로 루프 전역 동시 실행 수 제어(원하면configure_global_cli_concurrency(N)호출).
이제 한 프로세스에서 LSFClient를 여러 개 만들어도:
-
셸 실행 동시성 상한이 늘지 않고 ,
-
메모리 캐시/단일 플라이트도 중복되지 않아서 불필요한
bjobs실행이 사실상 1회로 합쳐집니다.
필요하면 캐시 루트(cache_dir, namespace)를 팀/유저 단위로 나눠서 격리할 수도 있어요.
이렇게 복잡해질 일인가 싶네. 그냥 이벤트 루프는 1개라고 가정하고 모두 공유하면 안돼?
좋아, “이벤트 루프는 1개라고 가정”하고, 프로세스 전역으로 모두 공유 하도록 깔끔하게 정리했어.
전역 공유 대상: 세마포어(동시 실행 상한), 메모리 캐시, single-flight, bjobs rate-limit.
-
bkill은 옵션 없이 만 사용. -
bjobs는 전체 스냅샷 1회 + 메모리/디스크 TTL 캐시 + 전역 single-flight , 과호출 방지. -
스냅샷에 누락 되면 1회 강제 갱신 → 그래도 없으면
{"stat":"UNKNOWN"}으로 보강(에러 X). -
is_finished()는UNKNOWN을 그레이스(기본 15s) 동안은 “미완료”로 간주, 이후 완료로 간주. -
전역 동시 실행 수 는
configure_global_cli_concurrency(N)로 한 번만 설정(클라이언트 생성 전 권장).
# lsf_simple.py
from __future__ import annotations
import asyncio
import contextlib
import getpass
import json
import os
import re
import tempfile
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
# ===================== 전역 설정/공유 상태 =====================
_DEFAULT_CONCURRENCY = 8
_SEM: Optional[asyncio.Semaphore] = None # 프로세스 전역 세마포어(단일 이벤트 루프 가정)
_MEM_CACHE: Dict[str, Tuple[float, Any]] = {} # 프로세스 전역 메모리 캐시
_INFLIGHT: Dict[str, asyncio.Future] = {} # 프로세스 전역 single-flight (key -> Future)
_LAST_BJOBS_CALL: Dict[str, float] = {} # rate-limit용 마지막 호출시각 (key= f"{user}|{'a' if include_done else ''}")
def configure_global_cli_concurrency(limit: int) -> None:
"""프로세스 전역 CLI 동시 실행 상한을 설정. 가능한 한 클라이언트 사용 전에 호출하세요."""
global _DEFAULT_CONCURRENCY
_DEFAULT_CONCURRENCY = max(1, int(limit))
def _get_global_sem() -> asyncio.Semaphore:
global _SEM
if _SEM is None:
_SEM = asyncio.Semaphore(_DEFAULT_CONCURRENCY)
return _SEM
# ===================== 공용 유틸 =====================
StrOrSeq = Union[str, Sequence[str]]
_JOBID_RE = re.compile(r"Job\s*<(\d+)>", re.IGNORECASE)
def _to_shell_str(cmd: StrOrSeq) -> str:
return cmd if isinstance(cmd, str) else " ".join(map(str, cmd))
def _find_job_ids(*texts: str) -> List[int]:
ids: List[int] = []
for t in texts:
if not t:
continue
for m in _JOBID_RE.finditer(t):
try: ids.append(int(m.group(1)))
except ValueError: pass
# 중복 제거(출력 중복 대비)
return list(dict.fromkeys(ids))
class LSFCommandError(RuntimeError):
def __init__(self, cmd_str: str, returncode: int, stdout: str, stderr: str):
super().__init__(f"Command failed: {cmd_str} (rc={returncode})\n{stderr.strip()}")
self.cmd_str, self.returncode, self.stdout, self.stderr = cmd_str, returncode, stdout, stderr
# ===================== ShellRunner (전역 세마포어 공유) =====================
class ShellRunner:
"""create_subprocess_shell + 전역 세마포어로 동시 실행 제한"""
def __init__(self, env: Optional[Dict[str, str]] = None):
base_env = dict(os.environ)
base_env.setdefault("LC_ALL", "C")
base_env.setdefault("LANG", "C")
self._env = base_env if env is None else (base_env | env)
async def run(self, cmd_str: str, *, timeout: Optional[float] = None) -> Tuple[str, str, int]:
sem = _get_global_sem()
async with sem:
proc = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
try:
out_b, err_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
with contextlib.suppress(ProcessLookupError):
proc.kill()
raise
return out_b.decode("utf-8", "replace"), err_b.decode("utf-8", "replace"), proc.returncode
# ===================== SharedCache (프로세스 전역 메모리 + 디스크) =====================
class SharedCache:
"""
멀티프로세스 공유 JSON 캐시 + 프로세스 전역 메모리:
- 메모리 TTL: _MEM_CACHE 사용
- 디스크 TTL: tmp -> fsync(tmp) -> os.replace -> fsync(dir)
- 신선도: JSON 내부 ts와 파일 mtime 모두로 판정(+2s skew 허용)
"""
def __init__(self, cache_dir: Optional[str], ttl_sec: float, namespace: str = "lsf_async", skew_eps: float = 2.0):
base = cache_dir or (os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache"))
self._root = os.path.join(base, namespace)
os.makedirs(self._root, exist_ok=True)
self._ttl = float(ttl_sec)
self._skew = float(skew_eps)
def _key_path(self, key: str) -> str:
safe = re.sub(r"[^A-Za-z0-9._-]+", "_", key)
return os.path.join(self._root, f"{safe}.json")
def get(self, key: str) -> Optional[Any]:
# 1) 프로세스 전역 메모리
item = _MEM_CACHE.get(key)
if item:
ts, data = item
if time.time() - ts <= self._ttl:
return data
# 2) 디스크
p = self._key_path(key)
if not os.path.exists(p):
return None
try:
st = os.stat(p)
with open(p, "r") as fp:
payload = json.load(fp)
ts_in = float(payload.get("ts", 0.0))
data = payload.get("data", None)
now = time.time()
fresh_by_ts = (now - ts_in) <= (self._ttl + self._skew) if ts_in > 0 else False
fresh_by_mtime = (now - max(0.0, st.st_mtime)) <= (self._ttl + self._skew)
if fresh_by_ts or fresh_by_mtime:
_MEM_CACHE[key] = (time.time(), data)
return data
except Exception:
return None
return None
def set(self, key: str, data: Any) -> None:
_MEM_CACHE[key] = (time.time(), data)
p = self._key_path(key)
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
dir_fd = os.open(d, os.O_RDONLY)
try:
fd, tmp = tempfile.mkstemp(prefix="cache_", suffix=".json", dir=d)
try:
with os.fdopen(fd, "w") as f:
json.dump({"ts": time.time(), "data": data}, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, p) # 같은 FS 내 원자적
os.fsync(dir_fd) # 메타데이터 영속
finally:
with contextlib.suppress(FileNotFoundError):
os.remove(tmp)
finally:
os.close(dir_fd)
# ===================== 결과 타입 =====================
@dataclass
class SubmitItem:
index: int
command: str
returncode: int
stdout: str
stderr: str
job_ids: List[int]
@dataclass
class SubmitResult:
interrupted: bool
items: List[SubmitItem]
job_ids: List[int] # 평탄화된 전체 Job ID
job_id_to_command: Dict[int, str] # Job ID -> 원 커맨드
num_submitted: int
num_failed: int
# ===================== LSFClient =====================
class LSFClient:
"""
- run_command_capture_job: 커맨드 그대로 실행, 출력에서 'Job <id>'(복수 가능) 파싱
- kill: 순수 bkill (옵션 없이)
- status: bjobs 전체 스냅샷(전역 캐시 + 전역 single-flight + rate-limit)
누락 시 1회 강제 갱신 → 그래도 없으면 UNKNOWN 보강
- submit_many_with_cancel: 인터럽트 시 새 실행 중단, 진행 중은 완주 → 수집 job_id 필요시 bkill
- is_finished: UNKNOWN 그레이스(기본 15s) 동안은 미완료로 취급
"""
TERMINAL = {"DONE", "EXIT", "ZOMBI"}
def __init__(
self,
*,
bkill: str = "bkill",
bjobs: str = "bjobs",
bjobs_ttl_sec: float = 5.0,
bjobs_min_interval_sec: float = 1.5,
disk_cache_ttl_sec: float = 5.0,
cache_dir: Optional[str] = None,
user: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
unknown_grace_sec: float = 15.0,
):
self._paths = {"bkill": bkill, "bjobs": bjobs}
self._runner = ShellRunner(env=env)
self._user = user or os.environ.get("LSF_USER") or getpass.getuser()
self._cache = SharedCache(cache_dir, ttl_sec=disk_cache_ttl_sec, namespace="lsf_async", skew_eps=2.0)
self._bjobs_ttl = float(bjobs_ttl_sec)
self._bjobs_min_interval = float(bjobs_min_interval_sec)
self._unknown_seen_at: Dict[int, float] = {}
self._unknown_grace = float(unknown_grace_sec)
# ---- 일반 커맨드 실행 + Job ID 파싱 ----
async def run_command_capture_job(
self, command: StrOrSeq, *, timeout: Optional[float] = None, raise_on_error: bool = False
) -> SubmitItem:
cmd = _to_shell_str(command)
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
job_ids = _find_job_ids(stdout, stderr)
if raise_on_error and rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
return SubmitItem(index=-1, command=cmd, returncode=rc, stdout=stdout, stderr=stderr, job_ids=job_ids)
# ---- bkill (옵션 없이) ----
async def kill(
self, job_ids: Iterable[int] | int, *, timeout: Optional[float] = 60.0, raise_on_error: bool = True
) -> Tuple[str, str, int]:
ids = [str(int(job_ids))] if isinstance(job_ids, int) else [str(int(i)) for i in job_ids]
if not ids:
return ("", "", 0)
cmd = f"{self._paths['bkill']} {' '.join(ids)}"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
if rc != 0 and raise_on_error:
raise LSFCommandError(cmd, rc, stdout, stderr)
return stdout, stderr, rc
# ---- bjobs 전체 스냅샷 (전역 캐시 + single-flight + rate-limit) ----
async def _fetch_bjobs_all(self, *, include_done: bool, timeout: Optional[float]) -> Dict[int, Dict[str, Any]]:
# 전역 rate-limit: user & include_done 조합별로 제한
rl_key = f"{self._user}|{'a' if include_done else ''}"
now = time.time()
since = now - _LAST_BJOBS_CALL.get(rl_key, 0.0)
if since < self._bjobs_min_interval:
await asyncio.sleep(self._bjobs_min_interval - since)
flag = "-a " if include_done else ""
cmd = f"{self._paths['bjobs']} -noheader -u {self._user} {flag}-o 'jobid stat exit_code queue exec_host'"
stdout, stderr, rc = await self._runner.run(cmd, timeout=timeout)
_LAST_BJOBS_CALL[rl_key] = time.time()
if rc != 0:
raise LSFCommandError(cmd, rc, stdout, stderr)
result: Dict[int, Dict[str, Any]] = {}
for line in stdout.splitlines():
parts = line.strip().split()
if len(parts) < 5:
continue
jid, stat, exit_code, queue, exec_host = parts[:5]
try:
jid_i = int(jid)
except ValueError:
continue
result[jid_i] = {
"stat": stat,
"exit_code": None if exit_code == "-" else exit_code,
"queue": queue,
"exec_host": exec_host,
}
return result
async def _status_all_cached(
self, *, include_done: bool = True, timeout: Optional[float] = 60.0, force_refresh: bool = False
) -> Dict[int, Dict[str, Any]]:
key = f"bjobs_all|{'-a' if include_done else ''}|user={self._user}"
# 1) 전역 캐시 먼저
if not force_refresh:
cached = self._cache.get(key)
if cached is not None:
return {int(k): v for k, v in (cached.items() if isinstance(cached, dict) else [])}
# 2) 전역 single-flight: 같은 key로 동시에 들어오면 1회만 fetch
if not force_refresh:
fut = _INFLIGHT.get(key)
if fut is None:
fut = asyncio.get_running_loop().create_future()
_INFLIGHT[key] = fut
try:
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()})
fut.set_result(data)
except Exception as e:
fut.set_exception(e)
finally:
_INFLIGHT.pop(key, None)
return await fut
# 강제 갱신 경로
data = await self._fetch_bjobs_all(include_done=include_done, timeout=timeout)
self._cache.set(key, {str(k): v for k, v in data.items()})
return data
# ---- 안전한 status (누락 자동 보강) ----
async def status(
self, job_ids: Optional[Iterable[int]] = None, *, include_done: bool = True, timeout: Optional[float] = 60.0, force_refresh: bool = False
) -> Dict[int, Dict[str, Any]]:
all_map = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=force_refresh)
if job_ids is None:
return all_map
req_ids = [int(i) for i in job_ids]
out = {i: all_map.get(i) for i in req_ids}
# 누락 있으면 1회 강제 갱신
if any(v is None for v in out.values()):
all_map2 = await self._status_all_cached(include_done=include_done, timeout=timeout, force_refresh=True)
for i in req_ids:
out[i] = out[i] or all_map2.get(i)
# 그래도 없으면 UNKNOWN 보강 + 첫 발견 시각 기록
now = time.time()
for i in req_ids:
if out[i] is None:
out[i] = {"stat": "UNKNOWN", "exit_code": None, "queue": None, "exec_host": None, "_missing": True}
self._unknown_seen_at.setdefault(i, now)
return out
# ---- 완료 판정 ----
async def is_finished(self, job_ids: Iterable[int]) -> bool:
ids = list({int(i) for i in job_ids})
if not ids:
return True
st = await self.status(ids, include_done=True)
now = time.time()
for i in ids:
info = st.get(i, {})
stat = (info or {}).get("stat")
if stat == "UNKNOWN":
first = self._unknown_seen_at.setdefault(i, now)
if now - first < self._unknown_grace:
return False
else:
continue
if stat not in self.TERMINAL:
return False
return True
# ---- 배치 제출 + 취소 ----
async def submit_many_with_cancel(
self,
commands: Sequence[StrOrSeq],
*,
is_interrupted: callable, # () -> bool
poll_interval_sec: float = 0.2,
max_in_flight: int = 8,
cmd_timeout: Optional[float] = 60.0,
cmd_raise_on_error: bool = False,
cleanup_on_interrupt: bool = True, # True면 인터럽트 시 수집된 job_id 즉시 bkill
) -> SubmitResult:
items: List[SubmitItem] = []
job_ids_all: List[int] = []
job_id_to_cmd: Dict[int, str] = {}
failures = 0
in_flight: set[asyncio.Task] = set()
async def run_one(idx: int, cmd: StrOrSeq) -> SubmitItem:
res = await self.run_command_capture_job(
cmd, timeout=cmd_timeout, raise_on_error=cmd_raise_on_error
)
return SubmitItem(index=idx, command=res.command, returncode=res.returncode,
stdout=res.stdout, stderr=res.stderr, job_ids=res.job_ids)
idx, total = 0, len(commands)
try:
while idx < total or in_flight:
while idx < total and len(in_flight) < max_in_flight and not is_interrupted():
t = asyncio.create_task(run_one(idx, commands[idx]))
in_flight.add(t)
idx += 1
if in_flight:
done, _ = await asyncio.wait(in_flight, timeout=poll_interval_sec, return_when=asyncio.FIRST_COMPLETED)
for t in done:
in_flight.discard(t)
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
else:
await asyncio.sleep(poll_interval_sec)
finally:
if in_flight:
done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
for t in done:
try:
item: SubmitItem = await t
items.append(item)
for jid in item.job_ids:
job_ids_all.append(jid)
job_id_to_cmd[jid] = item.command
if item.returncode != 0 and not item.job_ids:
failures += 1
except Exception:
failures += 1
interrupted = bool(is_interrupted())
if interrupted and cleanup_on_interrupt and job_ids_all:
await self.kill(job_ids_all, raise_on_error=False)
job_ids_all = list(dict.fromkeys(job_ids_all))
return SubmitResult(
interrupted=interrupted,
items=items,
job_ids=job_ids_all,
job_id_to_command=job_id_to_cmd,
num_submitted=len(job_ids_all),
num_failed=failures,
)
사용 예시 (질문 준 패턴 그대로)
import asyncio, time
from lsf_simple import LSFClient, configure_global_cli_concurrency
# 1) 전역 동시 실행 상한 (클라이언트 생성 전 1회)
configure_global_cli_concurrency(6)
class Token:
def __init__(self, delay): self.t0, self.delay = time.time(), delay
def is_interrupted(self): return (time.time()-self.t0) >= self.delay
async def main():
client = LSFClient(
bjobs_ttl_sec=5.0,
bjobs_min_interval_sec=1.2,
disk_cache_ttl_sec=5.0,
cache_dir="/shared/cache", # 여러 머신이 공유하는 FS면 더 좋아요
unknown_grace_sec=15.0,
)
is_interrupted = Token(delay=9999).is_interrupted # 실제론 외부 신호로 대체
cmds = ["job_sub a.txt", "mkdir package", "bsub -q short sleep 3"]
data = await client.submit_many_with_cancel(cmds, is_interrupted=is_interrupted)
if data.interrupted or is_interrupted():
print("중지되었습니다.")
return
job_ids = data.job_ids
if not job_ids:
print("제출된 LSF 잡이 없습니다.")
return
while not await client.is_finished(job_ids):
status = await client.status(job_ids)
for job_id in job_ids:
job_command = data.job_id_to_command.get(job_id, "<unknown>")
print(job_id, status[job_id].get("stat"), job_command)
await asyncio.sleep(1)
if is_interrupted():
await client.kill(job_ids) # 순수 bkill
print("중지하였습니다.")
return
print("모두 완료되었습니다.")
asyncio.run(main())
왜 이 버전이 딱 좋은지 (step-by-step)
-
단일 이벤트 루프 가정 하에 전역 공유 → 복잡도 최소, 의도 명확.
-
전역 세마포어/캐시/single-flight/rate-limit 로
LSFClient가 여러 개여도 호출량·부하가 늘지 않음. -
에러 최소화 : 스냅샷 누락시 자동 갱신 &
UNKNOWN보강, 루프가 멈추지 않도록is_finished그레이스 처리. -
현실적 성능 :
bjobs는 항상 전체 스냅샷 1회 → 캐시로 재사용, I/O/부하 절감.
필요한 거 더 줄이거나(예: 클래스 1개로 합치기), 로깅/메트릭 훅만 얹고 싶으면 말해줘.