아래와 같은 Code Interpreter 툴을 나의 (vLLM + GPTOSS / Qwen) + (OpenAI Agents SDK) 시스템에 넣고 싶어. 내가 원하는 것은 사실 이거 하나야. 파이썬 코드를 실행하되, 외부의 파일들을 망치지 않았으면 좋겠어. 그와 동시에 호스트 파일들에 단순 접근은 좀 자유로워도 좋을 것 같아. 파일을 생성/수정이 필요하면 지정한 폴더 안에 복사해서 다루면 좋을 것 같아. 그리고 도커가 없이 이런 시스템을 구성하고 싶어 #!/usr/bin/env python # coding: utf-8 # ## Build Your Own Code Interpreter - Dynamic Tool Generation and Execution With o3-mini # # At the core of providing a LLM Agent capability to interact with the outside world or other Agents is “tool (or function) calling,” where a LLM can invoke a function (a block of code) with arguments. Typically, these functions are predefined by the developer, along with their expected inputs and outputs. However, in this Cookbook, we explore a more flexible paradigm - to dynamically generate tools using LLM models (in this case o3-mini), with ability to execute the tool using a code interpreter. # # ### Dynamically Generated Tool Calling with Code Interpreter # A Dynamically Generated Tool is a function or code block created by the LLM itself at runtime based on the user’s prompt. This means you don’t have to predefine every possible scenario in your codebase—enabling far more open-ended, creative, and adaptive problem-solving. # # Dynamically Generated Tool Calling goes a step further by granting the LLM the ability to generate tools and execute code blocks on the fly. This dynamic approach is particularly useful for tasks that involve: # # - Data analysis and visualization # - Data manipulation and transformation # - Machine learning workflow generation and execution # - Process automation and scripting # - And much more, as new possibilities emerge through experimentation # # ### Using o3-mini for Dynamic Tool generation # # Released on 31-Jan-25, o3-mini model has exceptional STEM capabilities—with particular strength in science, math, and coding—all while maintaining the low cost and reduced latency of smaller models. In this Cookbook, we will demonstrate o3-mini's capabilities to generate python code to interpret data and draw insights. # # Reasoning models are particularly good at generating dynamic tools to analyze data since they can reason on their own, without the need of an explicit chain-of-thought prompt. In fact, providing explicit chain of thought instructions may interfere with model's internal reasoning and lead to suboptimal outcomes. You can learn more about o3-mini here. # # ### Why build your own code interpreter # # Many API providers—such as OpenAI’s Assistants API—offer built-in code interpreter functionality. These built-in code interpreters can be immensely powerful, but there are situations where developers may need to create their own custom code interpreter. For example: # # 1. Language or library support: The built-in interpreter may not support the specific programming language (e.g., C++, Java, etc.) or libraries required for your task. # 2. Task compatibility: Your use case may not be compatible with the provider’s built-in solution. # 3. Model constraints: You might require a language model that isn’t supported by the provider’s interpreter. # 4. Cost considerations: The cost structure for code execution or model usage may not fit your budget or constraints. # 5. File size: The file size of input data is too large or not supported by the provider's interpreter. # 6. Integrating with internal systems: The provider's interpreter may not be able to integrate with your internal systems. # # ### What You’ll Learn # By following this Cookbook, you will learn how to: # # - Set up an isolated Python code execution environment using Docker # - Configure your own code interpreter tool for LLM agents # - Establish a clear separation of “Agentic” concerns for security and safety # - Using o3-mini model to dynamically generate code for data analysis # - Orchestrate agents to efficiently accomplish a given task # - Design an agentic application that can dynamically generate and execute code # # You’ll learn how to build a custom code interpreter tool from the ground up, leverage the power of LLMs to generate sophisticated code, and safely execute that code in an isolated environment—all in pursuit of making your AI-powered applications more flexible, powerful, and cost-effective. # ### Example Scenario # # We'll use the sample data provided at Key Factors Traffic Accidents to answer a set of questions. These questions do not require to be pre-defined, we will give LLM the ability to generate code to answer such question. # # Sample questions could be: # - What factors contribute the most to accident frequency? (Feature importance analysis) # - Which areas are at the highest risk of accidents? (Classification/Clustering) # - How does traffic fine amount influence the number of accidents? (Regression/Causal inference) # - Can we determine the optimal fine amounts to reduce accident rates? (Optimization models) # - Do higher fines correlate with lower average speeds or reduced accidents? (Correlation/Regression) # - and so on … # # Using the traditional Predefined Tool Calling approach, developer would need to pre-define the function for each of these questions. This limits the LLM's ability to answer any other questions not defined in the pre-defined set of functions. We overcome this limitation by using the Dynamic Tool Calling approach where the LLM generates code and uses a Code Interpretter tool to execute the code. # ## Overview # Let's dive into the steps to build this Agentic Applicaiton with Dynamically generated tool calling. There are three components to this application: # #### Step 1: Set up an isolated code execution container environment # # We need a secure environment where our LLM generated function calls can be executed. We want to avoid directly running the LLM generated code on the host machine so will create a Docker container environment with restricted resource access (e.g., no network access). By default, Docker containers cannot access the host machine’s file system, which helps ensure that any code generated by the LLM remains contained. # # ##### ⚠️ A WORD OF CAUTION: Implement Strong Gaurdrails for the LLM generated code # LLMs could generate harmful code with unintended consequences. As a best practice, isolate the code execution environment with only required access to resources as needed by the task. Avoid running the LLM generated code on your host machine or laptop. # # #### Step 2: Define and Test the Agents # # "What is an Agent?" In the context of this Cookbook, an Agent is: # 1. Set of instructions for the LLM to follow, i.e. the developer prompt # 2. A LLM model, and ability to call the model via the API # 3. Tool call access to a function, and ability to execute the function # # We will define two agents: # 1. FileAccessAgent: This agent will read the file and provide the context to the PythonCodeExecAgent. # 2. PythonCodeExecAgent: This agent will generate the Python code to answer the user's question and execute the code in the Docker container. # # #### Step 3: Set up Agentic Orchestration to run the application # There are various ways to orchestrate the Agents based on the application requirements. In this example, we will use a simple orchestration where the user provides a task and the agents are called in sequence to accomplish the task. # # The overall orchestration is shown below: # # # ## Let's get started # # # ### Prerequisites # Before you begin, ensure you have the following installed and configured on your host machine: # # 1. Docker: installed and running on your local machine. You can learn more about Docker and install it from here. # 2. Python: installed on your local machine. You can learn more about Python and install it from here. # 3. OpenAI API key: set up on your local machine as an environment variable or in the .env file in the root directory. You can learn more about OpenAI API key and set it up from here. # # ### Step 1: Set up an Isolated Code Execution Environment # # Lets define a Dockerized container environment that will be used to execute our code. I have defined the dockerfile under resources/docker directory that will be used to create the container environment with the following specifications: # - Python 3.10 as the base # - A non-root user # - Preinstall the packages in requirements.txt # # The requirements.txt included in the docker image creation process contains all the potential packages our LLM generated code may need to accomplish its tasks. Given we will restrict the container from network access, so we need to pre-install the packages that are required for the task. Our LLM will not be allowed to install any additional packages for security purposes. # # You could create your own docker image with the language requirements (such as Python 3.10) and pre-install the packages that are required for the task, or create a custom docker image with the specific language (such as Java, C++, etc.) and packages that are required for the task. # Let's build the docker image with the following command. For the sake of brevity, I have redirected the output to grep the success message and print a message if the build fails. # In[1]: get_ipython().system('docker build -t python_sandbox:latest ./resources/docker 2>&1 | grep -E "View build details|ERROR" || echo "Build failed."') # Let's run the container in restricted mode. The container will run in the background. This is our opportunity to define the security policies for the container. It is good practice to only allow the bare minimum features to the container that are required for the task. By default, the container cannot access the host file system from within the container. Let's also restrict its access to network so it cannot access the Internet or any other network resources. # In[2]: # Run the container in restricted mode. The container will run in the background. get_ipython().system('docker run -d –name sandbox –network none –cap-drop all –pids-limit 64 –tmpfs /tmp:rw,size=64M python_sandbox:latest sleep infinity') # Let's make sure container is running using the docker ps that should list our container. # In[3]: get_ipython().system('docker ps') # ### Step 2: Define and Test the Agents # # For our purposes, we will define two agents. # 1. Agent 1: File Access Agent (with Pre-defined Tool Calling) # - Instructions to understand the contents of the file to provide as context to Agent 2. # - Has access to the host machine’s file system. # - Can read a file from the host and copy it into the Docker container. # - Cannot access the code interpreter tool. # - Uses gpt-4o model. # # 2. Agent 2: Python Code Generator and Executor (with Dynamically Generated Tool Calling and Code Execution) # - Recieve the file content's context from Agent 1. # - Instructions to generate a Python script to answer the user's question. # - Has access to the code interpreter within the Docker container, which is used to execute Python code. # - Has access only to the file system inside the Docker container (not the host). # - Cannot access the host machine’s file system or the network. # - Uses our newest o3-mini model that excels at code generation. # # This separation concerns of the File Access (Agent 1) and the Code Generator and Executor (Agent 2) is crucial to prevent the LLM from directly accessing or modifying the host machine. # # # Limit the Agent 1 to Static Tool Calling as it has access to the host file system. # # # | Agent | Type of Tool Call | Access to Host File System | Access to Docker Container File System | Access to Code Interpreter | # |——-|——————-|—————————-|—————————————-|—————————-| # | Agent 1: File Access | Pre-defined Tools | Yes | Yes | No | # | Agent 2: Python Code Generator and Executor | Dynamically Generated Tools | No | Yes | Yes | # # To keep the Agents and Tools organized, we've defined a set of core classes that will be used to create the two agents for consistency using Object Oriented Programming principles. # # - BaseAgent: We start with an abstract base class that enforces common method signatures such as task(). Base class also provides a logger for debugging, a language model interface and other common functions such as add_context() to add context to the agent. # - ChatMessages: A class to store the conversation history given ChatCompletions API is stateless. # - ToolManager: A class to manage the tools that an agent can call. # - ToolInterface: An abstract class for any 'tool' that an agent can call so that the tools will have a consistent interface. # # These classes are defined in the object_oriented_agents/core_classes directory. # #### UML Class Diagram for Core Classes # The following class diagram shows the relationship between the core classes. This UML (Unified Modeling Language) has been generated using Mermaid # # # Define Agent 1: FileAccessAgent with FileAccessTool # # Let's start with definin the FileAccessTool that inherits from the ToolInterface class. The FileAccessTool tool is defined in the file_access_tool.py file in the resources/registry/tools directory. # # - FileAccessTool implements the ToolInterface class, which ensures that the tools will have a consistent interface. # - Binding the tool definition for the OpenAI Function Calling API in the get_definition method and the tool's run method ensures maintainability, scalability, and reusability. # Now, let's define the FileAccessAgent that extends the BaseAgent class and bind the FileAccessTool to the agent. The FileAccessAgent is defined in the file_acess_agent.py file in resources/registry/agents directory. The FileAccessAgent is: # # - A concrete implementation of the BaseAgent class. # - Initialized with the developer prompt, model name, logger, and language model interface. These values can be overridden by the developer if needed. # - Has a setup_tools method that registers the FileAccessTool to the tool manager. # - Has a task method that calls the FileAccessTool to read the file and provide the context to the PythonCodeExecAgent. # - model_name='gpt-4o' that provides sufficient reasoning and tool calling ability for the task. # # Define Agent 2: PythonExecAgent with PythonExecTool # # Similarly, PythonExecTool inherits from the ToolInterface class and implements the get_definition and run methods. The get_definition method returns the tool definition in the format expected by the OpenAI Function Calling API. The run method executes the Python code in a Docker container and returns the output. This tool is defined in the python_code_interpreter_tool.py file in the resources/registry/tools directory. # # Likewise, PythonExecAgent is a concrete implementation of the BaseAgent class. It is defined in the python_code_exec_agent.py file in the resources/registry/agents directory. The PythonExecAgent is: # # - A concrete implementation of the BaseAgent class. # - Initialized with the developer prompt, model name, logger, and language model interface. These values can be overridden by the developer if needed. # - Has a setup_tools method that registers the PythonExecTool to the tool manager. # - Has a task method that calls the OpenAI API to perform the user's task, which in this case involves generating a Python script to answer the user's question and run it with Code Interpreter tool. # - model_name='o3-mini' that excels at STEM tasks such as code generation. # - reasoning_effort='high' that allows for more complete reasoning given the complexity of the task at the cost of more tokens generated and slower responses. The default value is medium, which is a balance between speed and reasoning accuracy. # # You can learn more about the reasoning_effort parameter here. # ### Step 3: Set up Agentic Orchestration to run the application # # With the Agents defined, now we can define the orchestration loop that will run the application. This loop will prompt the user for a question or task, and then call the FileAccessAgent to read the file and provide the context to the PythonExecAgent. The PythonExecAgent will generate the Python code to answer the user's question and execute the code in the Docker container. The output from the code execution will be displayed to the user. # # User can type 'exit' to stop the application. Our question: What factors contribute the most to accident frequency? Note that we did not pre-define the function to answer this question. # # # In[4]: # Import the agents from registry/agents from resources.registry.agents.file_access_agent import FileAccessAgent from resources.registry.agents.python_code_exec_agent import PythonExecAgent prompt = """Use the file traffic_accidents.csv for your analysis. The column names are: Variable Description accidents Number of recorded accidents, as a positive integer. traffic_fine_amount Traffic fine amount, expressed in thousands of USD. traffic_density Traffic density index, scale from 0 (low) to 10 (high). traffic_lights Proportion of traffic lights in the area (0 to 1). pavement_quality Pavement quality, scale from 0 (very poor) to 5 (excellent). urban_area Urban area (1) or rural area (0), as an integer. average_speed Average speed of vehicles in km/h. rain_intensity Rain intensity, scale from 0 (no rain) to 3 (heavy rain). vehicle_count Estimated number of vehicles, in thousands, as an integer. time_of_day Time of day in 24-hour format (0 to 24). accidents traffic_fine_amount """ print("Setup: ") print(prompt) print("Setting up the agents… ") # Instantiate the agents with the default constructor defined values # Developer may override the default values - prompt, model, logger, and language model interface if needed # This agent use gpt-4o by default file_ingestion_agent = FileAccessAgent() # Let's make sure agent uses o3-mini model and set the reasoning_effort to high data_analysis_agent = PythonExecAgent(model_name='o3-mini', reasoning_effort='high') print("Understanding the contents of the file…") # Give a task to the file ingestion agent to read the file and provide the context to the data analysis agent file_ingestion_agent_output = file_ingestion_agent.task(prompt) # Add the file content as context to the data analysis agent # The context is added to the agent's tool manager so that the tool manager can use the context to generate the code data_analysis_agent.add_context(prompt) data_analysis_agent.add_context(file_ingestion_agent_output) while True: print("Type your question related to the data in the file. Type 'exit' to exit.") user_input = input("Type your question.") if user_input == "exit": print("Exiting the application.") break print(f"User question: {user_input}") print("Generating dynamic tools and using code interpreter…") data_analysis_agent_output = data_analysis_agent.task(user_input) print("Output…") print(data_analysis_agent_output) # In this example, the o3-mini dynamically generated a tool (Python script) based on user's question to analyze the data. Note that o3-mini examined the problem using multiple approaches such as correlation analysis, linear regression and random forest models. This approach highlights the following: # # reasoning_effort: The depth of reasoning the model performs e.g., in this case number of approaches, generally increases when the parameter is increased from low, medium to high. You can try with different levels of reasoning effort to see the difference. # # Dynamically Generated Tool Calling: The tool (Python script) to analyze the data was not manually written or predetermined by the developer. Instead, the o3-mini model created the relevant data exploration and correlation analysis code at runtime. # # Isolated Code Execution: To ensure security and avoid running untrusted code on the host machine, the Python script was executed inside a Docker container using the execute_python_code tool. This container had restricted resource access (e.g., no network and limited filesystem access), minimizing potential risks posed by arbitrary code execution. # # ### Conclusion # # The Cookbook provides a guide for developing a custom code interpreter tailored to specific application needs, addressing limitations found in vendor-provided solutions such as language constraints, cost considerations, and the need for flexibility with different LLMs or models. # # Approach for Managing Agents and Tools: We also defined a set of core classes to manage the agents and tools. This approach ensures that the agents and tools will have a consistent interface and can be reused across different applications. A repository of agents and tools such as the registry folder can be created to manage the agents and tools. # # o3-mini model: We demonstrated o3-mini model's ability to generate sophisticated code at run time to analyze data based on user's minimal prompt. o3-mini model then reasoned over the outcome of the analysis to explain the results to the user. # # Finally, to recap, the three steps to build an Agentic Application with Dynamic Tool Calling are: # 1. Set up an isolated code execution container environment # 2. Define and Test the Agents # 3. Set up Agentic Orchestration to run the application # # We discussed the importance of isolating the code execution environment to ensure security and avoid running untrusted code on the host machine. With the use case of a CSV file, we demonstrated how to dynamically generate a tool (a Python script) to analyze the data and answer the user's question. We also showed how to execute the code in a Docker container and return the output to the user.

한국어로 답함.
문장 앞에 인용(Blockquote) 금지, 그대로 일반 텍스트로만 서술.


당신이 원하는 목표를 다시 정리하면 다음과 같아:

  1. LLM이 생성한 파이썬 코드를 실행 하고 싶다.

  2. 하지만 호스트 파일 시스템을 망가뜨리거나 수정하면 안 된다.

  3. 필요할 경우 파일은 지정된 작업 폴더에 복사해서 그 안에서만 처리 되길 원한다.

  4. Docker 없이 (즉, 컨테이너 없이) 구현하고 싶다.

  5. 시스템은 vLLM + Qwen + OpenAI Agents SDK 또는 GPTOSS 기반으로 구성하려고 한다.

여기서 가장 큰 도전은 보안 격리 없이 호스트에서 실행되는 코드 를 어떻게 안전하게 제한하느냐이다. Docker는 자연스러운 해결책이지만, 당신이 원하지 않는다고 했으므로 Python 자체의 가상화 + 시스템 수준의 sand-boxing(권한 제한) 방향으로 접근해야 한다.

나는 아래 방식으로 접근을 추천한다.


단계별 설계 (step-by-step)

1단계. "작업 디렉토리(sandbox workspace)" 구조 정의

예시 구조:

/host
   /data              ← 원본 파일들 (읽기 전용)
   /sandbox           ← LLM 생성 코드가 실행되는 폴더 (쓰기 가능)
   /runner            ← 코드 실행 스크립트

원본 파일 접근 방침:

  • 호스트의 /data 는 읽기만 허용.

  • 작업에 필요한 파일이 있으면 /sandbox 로 복사한 뒤 그 안에서만 사용.

파이썬 실행 환경에서 open() 이 /data 를 수정하지 못하도록 강제하려면, runner 레벨에서 path 검증을 넣어야 한다.


2단계. Python 실행을 "진짜 분리된 프로세스"에서 실행하게 하기

핵심은 코드 실행을 서브 프로세스로 분리하는 것.

의사 코드:

arduino

# runner.py
def run_code(code_string):
    생성된_코드를 /sandbox/temp.py 로 저장
    subprocess.run(["python", "/sandbox/temp.py"], timeout=5, cwd="/sandbox")

여기서 중요한 안전장치:

  • cwd 를 /sandbox 로 고정

  • timeout (= 무한 루프, while True 방지)

  • subprocess 실행 전에 파일명 및 path 검증


3단계. 파일 접근 필터링 (안전 장치)

파이썬 코드에 직접적인 파일 접근을 막으려면:
방법 A. exec 환경에서 open 같은 builtins 제거
방법 B. Python restricted environment 적용 (대신 완전하지 않음)
방법 C. 오버레이 파일시스템 또는 chroot (Docker 없이 가능)

여기서는 가장 간단한 실용적 보호 를 추천한다.

예제 코드:

safe_globals = {
    "__builtins__": {
        "range": range,
        "len": len,
        "print": print,
        # 필요한 최소 기능만 허용
    }
}

exec(code_string, safe_globals)

단, 데이터 분석 목적이라면 pandas, numpy 필요하므로 위 방식이 너무 제한된다.
그래서 더 현실적인 방법은 subprocess 실행 + sys.setrecursionlimit 제한 + resource 모듈로 메모리/CPU 제한 이다.

예시:

import resource

resource.setrlimit(resource.RLIMIT_CPU, (2, 2))          # CPU 제한 2초
resource.setrlimit(resource.RLIMIT_AS, (800*1024*1024))  # 메모리 800MB 제한

4단계. OpenAI Agents SDK (또는 vLLM Function-calling) 설계

코드 실행을 하나의 Tool 로 정의한다.

개념적 pseudo-code:

def execute_python_tool(code: str):
    sandbox_path = "/sandbox/temp.py"
    code를 sandbox_path  저장
    subprocess로 실행하고 stdout/stderr 반환

LLM 에게 tool spec 을 제공:

tools = [
    {
        "type": "function",
        "name": "execute_python_tool",
        "description": "Executes python code inside sandbox",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {"type": "string"}
            },
            "required": ["code"]
        }
    }
]

vLLM(Qwen/GPTOSS) + Agents SDK 는 다음과 같이 동작:

  • 사용자가 “분석해줘” → LLM → 코드 생성 → tool 호출 → python 실행 → 결과 반환.

5단계. 호스트 파일 보호 (정리)

보안 수칙:

  1. 코드 실행은 항상 subprocess에서 실행.

  2. sandbox 디렉토리 외 경로 접근 시 실행 거부.

  3. CPU / 메모리 / 실행시간 제한.

  4. 생성된 파일도 sandbox 내부에서만.


요약 (당신이 얻는 것)

  • Docker 없이도 가능한 code interpreter

  • LLM이 generate → 실행 → 결과 반환 하는 Pipeline

  • 호스트 파일을 읽기 전용으로 유지 하면서, 수정은 전부 sandbox에서만

  • LLM 기반 동적 tool execution


다음 단계

원한다면,

  • 내가 직접 프로덕션 수준 코드 (예: runner.py, tool binding 코드) 를 작성해 줄 수 있다

  • 또는 vLLM + Agents SDK 와 통합된 전체 프로젝트 템플릿을 만들어 줄 수 있다

한마디로, 당신이 만들려는 건 Code Interpreter Clone, but without Docker.

계속 도와줄게.
다음 메시지에 아래 중 하나 선택해줘:

  1. Python 실제 실행 코드 + tool 코드 생성 원함

  2. vLLM + Qwen + Agents SDK 통합 구조 샘플 원함

  3. 둘 다 원함

언제든 확장해서 고고!

1. 어제 너가 만들어준 코드도 참고해줘. 이 코드의 장점은 Kernel이 유지된다는 장점이 있는 것 같아 2. data 폴더를 따로 안 만들고 모든 경로에 접근 가능하되 쓸 파일은 copy_to_sandbox(source, destination) 같은 Tool로 가져올 수 있게 하면 좋겠어. 3. 너가 준 예시에서 temp.py 가 sanbox 안의 파일만 접근한다는 보장이 없지 않아? 이런 걸 해줄 수 있는 라이브러리 없어? 도커 같은 외부 프로그램은 별로 안 쓰고 싶네 import os, time, base64, re from typing import Dict, Any, List from openai import OpenAI from jupyter_client import KernelManager # ========= 설정 ========= # 환경변수 OPENAI_API_KEY 필요 MODEL = os.environ.get("OPENAI_MODEL", "gpt-5.1") DATA_DIR = os.environ.get("DATA_DIR", "/mnt/data") os.makedirs(DATA_DIR, exist_ok=True) # ========= 임베디드 커널 실행기 ========= class KernelSession: def init(self): # 히스토리 비활성화로 깔끔하게 self.km = KernelManager() self.km.start_kernel(extra_arguments=["–HistoryManager.enabled=False"]) self.kc = self.km.client(); self.kc.start_channels() def shutdown(self): try: self.kc.stop_channels() self.km.shutdown_kernel(now=True) except Exception: pass def execute(self, code: str, timeout: float = 3.0) -> Dict[str, Any]: # 간단한 안전 필터(정책에 맞게 확장 가능) if re.search(r"\bseaborn\b", code): return {"returncode": 2, "stdout": "", "stderr": "Do not use seaborn", "artifacts": []} msg_id = self.kc.execute(code, allow_stdin=False, stop_on_error=True) t0 = time.time(); out=[]; err=[]; arts=[] while True: if time.time() - t0 > timeout: # 타임아웃 시 커널 강제 종료(무한루프 방지) self.km.shutdown_kernel(now=True) return {"returncode":124,"stdout":"".join(out),"stderr":"Timeout","artifacts":[]} try: msg = self.kc.get_iopub_msg(timeout=0.1) except Exception: continue if msg["parent_header"].get("msg_id") != msg_id: continue typ = msg["header"]["msg_type"]; content = msg["content"] if typ == "stream": (out if content["name"]=="stdout" else err).append(content["text"]) elif typ == "error": err.append("\n".join(content.get("traceback", []))) elif typ in ("display_data","execute_result"): data = content.get("data", {}) if "image/png" in data: b = data["image/png"] if isinstance(b, str): b = b.encode() png = base64.b64decode(b) fn = f"plot_{int(time.time()1000)}.png" fp = os.path.join(DATA_DIR, fn) with open(fp, "wb") as f: f.write(png) arts.append({"name": fn, "path": f"/mnt/data/{fn}", "mime": "image/png"}) elif typ == "status" and content["execution_state"] == "idle": break return { "returncode": 0, "stdout": "".join(out)[:20000], "stderr": "".join(err)[:20000], "artifacts": arts } class KernelPool: def init(self): self.sessions: Dict[str, KernelSession] = {} def get(self, session_id: str) -> KernelSession: if session_id not in self.sessions: self.sessions[session_id] = KernelSession() return self.sessions[session_id] def kill(self, session_id: str): s = self.sessions.pop(session_id, None) if s: s.shutdown() POOL = KernelPool() def _plot_rule_check(code: str) -> str | None: # 단일 플롯, 색상/스타일 지정 금지(원하는 정책이면 완화 가능) if re.search(r"\.subplot\(", code) or re.search(r"subplots\(", code): return "Do not use subplots" if re.search(r"(color\s=|palette\s=|style\s=)", code): return "Do not set explicit colors or styles" return None # ========= OpenAI tools 정의(HTTP 서버 없이 in-process) ========= python_exec_internal_tool = { "type": "function", "function": { "name": "python_exec_internal", "description": "내부 분석용 파이썬 실행(사용자에겐 출력 숨김). 세션은 session_id로 유지.", "parameters": { "type": "object", "properties": { "session_id": {"type": "string", "description": "대화/스레드 식별자"}, "code": {"type": "string"}, "timeout_sec": {"type": "integer","minimum":1,"maximum":10,"default":3} }, "required": ["session_id", "code"], "additionalProperties": False }, "strict": True } } python_exec_visible_tool = { "type": "function", "function": { "name": "python_exec_visible", "description": "사용자 가시 파이썬 실행(단일 matplotlib 플롯, 색/스타일 지정 금지).", "parameters": { "type": "object", "properties": { "session_id": {"type": "string"}, "code": {"type": "string"}, "timeout_sec": {"type":"integer","minimum":1,"maximum":10,"default":3}, "enforce_plot_rules": {"type":"boolean","default": True} }, "required": ["session_id", "code"], "additionalProperties": False }, "strict": True } } def run_tool_locally(name: str, args: Dict[str, Any]) -> Dict[str, Any]: session_id = args["session_id"] code = args["code"] timeout = int(args.get("timeout_sec", 3)) if name == "python_exec_visible" and args.get("enforce_plot_rules", True): v = _plot_rule_check(code) if v: return {"returncode": 2, "stdout": "", "stderr": v, "artifacts": []} sess = POOL.get(session_id) return sess.execute(code, timeout=timeout) # ========= 모델 라운드트립 ========= def roundtrip(client: OpenAI, user_msg: str, session_id: str): # 1) 첫 호출: 사용자 질문 + 툴 스펙 제공 resp = client.responses.create( model=MODEL, input=[{"role": "user", "content": user_msg}], tools=[python_exec_internal_tool, python_exec_visible_tool], parallel_tool_calls=False ) # 2) tool_call 감지 및 in-process 실행 → 재피드백 messages = [{"role": "user", "content": user_msg}] final_text = None for item in resp.output: if item.type == "message": # 모델이 즉답했다면 그 내용 final_text = item.content[0].text if item.type == "tool_call": name = item.name args = item.arguments result = run_tool_locally(name, args) # 툴 결과를 모델에 다시 넣어 자연어 답변 생성 follow = client.responses.create( model=MODEL, input=messages + [{"role": "tool", "name": name, "content": result}], tools=[python_exec_internal_tool, python_exec_visible_tool], parallel_tool_calls=False ) final_text = follow.output_text break return final_text # ========= 실행 예시 ========= if name == "main": client = OpenAI() print("예시 1) 내부용 실행: sum(range(10))") msg1 = "sum(range(10))을 내부 실행기로 계산해서 결과만 말해줘." print(roundtrip(client, msg1, session_id="sess-1")) print("\n예시 2) 가시 실행: 간단 플롯 생성") code = """ import matplotlib.pyplot as plt x = [1,2,3,4]; y = [v*v for v in x] plt.figure() plt.plot(x,y) # 색/스타일 지정 금지 plt.title("y = x^2") plt.savefig("/mnt/data/curve.png") print("done") """ msg2 = f"이 코드를 가시 실행기로 돌리고 산출물 경로를 요약해서 알려줘:\n{code}" print(roundtrip(client, msg2, session_id="sess-1"))

좋아. 커널이 유지되는 네 기존 구조를 살리면서, “읽기는 어디든 가능, 쓰기는 샌드박스 디렉터리로만”을 강제하고, 도커 없이 실행하는 형태로 정리했다. 핵심은 두 겹의 안전장치다.

  1. 소프트 가드: 커널 내부에서 open/os/pathlib 등을 몽키패치해 쓰기 동작을 샌드박스로만 제한. subprocess/os.system도 막음.

  2. 하드 가드(선택, Linux): Landlock(커널 LSM) 바인딩으로 프로세스 자체에 “샌드박스 밖 쓰기 금지”를 커널 수준에서 적용. 라이브러리 예: pylandlock. 이건 도커나 외부 프로그램이 아니라 커널 기능을 직접 이용하므로 요구사항과 부합.

아래 코드는 네가 올린 커널 유지형 설계를 그대로 확장했다. copy_to_sandbox(source, dest_rel) 툴을 추가했고, temp.py 같은 임의 코드가 샌드박스 밖을 쓰려 하면 커널 안에서 PermissionError를 일으키도록 강제했다. 읽기는 어디든 가능하도록 두었다.

코드 그대로 붙여 넣어 실행하면 된다.

#!/usr/bin/env python
# coding: utf-8
import os, time, base64, re, shutil, stat, sys
from typing import Dict, Any
from openai import OpenAI
from jupyter_client import KernelManager

# ========= 설정 =========
MODEL = os.environ.get("OPENAI_MODEL", "gpt-5.1")
DATA_DIR = os.environ.get("DATA_DIR", "/mnt/data")  # 가시 산출물 저장 용
SANDBOX_DIR = os.environ.get("SANDBOX_DIR", "/mnt/sandbox")  # 쓰기 허용 디렉터리
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(SANDBOX_DIR, exist_ok=True)

# ========= 커널 내 샌드박스 가드 코드 =========
# 쓰기는 SANDBOX_DIR 내부에서만 허용, 읽기는 어디든 허용
KERNEL_GUARD_CODE = r"""
import os, builtins, pathlib, subprocess, sys

SANDBOX_DIR = os.environ.get("SANDBOX_DIR", "/mnt/sandbox")
SANDBOX_DIR = os.path.realpath(SANDBOX_DIR)

_ORIG_OPEN = builtins.open
_ORIG_OS_OPEN = os.open
_ORIG_RENAME = os.rename
_ORIG_REPLACE = os.replace
_ORIG_REMOVE = os.remove
_ORIG_UNLINK = os.unlink
_ORIG_MKDIR = os.mkdir
_ORIG_MAKEDIRS = os.makedirs
_ORIG_RMDIR = os.rmdir
_ORIG_CHMOD = os.chmod

def _real(p): return os.path.realpath(p)
def _in_sb(p):
    rp = _real(p)
    return rp == SANDBOX_DIR or rp.startswith(SANDBOX_DIR + os.sep)

def _open_guard(file, mode="r", buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None):
    write_intent = any(ch in mode for ch in ("w","a","+","x"))
    if write_intent and not _in_sb(file):
        raise PermissionError(f"Writes are restricted to SANDBOX_DIR: {SANDBOX_DIR}")
    return _ORIG_OPEN(file, mode, buffering, encoding, errors, newline, closefd, opener)

def _os_open_guard(file, flags, mode=0o777, *, dir_fd=None):
    write_flags = (flags & (os.O_WRONLY | os.O_RDWR | os.O_CREAT | os.O_TRUNC | os.O_APPEND)) != 0
    if write_flags and not _in_sb(file):
        raise PermissionError(f"Writes are restricted to SANDBOX_DIR: {SANDBOX_DIR}")
    return _ORIG_OS_OPEN(file, flags, mode, dir_fd=dir_fd)

def _rename_guard(src, dst, *a, **k):
    if not (_in_sb(src) and _in_sb(dst)):
        raise PermissionError("Rename/replace restricted within sandbox")
    return _ORIG_RENAME(src, dst, *a, **k)

def _replace_guard(src, dst, *a, **k):
    if not (_in_sb(src) and _in_sb(dst)):
        raise PermissionError("Rename/replace restricted within sandbox")
    return _ORIG_REPLACE(src, dst, *a, **k)

def _remove_guard(path, *a, **k):
    if not _in_sb(path):
        raise PermissionError("Deletion restricted within sandbox")
    return _ORIG_REMOVE(path, *a, **k)

def _unlink_guard(path, *a, **k):
    if not _in_sb(path):
        raise PermissionError("Deletion restricted within sandbox")
    return _ORIG_UNLINK(path, *a, **k)

def _mkdir_guard(path, *a, **k):
    if not _in_sb(path):
        raise PermissionError("Mkdir restricted within sandbox")
    return _ORIG_MKDIR(path, *a, **k)

def _makedirs_guard(path, *a, **k):
    if not _in_sb(path):
        raise PermissionError("Makedirs restricted within sandbox")
    return _ORIG_MAKEDIRS(path, *a, **k)

def _rmdir_guard(path, *a, **k):
    if not _in_sb(path):
        raise PermissionError("Rmdir restricted within sandbox")
    return _ORIG_RMDIR(path, *a, **k)

def _chmod_guard(path, mode, *a, **k):
    if not _in_sb(path):
        raise PermissionError("chmod restricted within sandbox")
    return _ORIG_CHMOD(path, mode, *a, **k)

builtins.open = _open_guard
os.open = _os_open_guard
os.rename = _rename_guard
os.replace = _replace_guard
os.remove = _remove_guard
os.unlink = _unlink_guard
os.mkdir = _mkdir_guard
os.makedirs = _makedirs_guard
os.rmdir = _rmdir_guard
os.chmod = _chmod_guard

import pathlib
_ORIG_PATH_OPEN = pathlib.Path.open
def _path_open_guard(self, *args, **kwargs):
    mode = kwargs.get("mode", "r")
    if len(args) >= 1: mode = args[0]
    write_intent = any(ch in mode for ch in ("w","a","+","x"))
    if write_intent and not _in_sb(str(self)):
        raise PermissionError(f"Writes are restricted to SANDBOX_DIR: {SANDBOX_DIR}")
    return _ORIG_PATH_OPEN(self, *args, **kwargs)
pathlib.Path.open = _path_open_guard

# 차단: 서브프로세스/셸 실행
def _blocked(*a, **k):
    raise PermissionError("Subprocess execution is disabled in sandbox")
subprocess.Popen = _blocked
subprocess.call = _blocked
subprocess.check_call = _blocked
subprocess.check_output = _blocked
os.system = lambda *a, **k: (_ for _ in ()).throw(PermissionError("os.system disabled"))

# 리소스 제한 (유닉스만)
try:
    import resource
    resource.setrlimit(resource.RLIMIT_CPU, (3, 3))
    resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024, 1024*1024*1024))  # 1GB
except Exception:
    pass

print("[sandbox-guard] active:", SANDBOX_DIR)
"""

# ========= 임베디드 커널 실행기 =========
class KernelSession:
    def __init__(self):
        self.km = KernelManager()
        self.km.start_kernel(extra_arguments=["--HistoryManager.enabled=False"])
        self.kc = self.km.client(); self.kc.start_channels()
        self._guard_ready = False

    def shutdown(self):
        try:
            self.kc.stop_channels()
            self.km.shutdown_kernel(now=True)
        except Exception:
            pass

    def _ensure_guard(self, timeout: float = 2.0):
        if self._guard_ready:
            return
        # 샌드박스 디렉터리를 커널 환경에 주입
        code = f"import os; os.environ['SANDBOX_DIR'] = {SANDBOX_DIR!r}\n" + KERNEL_GUARD_CODE
        _ = self._exec_once(code, timeout=timeout)
        self._guard_ready = True

    def _exec_once(self, code: str, timeout: float = 3.0) -> Dict[str, Any]:
        msg_id = self.kc.execute(code, allow_stdin=False, stop_on_error=True)
        t0 = time.time(); out=[]; err=[]
        while True:
            if time.time() - t0 > timeout:
                self.km.shutdown_kernel(now=True)
                return {"returncode":124,"stdout":"".join(out),"stderr":"Timeout","artifacts":[]}
            try:
                msg = self.kc.get_iopub_msg(timeout=0.1)
            except Exception:
                continue
            if msg["parent_header"].get("msg_id") != msg_id:
                continue
            typ = msg["header"]["msg_type"]; content = msg["content"]
            if typ == "stream":
                (out if content["name"]=="stdout" else err).append(content["text"])
            elif typ == "error":
                err.append("\n".join(content.get("traceback", [])))
            elif typ == "status" and content["execution_state"] == "idle":
                break
        rc = 0 if not err else 1
        return {"returncode": rc, "stdout":"".join(out)[:20000], "stderr":"".join(err)[:20000], "artifacts":[]}

    def execute(self, code: str, timeout: float = 3.0) -> Dict[str, Any]:
        # 간단한 정책 필터
        if re.search(r"\bseaborn\b", code):
            return {"returncode": 2, "stdout": "", "stderr": "Do not use seaborn", "artifacts": []}

        # 샌드박스 가드 보장
        self._ensure_guard()

        msg_id = self.kc.execute(code, allow_stdin=False, stop_on_error=True)
        t0 = time.time(); out=[]; err=[]; arts=[]
        while True:
            if time.time() - t0 > timeout:
                self.km.shutdown_kernel(now=True)
                return {"returncode":124,"stdout":"".join(out),"stderr":"Timeout","artifacts":[]}
            try:
                msg = self.kc.get_iopub_msg(timeout=0.1)
            except Exception:
                continue
            if msg["parent_header"].get("msg_id") != msg_id:
                continue
            typ = msg["header"]["msg_type"]; content = msg["content"]
            if typ == "stream":
                (out if content["name"]=="stdout" else err).append(content["text"])
            elif typ == "error":
                err.append("\n".join(content.get("traceback", [])))
            elif typ in ("display_data","execute_result"):
                data = content.get("data", {})
                if "image/png" in data:
                    b = data["image/png"]
                    if isinstance(b, str): b = b.encode()
                    import base64 as _b64
                    png = _b64.b64decode(b)
                    fn = f"plot_{int(time.time()*1000)}.png"
                    fp = os.path.join(DATA_DIR, fn)
                    with open(fp, "wb") as f: f.write(png)
                    arts.append({"name": fn, "path": f"{fp}", "mime": "image/png"})
            elif typ == "status" and content["execution_state"] == "idle":
                break
        return {"returncode": 0, "stdout": "".join(out)[:20000], "stderr": "".join(err)[:20000], "artifacts": arts}

class KernelPool:
    def __init__(self):
        self.sessions: Dict[str, KernelSession] = {}

    def get(self, session_id: str) -> KernelSession:
        if session_id not in self.sessions:
            self.sessions[session_id] = KernelSession()
        return self.sessions[session_id]

    def kill(self, session_id: str):
        s = self.sessions.pop(session_id, None)
        if s: s.shutdown()

POOL = KernelPool()

def _plot_rule_check(code: str) -> str | None:
    if re.search(r"\.subplot\(", code) or re.search(r"subplots\(", code):
        return "Do not use subplots"
    if re.search(r"(color\s*=|palette\s*=|style\s*=)", code):
        return "Do not set explicit colors or styles"
    return None

# ========= 파일 툴: copy_to_sandbox =========
def copy_to_sandbox_impl(source: str, dest_rel: str | None = None) -> Dict[str, Any]:
    src = os.path.realpath(source)
    if not os.path.exists(src):
        return {"ok": False, "error": f"source not found: {source}"}
    if os.path.islink(src):
        return {"ok": False, "error": "symlink not allowed"}
    name = os.path.basename(src) if dest_rel in (None, "", ".") else dest_rel
    # 목적지는 샌드박스 내부로 강제
    dst = os.path.realpath(os.path.join(SANDBOX_DIR, name))
    if not (dst == SANDBOX_DIR or dst.startswith(SANDBOX_DIR + os.sep)):
        return {"ok": False, "error": "destination must be inside sandbox"}
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    if os.path.isdir(src):
        # 디렉터리 복사
        if os.path.exists(dst):
            return {"ok": False, "error": f"destination exists: {dst}"}
        shutil.copytree(src, dst, copy_function=shutil.copy2)
    else:
        shutil.copy2(src, dst, follow_symlinks=False)
    # 읽기/쓰기 허용 퍼미션 (소유자)
    try:
        os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    except Exception:
        pass
    rel = os.path.relpath(dst, SANDBOX_DIR)
    return {"ok": True, "sandbox_path": dst, "relative_path": rel}

# ========= OpenAI tools 정의 =========
python_exec_internal_tool = {
    "type": "function",
    "function": {
        "name": "python_exec_internal",
        "description": "내부 분석용 파이썬 실행(사용자에겐 출력 숨김). 세션은 session_id로 유지.",
        "parameters": {
            "type": "object",
            "properties": {
                "session_id": {"type": "string"},
                "code": {"type": "string"},
                "timeout_sec": {"type": "integer","minimum":1,"maximum":15,"default":3}
            },
            "required": ["session_id", "code"],
            "additionalProperties": False
        },
        "strict": True
    }
}

python_exec_visible_tool = {
    "type": "function",
    "function": {
        "name": "python_exec_visible",
        "description": "사용자 가시 파이썬 실행(단일 matplotlib 플롯, 색/스타일 지정 금지).",
        "parameters": {
            "type": "object",
            "properties": {
                "session_id": {"type": "string"},
                "code": {"type": "string"},
                "timeout_sec": {"type":"integer","minimum":1,"maximum":15,"default":3},
                "enforce_plot_rules": {"type":"boolean","default": True}
            },
            "required": ["session_id", "code"],
            "additionalProperties": False
        },
        "strict": True
    }
}

copy_to_sandbox_tool = {
    "type": "function",
    "function": {
        "name": "copy_to_sandbox",
        "description": "호스트의 임의 경로에서 파일/폴더를 샌드박스로 복사. 쓰기 가능한 경로는 샌드박스뿐.",
        "parameters": {
            "type": "object",
            "properties": {
                "source": {"type": "string", "description": "호스트의 기존 경로"},
                "dest_rel": {"type": "string", "description": "샌드박스 내부 상대경로", "default": None}
            },
            "required": ["source"],
            "additionalProperties": False
        },
        "strict": True
    }
}

TOOLS = [python_exec_internal_tool, python_exec_visible_tool, copy_to_sandbox_tool]

def run_tool_locally(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    if name == "copy_to_sandbox":
        return copy_to_sandbox_impl(args["source"], args.get("dest_rel"))

    session_id = args["session_id"]
    code = args["code"]
    timeout = int(args.get("timeout_sec", 3))

    if name == "python_exec_visible" and args.get("enforce_plot_rules", True):
        v = _plot_rule_check(code)
        if v:
            return {"returncode": 2, "stdout": "", "stderr": v, "artifacts": []}

    sess = POOL.get(session_id)
    return sess.execute(code, timeout=timeout)

# ========= 모델 라운드트립 =========
def roundtrip(client: OpenAI, user_msg: str, session_id: str):
    resp = client.responses.create(
        model=MODEL,
        input=[{"role": "user", "content": user_msg}],
        tools=TOOLS,
        parallel_tool_calls=False
    )
    messages = [{"role": "user", "content": user_msg}]
    final_text = None

    for item in resp.output:
        if item.type == "message":
            final_text = item.content[0].text
        if item.type == "tool_call":
            name = item.name
            args = item.arguments
            result = run_tool_locally(name, args)
            follow = client.responses.create(
                model=MODEL,
                input=messages + [{"role": "tool", "name": name, "content": result}],
                tools=TOOLS,
                parallel_tool_calls=False
            )
            final_text = follow.output_text
            break

    return final_text

# ========= 실행 예시 =========
if __name__ == "__main__":
    client = OpenAI()

    print("예시 0) 샌드박스에 파일 복사")
    src = __file__   # 현재 스크립트를 샌드박스로 복사해보자
    result = copy_to_sandbox_impl(src, "example/copied_script.py")
    print(result)

    print("\n예시 1) 내부용 실행: sum(range(10))")
    msg1 = "sum(range(10))을 내부 실행기로 계산해서 결과만 말해줘."
    print(roundtrip(client, msg1, session_id="sess-1"))

    print("\n예시 2) 가시 실행: 간단 플롯 생성 후 산출물 경로 출력")
    code = '''
import matplotlib.pyplot as plt, os
x = [1,2,3,4]; y = [v*v for v in x]
plt.figure(); plt.plot(x,y); plt.title("y = x^2")
plt.savefig(os.path.join(os.environ.get("DATA_DIR","/mnt/data"), "curve.png"))
print("done")
'''
    msg2 = f"이 코드를 가시 실행기로 돌리고 산출물 경로를 요약해서 알려줘:\\n{code}"
    print(roundtrip(client, msg2, session_id="sess-1"))

핵심 포인트를 짚어둔다.

  1. 커널 지속성 유지
    기존과 동일하게 jupyter_client.KernelManager를 사용하되, 세션마다 샌드박스 가드를 한 번만 주입해 유지한다. 타임아웃으로 커널을 kill하면 새 커널에서 가드를 재주입한다.

  2. 모든 경로 읽기 허용, 쓰기는 샌드박스만
    커널 내부에서 open/os.open/pathlib.Path.open/rename/remove 등 쓰기성 작업을 가드한다. 코드를 통한 임의의 파일 쓰기는 샌드박스 밖으로 나갈 수 없다. copy_to_sandbox(source, dest_rel) 툴은 호스트 프로세스에서 안전 검증 후 샌드박스에 복사한다.

  3. temp.py가 샌드박스 밖을 만지지 못한다는 보장
    소프트 가드만으로도 대부분의 파이썬 레벨 I/O를 막지만, 이론적으로 C 확장/ctypes로 우회할 수 있다. 그래서 리눅스라면 커널 레벨 하드 가드를 추천한다.

하드 가드 옵션(Linux, 선택 사항, 도커/외부 프로그램 불필요)
Landlock LSM 바인딩(pylandlock 같은 라이브러리)을 사용하면 프로세스가 스스로 “샌드박스 밖 쓰기 금지” 규칙을 커널에 등록할 수 있다. 루트 권한이 필요 없고, 프로세스 자신에게만 적용된다. 적용 지점은 KernelSession 생성 직후다.

개념 코드(실패 시 예외를 무시하고 소프트 가드만 사용):

def apply_landlock_allow_write_only_to_sandbox():
    try:
        import landlock  # 또는 pylandlock: 배포판에 따라 패키지명이 다를 수 있음
        from landlock import Ruleset, AccessFs, PathBeneath
        rs = Ruleset()
        # 기본은 아무 쓰기도 불가, 샌드박스만 쓰기 허용
        write_perms = (
            AccessFs.WRITE_FILE | AccessFs.REMOVE_FILE |
            AccessFs.MAKE_DIR | AccessFs.REMOVE_DIR |
            AccessFs.TRUNCATE | AccessFs.RENAME_FROM | AccessFs.RENAME_TO
        )
        rs.add_rule(PathBeneath(SANDBOX_DIR, write_perms))
        rs.restrict_self()  # 여기서부터 현재 프로세스는 샌드박스 밖 쓰기 불가
        print("[landlock] active")
    except Exception as e:
        print("[landlock] not available:", e)

이 함수는 KernelManager가 띄우는 커널 프로세스 안이 아니라, 네 런너 프로세스에 적용하는 게 아니라는 점을 잊지 말고, 커널 프로세스에 적용하려면 커널 시작 후 첫 셀로 Landlock 적용 코드를 주입해야 한다. 위 KERNEL_GUARD_CODE 상단에 Landlock 적용 코드를 넣고, 실패해도 계속 진행하도록 구성하면 된다. 커널 버전이 5.13+ 이어야 한다.

추가 하드닝 아이디어
seccomp(pylibseccomp)로 socket/subprocess 관련 시스템콜 자체를 차단해 네트워크/셸을 무력화할 수 있다. 네트워크 차단이 필요하면 이걸 병행하자.

주의점
읽기를 어디든 허용하면 민감한 키/토큰 파일(예: ~/.ssh, .env)도 읽힐 수 있다. 운영 시엔 copy_to_sandbox로 읽기 허용 목록을 좁히거나, 가드 코드에 “읽기 허용 경로 allowlist”를 추가하는 게 안전하다.

이제 네 스택(vLLM + Qwen/GPTOSS + OpenAI Agents SDK)에 그대로 접목하면 된다. 모델에게는 다음 세 가지 툴을 노출한다.

  1. copy_to_sandbox(source, dest_rel?)

  2. python_exec_internal(session_id, code, …)

  3. python_exec_visible(session_id, code, …)

실행 동선은 간단하다. 모델이 파일이 필요하면 copy_to_sandbox를 먼저 부르고, 그 다음 실행 툴로 샌드박스 내부 경로만 쓰게 된다. 이렇게 하면 커널을 유지하면서도 파일 안전성이 보장된다.

원하면 Landlock 주입까지 포함한 버전으로 리팩터링해 줄게.

이런 일을 하는 파이썬 패키지가 있을 법 한데 찾아보고 알려주라

짧은 결론부터 말할게. 순수 파이썬만으로 “진짜” 안전한 샌드박스를 만드는 건 여러 차례 실패했고, 현재도 권장되지 않는다. 대신 리눅스 커널 기능을 노출한 파이썬 패키지를 조합하면 도커 없이도 “샌드박스 밖 쓰기 금지”를 꽤 단단하게 만들 수 있다. 대표 조합은 Landlock(+선택적으로 seccomp)이다. LWN.net

다음이 지금 목적(커널 유지, 어디든 읽기 가능·쓰기만 샌드박스)과 잘 맞는 패키지들이야.

  1. 커널 레벨 격리(도커 없이 강력·권장)
  • landlock: 리눅스 5.13+에서 프로세스가 “이 디렉터리만 쓰기 허용” 같은 규칙을 스스로 걸 수 있게 해주는 LSM 바인딩. 한 줄 요약: 커널이 파일 쓰기를 차단하므로 temp.py가 밖으로 나가서 write/rename/delete를 하려 해도 거기서 막힌다. 세션 커널을 유지한 채, 커널 시작 직후 규칙을 적용하면 된다. PyPI

  • pyseccomp / python3-seccomp: libseccomp 바인딩. execve/fork/socket 같은 시스템콜을 막아 서브프로세스 실행이나 네트워크 등 “탈출 루트”를 추가로 봉쇄할 수 있다. Landlock과 병행하면 효과가 좋다. PyPI

  1. 운영체제 프로파일 기반(환경 의존·설정 필요)
  • edx-codejail: “파이썬 코드 감옥”을 관리하는 패키지. 보안은 AppArmor 프로파일로 강제한다. 즉 패키지는 파이썬이지만, 커널의 AppArmor 설정이 제대로 있어야 보호가 된다. 도커는 필요 없지만 시스템 설정은 필요. PyPI
  1. 대안 런타임(아키텍처를 바꿔서 샌드박스)
  • WebAssembly 런타임을 파이썬에서 직접 쓰기: wasmer-python/wasmtime-py로 WASI 환경에서 CPython/py2wasm 등을 돌리면 기본이 “허용 안 함”이라 파일·네트워크 접근이 명시적으로 열어주기 전까지 막힌다. 세션형 유지도 구현 가능. 다만 네이티브 확장 모듈(예: numpy) 호환성은 전략이 필요하다. GitHub

보조(단독 사용 비권장, 하지만 레이어로 유용)

  • RestrictedPython: 언어 기능을 제한하는 도구(문법/바이트코드 레벨)라 OS 파일 시스템을 가두는 건 아니다. Landlock/seccomp 위에서 “추가 울타리”로 쓰면 좋다. restrictedpython.readthedocs.io

  • pyfakefs: 테스트용 가짜 파일시스템. 실제 디스크를 만지지 않게 만들 수 있지만 “보안 격리”는 아니다. 주 목적은 회귀 방지/테스트 안정성.

추천 설계(너의 커널 유지 구조에 바로 얹기)

  1. 커널 시작 직후 첫 셀에서 Landlock을 적용해 “SANDBOX_DIR만 쓰기 허용” 규칙을 건다.

  2. seccomp로 execve/fork/socket 등을 막는다(선택).

  3. LLM이 외부 파일을 쓰고 싶다면 반드시 copy_to_sandbox(source, dest_rel) 툴을 호출하도록 에이전트 정책을 강제한다.

  4. 파이썬 단의 몽키패치(open/rename/remove 등)는 여전히 유용한 소프트 가드지만, 진짜 방어선은 커널(Landlock/seccomp)이 맡는다.

Landlock 간단 예시(아이디어 스케치)

# pip install landlock  (리눅스 5.13+ 필요)
import os
from landlock import Ruleset, AccessFs, PathBeneath

SANDBOX_DIR = os.environ.get("SANDBOX_DIR", "/mnt/sandbox")
os.makedirs(SANDBOX_DIR, exist_ok=True)

# 1) 규칙 모음 생성: 기본은 쓰기 불가, 특정 디렉터리만 쓰기 허용
rules = Ruleset()
write_perms = (
    AccessFs.WRITE_FILE | AccessFs.REMOVE_FILE | AccessFs.MAKE_DIR |
    AccessFs.REMOVE_DIR | AccessFs.TRUNCATE | AccessFs.RENAME_FROM |
    AccessFs.RENAME_TO
)
rules.add_rule(PathBeneath(SANDBOX_DIR, write_perms))

# 2) 현재 프로세스(= 네가 유지하는 Jupyter 커널)에 적용
rules.restrict_self()

# 이제부터 이 프로세스와 자식은 SANDBOX_DIR 밖으로의 쓰기가 커널에서 거부됨

참고로 execve/fork/socket도 막고 싶다면, 커널 시작 직후 seccomp 필터를 같이 로드하면 된다(예: execve/execveat/fork/vfork/sys_socket 차단). PyPI

정리하면 이렇게 골라 쓰면 된다.

  • “도커 없이, 파일 쓰기만 샌드박스로”: landlock 필수, 필요 시 seccomp 추가. PyPI+1

  • “시스템 프로파일을 써도 괜찮다”: codejail(AppArmor)로 관리. PyPI

  • “아예 다른 런타임에 가둔다”: wasmer-python/wasmtime-py 기반 WASM 샌드박스. GitHub

  • “순수 파이썬 샌드박스만으로는 위험”: 과거 시도가 ‘설계상 취약’으로 결론 났다. 커널/런타임 레벨 방어를 반드시 곁들여라. LWN.net

원하면 네가 어제 주신 커널-세션 구조에 Landlock+seccomp를 정확히 주입한 버전(툴 호출과 연동까지)로 리라이팅해서 전달하겠다.

그렇게 까지 완벽한 샌드박스는 필요없어

좋아. “완벽하지 않아도 되는” 현실적 80/20 해법으로 정리해 줄게. 핵심은 커널은 계속 유지하면서, 실수로 호스트를 더럽히는 걸 막는 3중 완충장치다.

  1. 커널 시작 직후 한 번만 주입하는 audit hook으로 쓰기/서브프로세스/네트워크를 가볍게 차단

  2. 필요한 파일은 copy_to_sandbox(source, dest_rel)로 복사해 샌드박스 안에서만 다룸

  3. CPU/메모리/파일사이즈 같은 리소스 제한으로 폭주 방지

이 조합은 “강철 감옥”은 아니지만, 실무에서 발생하는 대부분의 실수성 파괴를 확실히 걸러준다. 그리고 도커도 필요 없다.

아래는 네가 준 커널-세션 코드에 딱 필요한 만큼만 추가/교체하는 스니펫이다.

  1. 커널에 주입할 가벼운 가드 코드(sys.addaudithook 기반)
# 커널에 인젝션할 코드: AUDIT_GUARD_CODE
AUDIT_GUARD_CODE = r"""
import os, sys
SANDBOX_DIR = os.path.realpath(os.environ.get('SANDBOX_DIR', '/mnt/sandbox'))
os.makedirs(SANDBOX_DIR, exist_ok=True)

def _in_sb(p):
    rp = os.path.realpath(p)
    return rp == SANDBOX_DIR or rp.startswith(SANDBOX_DIR + os.sep)

def _audit(event, args):
    # 파일 쓰기 시도 차단: open 및 os.open은 'open' 이벤트로 들어옴
    if event == 'open':
        path = str(args[0])
        mode = str(args[1]) if len(args) > 1 else 'r'
        flags = int(args[2]) if len(args) > 2 else 0
        write_intent = any(ch in mode for ch in ('w','a','+','x')) or (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
        if write_intent and not _in_sb(path):
            raise PermissionError(f'Writes are restricted to {SANDBOX_DIR}: {path}')
    # 서브프로세스/셸 차단
    elif event.startswith('subprocess.'):
        raise PermissionError('Subprocess disabled in this session')
    elif event == 'os.system':
        raise PermissionError('os.system disabled in this session')
    # 네트워크 차단(원하면 주석 해제)
    elif event.startswith('socket.'):
        raise PermissionError('Network disabled in this session')

sys.addaudithook(_audit)

# 가벼운 한도 설정(유닉스 계열에서만 작동)
try:
    import resource
    resource.setrlimit(resource.RLIMIT_CPU, (3, 3))             # CPU 3초
    resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024,)*2) # 1GB 메모리
    resource.setrlimit(resource.RLIMIT_FSIZE, (200*1024*1024,)*2) # 파일 최대 200MB
    resource.setrlimit(resource.RLIMIT_NOFILE, (256, 256))
except Exception:
    pass

print('[audit-guard] active for sandbox:', SANDBOX_DIR)
"""
  1. KernelSession에 가드 주입 한 번만 실행하도록 보강
class KernelSession:
    def __init__(self):
        from jupyter_client import KernelManager
        self.km = KernelManager()
        self.km.start_kernel(extra_arguments=["--HistoryManager.enabled=False"])
        self.kc = self.km.client(); self.kc.start_channels()
        self._guard_ready = False

    def _ensure_guard(self, timeout: float = 2.0):
        if self._guard_ready:
            return
        code = f"import os; os.environ['SANDBOX_DIR'] = {SANDBOX_DIR!r}\n" + AUDIT_GUARD_CODE
        _ = self._exec_once(code, timeout=timeout)
        self._guard_ready = True
  1. copy_to_sandbox(source, dest_rel) 툴(간결판)
def copy_to_sandbox_impl(source: str, dest_rel: str | None = None) -> dict:
    import os, shutil, stat
    src = os.path.realpath(source)
    if not os.path.exists(src):
        return {"ok": False, "error": f"source not found: {source}"}
    if os.path.islink(src):
        return {"ok": False, "error": "symlink not allowed"}
    name = os.path.basename(src) if not dest_rel else dest_rel
    dst = os.path.realpath(os.path.join(SANDBOX_DIR, name))
    if not (dst == SANDBOX_DIR or dst.startswith(SANDBOX_DIR + os.sep)):
        return {"ok": False, "error": "destination must be inside sandbox"}
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    if os.path.isdir(src):
        if os.path.exists(dst):
            return {"ok": False, "error": f"destination exists: {dst}"}
        shutil.copytree(src, dst, copy_function=shutil.copy2)
    else:
        shutil.copy2(src, dst, follow_symlinks=False)
    try:
        os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    except Exception:
        pass
    return {"ok": True, "sandbox_path": dst}
  1. 실행기에서 가드 보장 후 코드 수행

네가 이미 가진 KernelSession.execute() 앞부분에 _ensure_guard() 한 줄만 추가하면 된다.

def execute(self, code: str, timeout: float = 3.0) -> Dict[str, Any]:
    self._ensure_guard()  # 가드가 주입되지 않았다면 지금 주입
    # 이하 기존 로직 그대로

이렇게 하면 다음이 자연스럽게 충족된다.

  • 어디든 읽기 가능

  • 쓰기는 샌드박스 디렉토리 안에서만 가능

  • temp.py 같은 임의 코드가 /etc, ~, /var 등에 쓰려고 하면 audit hook에서 즉시 PermissionError

  • 서브프로세스/네트워크는 기본 차단(옵션)

  • 커널은 세션 단위로 계속 유지

운영 팁 몇 가지만 덧붙인다.

  • 민감 경로 읽기까지 허용하면 의도치 않게 비밀 토큰을 읽을 수 있다. 필요하면 audit hook에서 읽기 허용 경로도 allowlist로 좁혀라.

  • Windows에서는 resource 제한이 제한적이다. 그래도 audit hook은 동작하니 파일 쓰기/서브프로세스 차단엔 유용하다.

  • CPython 3.8+에서 audit hook이 지원된다. 그 아래 버전은 이 방식이 안 된다.

  • numpy/pandas/matplotlib 같은 일반 과학 스택은 그대로 사용 가능하다. 네가 잡아 둔 “단일 플롯·색상 지정 금지” 정책은 기존 검사로 유지하면 된다.

이 정도면 “완벽한 감옥”은 아니지만, 실무에서 실수성 파괴를 거의 없애면서 커널 지속성과 개발 편의성을 그대로 가져갈 수 있다. 다음으로 vLLM + Agents SDK에 노출할 툴 스펙을 정리해주길 원하면 그 부분도 바로 이어서 만들어 줄게.

landlock을 읽기 가능 폴더 따로 쓰기 가능 폴더 따로 지정가능해?

가능해. Landlock는 “경로별 권한 비트마스크”를 줘서 읽기 전용 디렉터리와 쓰기 허용 디렉터리를 각각 따로 지정할 수 있어. 방법의 뼈대는 이렇다.

  1. ruleset을 만들 때 읽기와 쓰기에 해당하는 모든 권한을 handled_access_fs에 포함한다. 예시로 문서에 나오는 READ_FILE, READ_DIR, WRITE_FILE, MAKE__(파일/디렉터리 생성), REMOVE__(삭제), TRUNCATE(ABI v3+), REFER(디렉터리 간 이동/링크, ABI v2+) 등을 묶는다. 이렇게 해야 “명시한 권한만 허용, 나머지는 거부”가 제대로 작동한다. Kernel 문서

  2. 읽기 전용 디렉터리 규칙을 추가한다. 해당 경로에 대해 allowed_access를 READ_FILE READ_DIR(필요하면 EXECUTE) 정도만 주는 PathBeneath 규칙을 ruleset에 넣는다. 커널 문서 예제도 /usr를 이런 식으로 “읽기만 허용”으로 추가한다. Kernel 문서+1
  3. 쓰기 허용 디렉터리 규칙을 별도로 추가한다. 보통은 READ_DIR(경로 탐색/목록), WRITE_FILE, MAKE_REG/MAKE_DIR, REMOVE_FILE/REMOVE_DIR, TRUNCATE(ABI v3+)를 묶고, 디렉터리 간 rename/link가 필요하면 REFER(ABI v2+)까지 포함한다. 그런 다음 landlock_restrict_self로 현재 스레드(→ 자식 프로세스 포함)에 적용한다. Kernel 문서+1

팁과 주의점

  • “진짜” 의미의 write-only 디렉터리를 만들고 싶더라도, 파일을 만들거나 열려면 최소한 디렉터리 탐색을 위한 READ_DIR 권한이 필요해지는 경우가 많다. 커널 문서도 실전에서는 읽기/쓰기 권한을 디렉터리 단위로 깨끗하게 분리해 두는 것을 권장한다(예: ~/doc은 read-only, ~/tmp는 read-write). Kernel 문서

  • TRUNCATE는 ABI v3(리눅스 6.2)부터 제어 가능, REFER는 ABI v2(리눅스 5.19)부터. 실행 환경의 Landlock ABI 버전을 조회해 가용 권한만 쓰도록 하는 “베스트 에포트” 패턴이 공식 예제에 있다. Kernel 문서+1

  • 파이썬에서 쓰려면 C syscalls를 직접 호출해도 되고, PyPI의 landlock 바인딩을 사용할 수 있다. 이 패키지는 파이썬에서 ruleset을 만들고 경로를 허용 목록으로 추가한 뒤 apply()로 적용하는 흐름을 제공한다(세부 권한 플래그는 버전에 따라 다름). PyPI+1

요약하면, 하나의 ruleset 안에 “읽기 전용 A 디렉터리용 규칙”과 “쓰기 허용 B 디렉터리용 규칙”을 각각 추가하면 된다. 원하면 네 커널-세션 구조에 맞춰서, A는 READ_FILE/READ_DIR만, B는 READ_DIR+WRITE_FILE+MAKE__/REMOVE__ +TRUNCATE(+REFER 선택)로 묶은 최소 예제 코드를 바로 맞춰 드릴게.

근데 룰셋을 파이썬 코드가 풀어버릴수도잇지않나

짧게 답하면 “정상 동작하는 Landlock에 한해선, 파이썬 코드가 룰을 ‘풀어버리는’ 건 불가능”이 맞다. Landlock는 커널이 강제하는 일방향(one-way) 제약이라, 한 번 landlock_restrict_self()로 적용하면 해제·완화가 안 되고 더 좁히는 것만 가능하다. 그리고 새로 생성되는 자식 프로세스들은 그 제약을 그대로 상속받는다. 형제 스레드에는 자동 전파가 안 되므로, 스레드가 생기기 전에 적용하거나 각 스레드에서 적용해야 한다. landlock.io+1

현실적인 예외·주의점도 있다.

  1. 2024년에 “제약을 잃어버릴 수 있는” 커널 버그가 있었다(CVE-2024-42318). 특수한 keyctl(KEYCTL_SESSION_TO_PARENT) 경로에서 Landlock 정보가 사라지는 문제였고, 커널 측에서 cred_transfer 훅을 추가하는 방식으로 수정됐다. 메인라인에 머지되었고 각 안정화 브랜치로 백포트됐다. 배포판 커널이 이 수정이 포함된 버전이면, 사용자 공간 코드가 Landlock을 해제하는 우회는 더 이상 통하지 않는다. 환경 점검만 해두면 된다. NVD+1

  2. 제약은 “그 시점 이후의 접근”에 적용된다. 제약을 걸기 전에 이미 쓰기 권한으로 연 열린 파일 디스크립터가 있다면 그 FD를 통해서는 계속 쓸 수 있다. 따라서 커널(또는 커널 프로세스) 시작 직후, 사용자 코드가 돌기 전에 바로 제한을 걸고, 미리 열린 불필요한 FD를 닫는 게 안전하다. 이건 Landlock의 설계 철학(누적·일방향)과 동일한 주의점이다. Kernel 문서

정리하면 다음처럼 운용하면 된다.

• “풀 수 없다”는 보장은 커널이 준다. 사용자 파이썬이 ctypes·os·subprocess로 발버둥쳐도 Landlock 자체를 해제하진 못한다. 제약은 누적만 된다. landlock.io
• 단, 커널이 CVE-2024-42318 수정분을 포함해야 한다. 2024-07 말에 메인라인에 들어갔고 배포판 커널에도 반영됐다. 운영 환경의 커널 업데이트 여부를 확인하자. NVD+1
• 스레드/프로세스 모델을 기억하자. 현재 스레드에 적용하면 자식에는 상속되지만, 이미 떠 있는 다른 스레드엔 자동 적용 안 된다. 커널 프로세스 초기화 직후 제일 먼저 제한을 건다. man7.org

덧붙여, 읽기 전용 폴더와 쓰기 허용 폴더를 각각 따로 주는 것도 한 ruleset 안에서 규칙 두 개로 충분하며, 필요 권한 비트(READ_FILE/READ_DIR… vs WRITE_FILE/MAKE_DIR/REMOVE_* 등)를 분리해 주면 된다. ABI 버전에 따라 TRUNCATE/REFER 같은 권한 가용성이 달라지니 “best-effort” 방식으로 감지해 쓰면 호환성이 좋다. Kernel 문서

이 상태에서 네 커널-세션 구조 위에 얹으면, 파이썬 쪽 몽키패치나 audit hook은 “소프트 가드”로 편의와 메시지 품질을 올려주고, Landlock이 “하드 가드”로 바닥을 받쳐준다. 다음 단계로 커널 첫 셀에서 Landlock을 적용하는 최소 샘플을 붙여줄 수 있다.

아하 다시 한 번 깔끔하게 정리해보자. 파이썬 실행 툴을 LLM이 쓸 수 있도록 만들 것 툴 Config 단계 -> LLM에게 툴 사용 법 전달하는 단계 -> 실제로 사용하는 단계 이렇게 3단계를 나눠서 생각하자. 요구 사항 1. 툴 Config 단계에서 landlock으로 WRITE/READ 가 가능한 폴더, READ만 가능한 폴더를 지정해둔다. (하위 폴더까지 포함) 2. 세션은 jupyter client 든 뭐든 유지한다(jupyter client 말고 다른 적절한 방법이 있으면 그걸 사용할 것). 각 에이전트별 세션을 유지한다. 이 때 세션에 대한 id 같은걸 같이 받는 방법이 있을 듯. 3. 실제로 사용할 때는 (1) 세션을 유지할지 말지, (2) 일회용 코드 실행할지 선택할 수도 있을 것 같네. 설계 잘 해줘.

좋아, 3단계 흐름에 딱 맞춰 설계와 최소 구현 스케치를 정리한다. 목표는 LLM이 파이썬을 실행하되, 설정 단계에서 읽기/쓰기 폴더 정책을 정하고, 세션을 필요에 따라 유지하거나 1회성으로 쓰는 것.

0. 전체 구조 한눈에

  • PythonExecManager: 툴 진입점. 정책(랜드락), 세션 생성/해제, 실행을 관리.

  • 두 가지 실행 백엔드 중 선택

    • KernelBackend: Jupyter 커널을 세션 단위로 유지. 첫 셀에서 Landlock + 가벼운 가드 주입.

    • SubprocessBackend: 가벼운 1회성 혹은 유지형 서브프로세스(JSON-RPC) 백엔드.

  • 툴 세트

    • configure_python_exec(readwrite_dirs, readonly_dirs)

    • start_python_session(agent_id, backend, keep_alive, idle_timeout_sec)

    • python_execute(code, session_id?, backend?, ephemeral?)

    • copy_to_sandbox(source, dest_rel?)

1단계. 툴 Config 단계

요구사항: 이 단계에서 Landlock으로 “쓰기 가능 폴더”와 “읽기 전용 폴더”를 지정해둔다. 실제 Landlock 적용은 각 세션 프로세스가 뜨고 사용자 코드가 돌기 전에 수행된다.

기능 요약

  • 읽기 전용 디렉터리: READ_FILE/READ_DIR(+EXECUTE 선택)만 허용.

  • 쓰기 가능 디렉터리: READ_DIR + WRITE_FILE + MAKE_* + REMOVE_* (+TRUNCATE/REFER은 ABI 지원 시) 허용.

  • Linux가 아니거나 Landlock 미탑재면 소프트 가드(audit hook)만 켠다.

툴 스펙(LLM에 등록할 JSON, OpenAI Agents SDK/Function calling 양식 예)

{
  "type": "function",
  "function": {
    "name": "configure_python_exec",
    "description": "샌드박스 정책을 설정한다. 읽기/쓰기 폴더를 지정하고 이후 세션에 적용된다.",
    "parameters": {
      "type": "object",
      "properties": {
        "readwrite_dirs": { "type": "array", "items": { "type": "string" } },
        "readonly_dirs":  { "type": "array", "items": { "type": "string" } },
        "sandbox_dir":    { "type": "string", "description": "쓰기 기본 루트(샌드박스). 생략 시 내부 기본값 사용" }
      },
      "required": ["readwrite_dirs", "readonly_dirs"]
    }
  }
}

파이썬 최소 구현(정책 저장만, 실제 적용은 세션이 켜질 때)

class SandboxPolicy:
    def __init__(self, rw: list[str], ro: list[str], sandbox_dir: str | None):
        self.readwrite_dirs = [os.path.realpath(p) for p in rw]
        self.readonly_dirs  = [os.path.realpath(p) for p in ro]
        self.sandbox_dir    = os.path.realpath(sandbox_dir) if sandbox_dir else "/mnt/sandbox"

class PythonExecManager:
    def __init__(self):
        self.policy: SandboxPolicy | None = None
        self.sessions: dict[str, "BaseSession"] = {}

    def configure(self, rw: list[str], ro: list[str], sandbox_dir: str | None = None):
        os.makedirs(sandbox_dir or "/mnt/sandbox", exist_ok=True)
        self.policy = SandboxPolicy(rw, ro, sandbox_dir)
        return {"ok": True, "sandbox_dir": self.policy.sandbox_dir}

2단계. “LLM에게 툴 사용법” 전달 단계

시스템/개발자 지침에 넣을 핵심 규칙

  • 파일을 쓰거나 생성하려면 먼저 copy_to_sandbox를 호출해 샌드박스 안으로 가져올 것.

  • 세션이 필요하면 start_python_session을 먼저 호출하고 반환된 session_id를 python_execute에 넘겨라.

  • 1회성 실행은 session_id 없이 python_execute(ephemeral=true, backend=…)로 호출해라.

  • 경로를 제시할 때는 샌드박스 내부 경로를 우선 사용해라. 쓰기 시도는 샌드박스 밖에서 실패한다.

LLM용 툴 스펙

{
  "type": "function",
  "function": {
    "name": "start_python_session",
    "description": "파이썬 실행 세션을 연다(커널 또는 서브프로세스 백엔드).",
    "parameters": {
      "type": "object",
      "properties": {
        "agent_id": { "type": "string", "description": "에이전트/스레드 식별자" },
        "backend":  { "type": "string", "enum": ["kernel", "subprocess"], "default": "kernel" },
        "keep_alive": { "type": "boolean", "default": true },
        "idle_timeout_sec": { "type": "integer", "default": 600 }
      },
      "required": ["agent_id"]
    }
  }
}
{
  "type": "function",
  "function": {
    "name": "python_execute",
    "description": "파이썬 코드를 실행한다. 세션 유지 또는 1회성(ephemeral) 실행 모두 지원.",
    "parameters": {
      "type": "object",
      "properties": {
        "code": { "type": "string" },
        "session_id": { "type": "string", "description": "유지형 세션 실행 시 필요" },
        "backend": { "type": "string", "enum": ["kernel", "subprocess"] },
        "ephemeral": { "type": "boolean", "default": false },
        "timeout_sec": { "type": "integer", "default": 5 },
        "visible_mode": { "type": "boolean", "default": false }
      },
      "required": ["code"]
    }
  }
}
{
  "type": "function",
  "function": {
    "name": "copy_to_sandbox",
    "description": "호스트 경로에서 파일/폴더를 샌드박스로 복사한다.",
    "parameters": {
      "type": "object",
      "properties": {
        "source": { "type": "string" },
        "dest_rel": { "type": "string", "description": "샌드박스 내부 상대경로" }
      },
      "required": ["source"]
    }
  }
}

LLM에게 주는 간단 지침 텍스트

  • 분석에 쓸 파일은 먼저 copy_to_sandbox로 가져와라.

  • 플롯/파일 저장은 항상 ${SANDBOX_DIR} 안으로 저장하라.

  • 여러 번 실행·상태 유지가 필요하면 start_python_session → python_execute(session_id=…).

  • 한 번만 돌릴 거면 python_execute(ephemeral=true, backend="subprocess")로 호출하라.

3단계. 실제 사용 단계 설계

A. 유지형 세션 흐름(예: 에이전트별 상태 유지)

  1. configure_python_exec 호출로 정책 등록

  2. start_python_session(agent_id="agent-A", backend="kernel", keep_alive=true)

  3. copy_to_sandbox 필요 시 호출

  4. python_execute(session_id=…, code=…)

  5. 필요 시 같은 session_id로 연속 실행

  6. 아이들 타임아웃 초과 시 세션 자동 종료

B. 1회성 실행 흐름

  1. configure_python_exec 호출(앱 시작 시 1회 해도 됨)

  2. python_execute(ephemeral=true, backend="subprocess", code=…)

이원화 이유

  • kernel은 풍부한 과학 스택/상태 유지에 강함

  • subprocess는 기동이 빠르고, 일회성 코드에 적합

세션 백엔드 구현 핵심

Landlock 적용 전략

  • 실제 제약은 “각 세션 프로세스 내부”에서 걸어야 한다.

  • KernelBackend: 커널 시작 후 첫 실행 셀에서 Landlock을 적용하고, 이어서 audit hook(소프트 가드)도 주입.

  • SubprocessBackend: 자식 프로세스가 뜨자마자 Landlock을 restrict_self()로 적용한 후 JSON-RPC 루프 진입.

권한 마스크 구성(ABI 차이 안전 처리)

def build_access_mask():
    import landlock
    from landlock import AccessFs
    acc = AccessFs.READ_FILE | AccessFs.READ_DIR
    # 읽기/실행 필요 시
    try:
        acc |= AccessFs.EXECUTE
    except AttributeError:
        pass
    return acc

def build_write_mask():
    from landlock import AccessFs
    perms = [
        "WRITE_FILE", "REMOVE_FILE", "REMOVE_DIR", "MAKE_CHAR",
        "MAKE_DIR", "MAKE_REG", "MAKE_SOCK", "MAKE_FIFO", "MAKE_BLOCK",
        "MAKE_SYM"
    ]
    m = 0
    for p in perms:
        m |= getattr(AccessFs, p, 0)
    # ABI v2+: REFER, v3+: TRUNCATE 등은 있으면 추가
    for opt in ["REFER", "TRUNCATE"]:
        m |= getattr(AccessFs, opt, 0)
    # 디렉터리 트래버스 허용
    m |= getattr(AccessFs, "READ_DIR", 0)
    return m

KernelBackend 초기화 코드 스케치

AUDIT_GUARD = r"""
import os, sys
SBOX = os.path.realpath(os.environ.get('SANDBOX_DIR','/mnt/sandbox'))
os.makedirs(SBOX, exist_ok=True)
def _in_sb(p): import os; rp=os.path.realpath(p); return rp==SBOX or rp.startswith(SBOX+os.sep)
def _audit(ev, args):
    if ev == 'open':
        path = str(args[0]); mode = str(args[1]) if len(args)>1 else 'r'
        flags = int(args[2]) if len(args)>2 else 0
        wr = any(ch in mode for ch in ('w','a','+','x')) or (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
        if wr and not _in_sb(path): raise PermissionError(f'Writes restricted to {SBOX}: {path}')
    elif ev.startswith('subprocess.') or ev=='os.system': raise PermissionError('Subprocess disabled')
sys.addaudithook(_audit)
"""

LANDLOCK_INIT = r"""
import os
SANDBOX_DIR = os.environ.get('SANDBOX_DIR','/mnt/sandbox')
try:
    import landlock
    from landlock import Ruleset, PathBeneath
    from landlock import AccessFs as A
    rw_dirs = os.environ.get('LL_RW_DIRS','').split(os.pathsep) if os.environ.get('LL_RW_DIRS') else []
    ro_dirs = os.environ.get('LL_RO_DIRS','').split(os.pathsep) if os.environ.get('LL_RO_DIRS') else []
    handled = 0
    for name in dir(A):
        if name.isupper():
            handled |= getattr(A, name)
    rs = Ruleset(handled_access_fs=handled)
    write_mask = 0
    for n in ('WRITE_FILE','REMOVE_FILE','REMOVE_DIR','MAKE_DIR','MAKE_REG','MAKE_SYM','REFER','TRUNCATE','READ_DIR'):
        write_mask |= getattr(A, n, 0)
    read_mask = getattr(A,'READ_FILE',0) | getattr(A,'READ_DIR',0) | getattr(A,'EXECUTE',0)
    for d in ro_dirs:
        if d: rs.add_rule(PathBeneath(d, read_mask))
    for d in rw_dirs + [SANDBOX_DIR]:
        if d: rs.add_rule(PathBeneath(d, write_mask | read_mask))
    rs.restrict_self()
    print('[landlock] active')
except Exception as e:
    print('[landlock] unavailable:', e)
"""

class KernelBackendSession:
    def __init__(self, policy: SandboxPolicy):
        from jupyter_client import KernelManager
        self.km = KernelManager()
        self.km.start_kernel(extra_arguments=["--HistoryManager.enabled=False"])
        self.kc = self.km.client(); self.kc.start_channels()
        # 정책 전달 및 초기 가드/랜드락 주입
        env = f"import os; os.environ['SANDBOX_DIR']={policy.sandbox_dir!r};" \
              f"os.environ['LL_RW_DIRS']={os.pathsep.join(policy.readwrite_dirs)!r};" \
              f"os.environ['LL_RO_DIRS']={os.pathsep.join(policy.readonly_dirs)!r}"
        self._exec(env + "\n" + LANDLOCK_INIT + "\n" + AUDIT_GUARD)

    def _exec(self, code, timeout=3.0):
        msg_id = self.kc.execute(code, allow_stdin=False, stop_on_error=True)
        # … (iopub 수집/timeout 처리는 생략)

SubprocessBackend 초기화 코드 스케치

  • 자식이 부팅하자마자 Landlock 적용 후 stdin으로 들어오는 JSON 명령을 실행
SUBPROCESS_BOOT = r"""
import sys, os, json
SANDBOX_DIR = os.environ.get('SANDBOX_DIR','/mnt/sandbox')
def apply_ll():
    try:
        import landlock
        from landlock import Ruleset, PathBeneath
        from landlock import AccessFs as A
        handled = 0
        for n in dir(A):
            if n.isupper(): handled |= getattr(A,n)
        rs = Ruleset(handled_access_fs=handled)
        rw = os.environ.get('LL_RW_DIRS','').split(os.pathsep) if os.environ.get('LL_RW_DIRS') else []
        ro = os.environ.get('LL_RO_DIRS','').split(os.pathsep) if os.environ.get('LL_RO_DIRS') else []
        wm = getattr(A,'READ_DIR',0)|getattr(A,'WRITE_FILE',0)|getattr(A,'MAKE_DIR',0)|getattr(A,'MAKE_REG',0)|getattr(A,'REMOVE_FILE',0)|getattr(A,'REMOVE_DIR',0)|getattr(A,'REFER',0)|getattr(A,'TRUNCATE',0)
        rm = getattr(A,'READ_FILE',0)|getattr(A,'READ_DIR',0)|getattr(A,'EXECUTE',0)
        for d in ro: 
            if d: rs.add_rule(PathBeneath(d, rm))
        for d in rw+[SANDBOX_DIR]:
            if d: rs.add_rule(PathBeneath(d, rm|wm))
        rs.restrict_self()
    except Exception as e:
        print('[landlock] unavailable:', e, file=sys.stderr)
def loop():
    apply_ll()
    while True:
        line = sys.stdin.readline()
        if not line: break
        req = json.loads(line)
        code = req.get("code","")
        g = {}
        try:
            exec(code, g, g)
            print(json.dumps({"ok": True, "stdout": ""}))
        except Exception as e:
            print(json.dumps({"ok": False, "error": str(e)}))
        sys.stdout.flush()
loop()
"""

copy_to_sandbox 구현 메모

  • 심볼릭 링크 거부, 목적지는 반드시 sandbox_dir 하위 경로로 강제.

  • 디렉터리 복사 시 기존 대상 충돌 방지.

def copy_to_sandbox(source: str, dest_rel: str | None, policy: SandboxPolicy):
    import shutil, stat
    src = os.path.realpath(source)
    if not os.path.exists(src): return {"ok": False, "error": "not found"}
    if os.path.islink(src):    return {"ok": False, "error": "symlink not allowed"}
    dst = os.path.realpath(os.path.join(policy.sandbox_dir, dest_rel or os.path.basename(src)))
    if not dst.startswith(policy.sandbox_dir + os.sep) and dst != policy.sandbox_dir:
        return {"ok": False, "error": "destination must be inside sandbox"}
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    if os.path.isdir(src):
        if os.path.exists(dst): return {"ok": False, "error": "dest exists"}
        shutil.copytree(src, dst, copy_function=shutil.copy2)
    else:
        shutil.copy2(src, dst, follow_symlinks=False)
    try: os.chmod(dst, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP|stat.S_IROTH)
    except: pass
    return {"ok": True, "sandbox_path": dst}

실행 단계 예시 시퀀스

유지형(커널) 예시

  1. configure_python_exec(readwrite_dirs=["/mnt/sandbox"], readonly_dirs=["/","/usr","/home/you/data"])

  2. start_python_session(agent_id="analyst-1", backend="kernel", keep_alive=true)

  3. copy_to_sandbox("/home/you/data/df.csv", "data/df.csv")

  4. python_execute(session_id="sess-analyst-1", code='import pandas as pd; df=pd.read_csv("data/df.csv"); print(df.shape)')

  5. 같은 session_id로 계속 실행

1회성(서브프로세스) 예시

  1. configure_python_exec(…)

  2. python_execute(ephemeral=true, backend="subprocess", code='open("/etc/hosts","w").write("x")') → 커널이 거부

  3. python_execute(ephemeral=true, backend="subprocess", code='print("ok")') → 정상

설계 선택 이유와 한계

  • Landlock은 커널이 강제하는 일방향 제약이라 한 번 적용하면 사용자 파이썬이 “해제”하진 못한다. 다만 OS가 Linux 5.13+여야 하며, ABI 기능(TRUNCATE/REFER 등)은 버전에 따라 다르다. 위 코드는 “가능하면 쓰고, 없으면 건너뛰는” 베스트에포트 방식으로 구성.

  • 완벽한 격리는 아니지만, 실수로 호스트를 더럽히는 경로를 효과적으로 차단한다. 추가로 audit hook과 리소스 제한(CPU/메모리/파일사이즈)을 곁들이면 80/20 목적에 충분히 실용적.

원하면 여기에 바로 붙여 쓸 수 있는 완성형 모듈(Manager + 두 백엔드 + 최소 RPC 루프)로 묶어서 드릴 수 있다.

landlock 패키지에 PathBeneath라는 게 없대. 그리고 Config는 유저가 직접할거라 LLM으로는 노출 금지

좋아. 정리부터 하고, 바로 가져다 쓸 수 있는 코드와 설계를 깔끔히 바꿔서 줄게. 핵심은 “설정(Config)은 네가 직접, LLM에겐 절대 노출하지 않음”, “세션 유지 선택 가능”, “파일 시스템은 Landlock으로 읽기/쓰기 경계”야. 그리고 네가 지적한 대로 PyPI의 landlock 패키지는 PathBeneath 클래스를 안 쓰고 Ruleset 중심의 API라서, 그에 맞춰 업데이트했어. Landlock의 원리는 커널 레벨에서 한 번 적용하면 호출 스레드와 자식에게 영구적으로 누적되는 일방향 제약이야. 사용자 공간에서 임의로 해제할 수 없도록 설계되어 있어(커널 사용자공간 API/매뉴얼에 명시, 그리고 최근 워크숍 자료에도 “한 번 켜면 끌 수 없다”는 원칙이 반복됨). man7.org+2

아래는 3단계 분리 설계와 코드 스켈레톤이야.

1) 툴 Config 단계(호스트 전용·LLM 비노출)

Config는 네 코드에서만 실행. LLM은 모르게 유지.

기능

  1. 읽기 전용 경로 목록(ro_roots), 읽기/쓰기 허용 루트(rw_root 한 개 추천)를 지정.

  2. Landlock을 현재 프로세스에 적용하고, 이후 새로 만드는 커널/서브프로세스에도 상속.

  3. 세션 풀 생성(에이전트별 세션 유지용).

landlock 패키지의 공개 문서 기준으로는 Ruleset().allow(path)로 경로 하위 접근을 허용하고 apply()로 적용하는 형태야. 이 패키지는 PathBeneath를 노출하지 않고 고수준 API를 제공해. 예시에서 “기본은 전부 차단, allow로 명시한 경로만 허용” 모델이므로, 읽기 허용 루트는 모두 allow로 넣고, 쓰기가 필요한 디렉터리는 별도로 관리하는 게 안전해. 쓰기 권한의 세밀한 비트 지정은 커널은 지원하지만(예: READ_DIR/READ_FILE/WRITE/TRUNCATE 등), 이 파이썬 패키지에서 경로별 권한 비트를 직접 노출하지 않을 수 있어 버전별로 차이가 난다. 최신 PyPI 페이지를 보면 고수준 예제만 문서화돼 있으니 이 점을 전제로 코드를 짜고, 만약 네 환경에서 per-path write 허용이 필요하면 아래 “플랜 B”를 참고해. PyPI

구현 스켈레톤

  • 아래 모듈은 설정만 담당. LLM에게 이 모듈의 어떤 내용도 전달하지 마.
# sandbox_config.py  (호스트 전용, LLM 비노출)
import os, shutil
from typing import List, Optional
from landlock import Ruleset  # PyPI 'landlock' (Edward-Knight)

class FsPolicy:
    def __init__(self, ro_roots: List[str], rw_root: str, allow_world_read: bool=False):
        # 절대경로 정규화
        self.ro_roots = [os.path.abspath(p) for p in ro_roots]
        self.rw_root = os.path.abspath(rw_root)
        self.allow_world_read = allow_world_read
        os.makedirs(self.rw_root, exist_ok=True)

class SandboxManager:
    def __init__(self):
        self._applied = False
        self.policy: Optional[FsPolicy] = None

    def apply_landlock(self, policy: FsPolicy):
        if self._applied:
            return
        rs = Ruleset()  # 기본: 전부 금지, allow로만 허용
        if policy.allow_world_read:
            rs.allow("/")  # 모든 경로 읽기 허용(“완벽한 샌드박스 불필요” 요구일 때 편의용)
        else:
            for p in policy.ro_roots:
                rs.allow(p)

        # rw_root는 LLM에게 copy_to_sandbox 통해서만 파일을 두게 한다.
        # 패키지 API가 경로별 쓰기 비트를 노출하지 않는 경우가 있어,
        # 쓰기는 원칙적으로 차단하고, 커널이 허용하는 한에서만 rw_root 내 조작을 시도한다.
        # (패키지 업데이트로 per-path write 옵션이 생기면 여기서 조건부로 허용 추가)

        rs.apply()  # 현재 스레드 및 자식 프로세스에 영구 적용
        self._applied = True
        self.policy = policy

    # 유틸: 신뢰할 수 있는 사전 복사기(LLM은 이 함수를 tool로만 호출)
    def copy_to_sandbox(self, src: str, dst_relpath: str) -> str:
        assert self.policy is not None, "Sandbox not configured"
        src_abs = os.path.abspath(src)
        # 읽기 허용 경로 검증
        if not (self.policy.allow_world_read or any(src_abs.startswith(r + os.sep) or src_abs == r for r in self.policy.ro_roots)):
            raise PermissionError(f"Source not allowed: {src_abs}")
        dst_abs = os.path.abspath(os.path.join(self.policy.rw_root, dst_relpath))
        # rw_root 밖으로 벗어나는 경로 금지
        if not (dst_abs.startswith(self.policy.rw_root + os.sep) or dst_abs == self.policy.rw_root):
            raise PermissionError(f"Destination escapes rw_root: {dst_abs}")
        os.makedirs(os.path.dirname(dst_abs), exist_ok=True)
        shutil.copy2(src_abs, dst_abs)
        return dst_abs

참고: Landlock은 일방향 제약이며 restrict_self 이후 해제가 불가한 특성이 공식 문서와 발표 자료에 반복해서 설명돼 있어. 이 구조 덕분에 “파이썬 코드가 룰셋을 풀어버린다” 류의 우려는 정상 커널/사용자 공간 전제에서 해당되지 않아. 단, 커널 취약점이 공개·패치되는 경우가 가끔 있으니 커널을 최신으로 유지하는 게 좋아. 기능과 권한 비트는 매뉴얼과 사용자공간 API 문서에 잘 정리돼 있어. man7.org+2

플랜 B: 경로별 쓰기 비트가 꼭 필요하면

  1. 최근 이슈로 유명한 landrun(Go CLI) 같은 얇은 래퍼를 써서 “–ro /… –rw /sandbox …” 형태로 정확히 분리할 수 있어. 컨테이너나 도커는 아님. GitHub+2

  2. 또는 아주 얇은 ctypes 래퍼로 landlock_create_ruleset/landlock_add_rule를 직접 호출해 권한 비트를 정확히 넣는 방법(WRITE/TRUNCATE/… 조합)을 쓰면 돼. 비트 목록과 의미는 커널 문서에 정리돼 있어. man.archlinux.org+1

2) 툴 사용법(LLM에 전달할 스펙)

설정 정보는 한 줄도 노출하지 않는다. 아래 네 가지만 공개.

  1. python_exec_internal(session_id, code, timeout_sec)

  2. python_exec_visible(session_id, code, timeout_sec, enforce_plot_rules)

  3. copy_to_sandbox(src, dst_relpath)

  4. python_session_control(session_id, action=["reset" "kill"])

copy_to_sandbox는 내부적으로 SandboxManager.copy_to_sandbox를 호출한다. 소스는 읽기 허용 경로 내에 있어야 하고, 목적지는 rw_root 하위 상대경로만 허용한다. 이렇게 하면 LLM은 “호스트 파일에 자유 접근(읽기)”은 가능하지만 “쓰기”는 반드시 샌드박스 디렉터리로 복사한 사본만 다루게 되는 워크플로가 된다.

3) 실제 실행 단계(세션 유지/일회용 선택)

세션은 커널 유지형과 일회용 둘 다 지원. 기본은 유지형, 요청 시 일회용으로 실행.

구현 스켈레톤

  • 아래는 너가 올려둔 jupyter_client 기반 코드를 정책 분리/에러 메세지/추가 도구 포함으로 다듬은 버전. seaborn/색상/서브플롯 금지 규칙도 그대로.
# code_tool.py  (LLM에 노출되는 엔드포인트는 여기 함수 시그니처뿐)
import os, time, base64, re, shutil
from typing import Dict, Any
from jupyter_client import KernelManager
from sandbox_config import SandboxManager, FsPolicy

# 0) 초기 설정(호스트 전용)
SANDBOX = SandboxManager()
# 예: 네 앱에서만 호출. LLM에겐 감춤.
# SANDBOX.apply_landlock(FsPolicy(
#     ro_roots=["/home/you/data", "/usr/share"],
#     rw_root="/home/you/sbx",
#     allow_world_read=False  # 느슨한 설정이 필요하면 True
# ))

# 1) 세션 풀
class KernelSession:
    def __init__(self):
        self.km = KernelManager()
        self.km.start_kernel(extra_arguments=["--HistoryManager.enabled=False"])
        self.kc = self.km.client(); self.kc.start_channels()

    def shutdown(self):
        try: self.kc.stop_channels(); self.km.shutdown_kernel(now=True)
        except Exception: pass

    def execute(self, code: str, timeout: float=3.0) -> Dict[str, Any]:
        if re.search(r"\bseaborn\b", code):  # 정책 예
            return {"returncode":2,"stdout":"","stderr":"Do not use seaborn","artifacts":[]}
        msg_id = self.kc.execute(code, allow_stdin=False, stop_on_error=True)
        t0 = time.time(); out=[]; err=[]; arts=[]
        while True:
            if time.time()-t0 > timeout:
                self.km.shutdown_kernel(now=True)
                return {"returncode":124,"stdout":"".join(out),"stderr":"Timeout","artifacts":[]}
            try: msg = self.kc.get_iopub_msg(timeout=0.1)
            except Exception: continue
            if msg["parent_header"].get("msg_id") != msg_id: continue
            typ = msg["header"]["msg_type"]; content = msg["content"]
            if typ == "stream":
                (out if content["name"]=="stdout" else err).append(content["text"])
            elif typ == "error":
                err.append("\n".join(content.get("traceback", [])))
            elif typ in ("display_data","execute_result"):
                data = content.get("data",{})
                if "image/png" in data:
                    b = data["image/png"]; b = b.encode() if isinstance(b,str) else b
                    fn = f"plot_{int(time.time()*1000)}.png"
                    os.makedirs(SANDBOX.policy.rw_root, exist_ok=True)  # 안전
                    fp = os.path.join(SANDBOX.policy.rw_root, fn)
                    with open(fp,"wb") as f: f.write(base64.b64decode(b))
                    arts.append({"name":fn,"path":fp,"mime":"image/png"})
            elif typ=="status" and content["execution_state"]=="idle":
                break
        return {"returncode":0,"stdout":"".join(out)[:20000],"stderr":"".join(err)[:20000],"artifacts":arts}

class SessionPool:
    def __init__(self): self.pool: Dict[str, KernelSession] = {}
    def get(self, sid: str) -> KernelSession:
        if sid not in self.pool: self.pool[sid] = KernelSession()
        return self.pool[sid]
    def reset(self, sid: str): 
        if sid in self.pool: self.pool[sid].shutdown(); self.pool[sid] = KernelSession()
    def kill(self, sid: str):
        s = self.pool.pop(sid, None)
        if s: s.shutdown()

POOL = SessionPool()

def _plot_rule_check(code: str) -> str | None:
    if re.search(r"\.subplot\(", code) or re.search(r"subplots\(", code):
        return "Do not use subplots"
    if re.search(r"(color\s*=|palette\s*=|style\s*=)", code):
        return "Do not set explicit colors or styles"
    return None

# 2) LLM이 쓰는 툴(설정 정보 없음)
def python_exec_internal(session_id: str, code: str, timeout_sec: int=3) -> Dict[str, Any]:
    sess = POOL.get(session_id)
    return sess.execute(code, timeout=timeout_sec)

def python_exec_visible(session_id: str, code: str, timeout_sec: int=3, enforce_plot_rules: bool=True) -> Dict[str, Any]:
    if enforce_plot_rules:
        v = _plot_rule_check(code)
        if v: return {"returncode":2,"stdout":"","stderr":v,"artifacts":[]}
    sess = POOL.get(session_id)
    return sess.execute(code, timeout=timeout_sec)

def python_session_control(session_id: str, action: str) -> Dict[str, Any]:
    if action == "reset": POOL.reset(session_id); return {"ok":True, "msg":"reset"}
    if action == "kill":  POOL.kill(session_id);  return {"ok":True, "msg":"killed"}
    return {"ok":False, "msg":"unknown action"}

def copy_to_sandbox(src: str, dst_relpath: str) -> Dict[str, Any]:
    try:
        dst = SANDBOX.copy_to_sandbox(src, dst_relpath)
        return {"ok": True, "dst": dst}
    except Exception as e:
        return {"ok": False, "error": str(e)}

툴 스펙(JSON)만 LLM에 전달

  • 네 OpenAI Agents SDK나 vLLM+GPTOSS/Qwen 파이프라인에서 아래 “툴 서명”만 노출해. 설정 경로나 정책 정보는 절대 넣지 마.
python_exec_internal_tool = {
  "type":"function",
  "function":{
    "name":"python_exec_internal",
    "description":"세션 유지형 파이썬 실행(비가시).",
    "parameters":{
      "type":"object",
      "properties":{
        "session_id":{"type":"string"},
        "code":{"type":"string"},
        "timeout_sec":{"type":"integer","minimum":1,"maximum":10,"default":3}
      },
      "required":["session_id","code"],
      "additionalProperties":False
    },
    "strict":True
  }
}

python_exec_visible_tool = {
  "type":"function",
  "function":{
    "name":"python_exec_visible",
    "description":"사용자 가시 실행(단일 matplotlib 플롯, 색/스타일 금지).",
    "parameters":{
      "type":"object",
      "properties":{
        "session_id":{"type":"string"},
        "code":{"type":"string"},
        "timeout_sec":{"type":"integer","minimum":1,"maximum":10,"default":3},
        "enforce_plot_rules":{"type":"boolean","default":True}
      },
      "required":["session_id","code"],
      "additionalProperties":False
    },
    "strict":True
  }
}

copy_to_sandbox_tool = {
  "type":"function",
  "function":{
    "name":"copy_to_sandbox",
    "description":"호스트 파일을 샌드박스 쓰기 루트로 복사. 목적지는 rw_root 하위 상대경로만.",
    "parameters":{
      "type":"object",
      "properties":{
        "src":{"type":"string"},
        "dst_relpath":{"type":"string"}
      },
      "required":["src","dst_relpath"],
      "additionalProperties":False
    },
    "strict":True
  }
}

python_session_control_tool = {
  "type":"function",
  "function":{
    "name":"python_session_control",
    "description":"세션 재시작/종료",
    "parameters":{
      "type":"object",
      "properties":{
        "session_id":{"type":"string"},
        "action":{"type":"string","enum":["reset","kill"]}
      },
      "required":["session_id","action"],
      "additionalProperties":False
    },
    "strict":True
  }
}

세션 유지 vs 일회용

  • 유지형: session_id를 고정으로 넘기면 커널 상태가 유지된다.

  • 일회용: python_session_control(session_id, "kill")로 즉석 종료 후 새 session_id를 쓰면 된다. 혹은 내부적으로 “one-shot” 코드를 별도 분기해 KernelManager를 매번 생성/종료해도 된다.

안전성에 대한 메모

  • Landlock은 커널 LSM이므로 사용 중 해제 불가한 일방향 제약(자식 상속 포함)이고, 경로 기반 권한 비트를 제공한다. FS 권한 비트와 설계 원리는 커널 문서의 사용자공간 API/매뉴얼에서 확인 가능하다. man7.org+2

  • PyPI ‘landlock’ 패키지는 PathBeneath를 직접 노출하지 않고 Ruleset.allow / apply를 쓰는 고수준 래퍼다. 문서 예시는 디렉터리 계층 접근 허용 예제만 제공한다. 버전에 따라 경로별 쓰기 비트를 직접 못 줄 수 있으니, 정확한 RW 분리를 원하면 landrun 같은 얇은 CLI를 함께 쓰거나(ctypes로 직접 비트 지정) 정책을 약간 느슨하게 운영하면 된다. PyPI+1

마무리 체크리스트

  1. 네 앱 초기화 시점에만 SANDBOX.apply_landlock(…) 호출. 이 코드는 절대 LLM 프롬프트·툴 정의에 섞지 않는다.

  2. LLM에는 네 개 툴 시그니처만 노출.

  3. 쓰기가 필요한 파일은 무조건 copy_to_sandbox로 rw_root에 사본을 둔 뒤 그 경로만 쓰도록 유도.

  4. 세션 유지가 필요하면 session_id를 고정하고, 필요 없으면 kill→새로 생성.

원하는 방향으로 더 촘촘한 RW 비트 제어가 필요한 순간이 오면 두 가지 루트가 있어. 첫째, landrun을 커널 앞단 래퍼로 덮어씌우기(도커 아님). 둘째, 작은 ctypes 뼈대에 커널 매뉴얼의 LANDLOCK_ACCESS_FS_* 비트를 그대로 넣는 저수준 바인딩. 어느 쪽이든 연결해 줄 수 있어.

좀 이상해. rw 컨트롤은 어디서해

핵심만 콕 집어 말할게. RW 컨트롤은 “툴 레이어”가 아니라 “세션 프로세스가 뜨는 순간 커널에 Landlock 룰을 올리는 곳”에서 한다. 즉,

1단계(네가 직접 하는 Config): 읽기 전용 디렉터리(ro_dirs)와 읽기/쓰기 디렉터리(rw_dir)를 정한다.
2단계(세션 부팅 시점): 해당 세션 프로세스 안에서 landlock_create_ruleset → landlock_add_rule(각 경로에 권한 마스크 지정) → landlock_restrict_self를 호출한다. 이때 ro_dirs에는 읽기 마스크, rw_dir에는 읽기+쓰기 마스크를 준다.
3단계(실행 중): LLM은 copy_to_sandbox로 파일을 rw_dir 아래에 들여와서만 저장·수정하게 하고, 샌드박스 밖 쓰기 시도는 커널이 거부한다.

아래는 실제로 “어디서 어떻게 RW를 강제하는지”가 드러나는 최소 헬퍼다. PyPI의 고수준 landlock 래퍼가 per-path 권한 비트를 안 열어줄 때를 대비해 ctypes로 커널 UAPI를 직접 호출한다. x86_64의 기본 syscall 번호를 쓰되, liblandlock.so가 있으면 그걸 우선 사용한다.

# landlock_helper.py
import os, ctypes, ctypes.util

# x86_64 기본값. 다른 아키텍처면 번호가 다를 수 있다.
__NR_landlock_create_ruleset = 444
__NR_landlock_add_rule       = 445
__NR_landlock_restrict_self  = 446

# 커널 UAPI 비트 (없는 비트는 0으로 처리)
class Access:
    READ_FILE   = 1 << 0
    WRITE_FILE  = 1 << 1
    READ_DIR    = 1 << 2
    REMOVE_DIR  = 1 << 3
    REMOVE_FILE = 1 << 4
    MAKE_CHAR   = 1 << 5
    MAKE_DIR    = 1 << 6
    MAKE_REG    = 1 << 7
    MAKE_SOCK   = 1 << 8
    MAKE_FIFO   = 1 << 9
    MAKE_BLOCK  = 1 << 10
    MAKE_SYM    = 1 << 11
    # ABI v2+
    REFER       = 1 << 13
    # ABI v3+
    TRUNCATE    = 1 << 14
    EXECUTE     = 1 << 12  # 있으면 사용

# struct landlock_path_beneath_attr
class path_beneath_attr(ctypes.Structure):
    _fields_ = [
        ("allowed_access", ctypes.c_uint64),
        ("parent_fd", ctypes.c_int),
    ]

libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)

def _syscall(n, *args):
    r = libc.syscall(ctypes.c_long(n), *args)
    if r < 0:
        e = ctypes.get_errno()
        raise OSError(e, os.strerror(e))
    return r

def _best_effort_mask(*names):
    m = 0
    for name in names:
        m |= getattr(Access, name, 0)
    return m

def _open_ro_dir(p):
    return os.open(p, os.O_PATH | os.O_CLOEXEC)

def apply_landlock_rw(ro_dirs, rw_dir):
    # 1) 어떤 FS 권한을 다룰지 커널에 알리기(handled_access_fs)
    handled = 0
    for name in [
        "READ_FILE","WRITE_FILE","READ_DIR","REMOVE_DIR","REMOVE_FILE",
        "MAKE_CHAR","MAKE_DIR","MAKE_REG","MAKE_SOCK","MAKE_FIFO","MAKE_BLOCK","MAKE_SYM",
        "REFER","TRUNCATE","EXECUTE"
    ]:
        handled |= getattr(Access, name, 0)

    # landlock_create_ruleset(attr=NULL, size=0, flags=0)로 ABI 쿼리할 수도 있지만,
    # 여기선 간단화: 바로 ruleset 생성 (size=8바이트 마스크 전송)
    class ruleset_attr(ctypes.Structure):
        _fields_ = [("handled_access_fs", ctypes.c_uint64)]
    attr = ruleset_attr(handled_access_fs=handled)
    ruleset_fd = _syscall(__NR_landlock_create_ruleset, ctypes.byref(attr), ctypes.sizeof(attr), 0)

    # 2) 읽기 전용 마스크와 읽기/쓰기 마스크 구성
    read_mask  = _best_effort_mask("READ_FILE", "READ_DIR", "EXECUTE")
    write_mask = _best_effort_mask(
        "READ_DIR","READ_FILE","WRITE_FILE","MAKE_DIR","MAKE_REG",
        "REMOVE_FILE","REMOVE_DIR","REFER","TRUNCATE"
    )

    # 3) ro_dirs에 읽기 규칙 추가
    for d in ro_dirs:
        d = os.path.realpath(d)
        if not os.path.isdir(d):
            continue
        fd = _open_ro_dir(d)
        try:
            attr_pb = path_beneath_attr(allowed_access=read_mask, parent_fd=fd)
            _syscall(__NR_landlock_add_rule, ruleset_fd, 1, ctypes.byref(attr_pb), 0)  # 1 = LANDLOCK_RULE_PATH_BENEATH
        finally:
            os.close(fd)

    # 4) rw_dir에 읽기+쓰기 규칙 추가
    rw_dir = os.path.realpath(rw_dir)
    os.makedirs(rw_dir, exist_ok=True)
    fd = _open_ro_dir(rw_dir)
    try:
        attr_pb = path_beneath_attr(allowed_access=write_mask, parent_fd=fd)
        _syscall(__NR_landlock_add_rule, ruleset_fd, 1, ctypes.byref(attr_pb), 0)
    finally:
        os.close(fd)

    # 5) 현재 스레드에 규칙 적용(자식 프로세스에 상속됨)
    _syscall(__NR_landlock_restrict_self, ruleset_fd, 0)
    os.close(ruleset_fd)
    return True

이제 RW 컨트롤이 “정확히 어디서” 이뤄지는지 보이게 연결하면 끝이다.

  1. 네가 직접 Config할 때
    ro_dirs와 rw_dir를 정하고, 세션 프로세스를 띄우기 직전 또는 세션 프로세스 내부 초기화에서 apply_landlock_rw(ro_dirs, rw_dir)를 호출한다. 여기서 ro_dirs에는 read_mask, rw_dir에는 write_mask를 커널에 등록한다. 이 지점이 RW 컨트롤의 본진이다.

  2. 세션 유지 구조에 붙일 때
    KernelBackend라면 커널 기동 직후 첫 셀에서 아래 한 줄을 주입하거나, 아예 커널 프로세스의 entrypoint에서 호출한다.

# 커널 첫 셀
import os
from landlock_helper import apply_landlock_rw
apply_landlock_rw(
    ro_dirs=os.environ["RO_DIRS"].split(os.pathsep),
    rw_dir=os.environ["RW_DIR"]
)

SubprocessBackend라면 자식 프로세스가 부팅되자마자 같은 함수를 호출한 뒤 코드 실행 루프에 진입한다.

  1. 툴 레이어에서의 역할 정리
    툴은 RW를 “결정”하지 않는다. 툴은 그저 copy_to_sandbox로 파일을 rw_dir 아래로 옮기고, python_execute로 실행할 뿐이다. RW 정책은 위의 apply_landlock_rw가 커널에 고정시킨다. 따라서 temp.py가 rw_dir 밖을 쓰려 하면 커널이 EACCES로 거부한다. 읽기는 ro_dirs에서만 허용된다. allow_world_read 같은 편의 옵션을 쓰면 읽기 범위를 넓힐 수 있지만, 쓰기 권한은 여전히 rw_dir로만 묶인다.

정리하면, RW 컨트롤은 landlock_helper.apply_landlock_rw 안의 두 군데에서 동시에 걸린다. ro_dirs에 read_mask 규칙을 추가하는 부분, rw_dir에 write_mask 규칙을 추가하는 부분. 툴은 그 정책을 존중할 뿐이고, LLM에게는 이 설정을 절대 노출하지 않는다.

def apply_landlock(self, policy: FsPolicy): if self._applied: return rs = Ruleset() # 기본: 전부 금지, allow로만 허용 if policy.allow_world_read: rs.allow("/") # 모든 경로 읽기 허용(“완벽한 샌드박스 불필요” 요구일 때 편의용) else: for p in policy.ro_roots: rs.allow(p) # rw_root는 LLM에게 copy_to_sandbox 통해서만 파일을 두게 한다. # 패키지 API가 경로별 쓰기 비트를 노출하지 않는 경우가 있어, # 쓰기는 원칙적으로 차단하고, 커널이 허용하는 한에서만 rw_root 내 조작을 시도한다. # (패키지 업데이트로 per-path write 옵션이 생기면 여기서 조건부로 허용 추가) rs.apply() # 현재 스레드 및 자식 프로세스에 영구 적용 self._applied = True self.policy = policy 이 코드에서 rw policy를 적용해야하는거 아니야?

맞아. 그 코드 그대로면 rw_root에 “쓰기 허용”이 전혀 적용되지 않아. Ruleset().allow()만 쓰면 경로 허용 여부 정도만 다루고, 경로별 권한 비트(READ/WRITE/TRUNCATE 등)를 지정하지 못해 쓰기 정책을 커널에 못 박지 못해. 그래서 이 지점에서 RW 마스크를 커널에 정확히 등록해야 한다.

실제로 RW 정책을 적용하는 방법은 두 가지다.

  1. 정확모드(권장): 커널 UAPI를 ctypes로 직접 호출해서 ro_dirs에는 읽기 마스크, rw_dir에는 읽기+쓰기 마스크를 넣는다.

  2. 임시 대안: high-level 래퍼만으로는 per-path write 비트를 못 주니, rw 컨트롤을 커널이 아닌 툴 레이어(몽키패치/audit hook)에서 “소프트”로 막는다. 이건 의도하신 “도커 없이도 꽤 안전”에 못 미친다.

아래는 1) 정확모드로, 네 메서드에 딱 들어갈 패치다.

간단 헬퍼(한 파일로 두고 import 해서 쓰면 됨)

# landlock_helper.py
import os, ctypes, ctypes.util

__NR_landlock_create_ruleset = 444
__NR_landlock_add_rule       = 445
__NR_landlock_restrict_self  = 446

class Access:
    READ_FILE   = 1 << 0
    WRITE_FILE  = 1 << 1
    READ_DIR    = 1 << 2
    REMOVE_DIR  = 1 << 3
    REMOVE_FILE = 1 << 4
    MAKE_CHAR   = 1 << 5
    MAKE_DIR    = 1 << 6
    MAKE_REG    = 1 << 7
    MAKE_SOCK   = 1 << 8
    MAKE_FIFO   = 1 << 9
    MAKE_BLOCK  = 1 << 10
    MAKE_SYM    = 1 << 11
    EXECUTE     = 1 << 12     # 있으면 사용
    REFER       = 1 << 13     # ABI v2+
    TRUNCATE    = 1 << 14     # ABI v3+

class path_beneath_attr(ctypes.Structure):
    _fields_ = [("allowed_access", ctypes.c_uint64), ("parent_fd", ctypes.c_int)]

libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)

def _syscall(n, *args):
    r = libc.syscall(ctypes.c_long(n), *args)
    if r < 0:
        e = ctypes.get_errno()
        raise OSError(e, os.strerror(e))
    return r

def _mask(*names):
    m = 0
    for n in names:
        m |= getattr(Access, n, 0)
    return m

def _open_path_only(p):
    return os.open(p, os.O_PATH | os.O_CLOEXEC)

def apply_landlock_rw(ro_dirs, rw_dir):
    # 1) ruleset 생성: 어떤 FS 권한을 다룰지 커널에 통보
    handled = _mask(
        "READ_FILE","WRITE_FILE","READ_DIR","REMOVE_DIR","REMOVE_FILE",
        "MAKE_CHAR","MAKE_DIR","MAKE_REG","MAKE_SOCK","MAKE_FIFO","MAKE_BLOCK","MAKE_SYM",
        "EXECUTE","REFER","TRUNCATE"
    )
    class ruleset_attr(ctypes.Structure):
        _fields_ = [("handled_access_fs", ctypes.c_uint64)]
    rs_attr = ruleset_attr(handled_access_fs=handled)
    rs_fd = _syscall(__NR_landlock_create_ruleset, ctypes.byref(rs_attr), ctypes.sizeof(rs_attr), 0)

    # 2) 읽기와 쓰기 마스크
    read_mask  = _mask("READ_FILE","READ_DIR","EXECUTE")
    write_mask = _mask("READ_FILE","READ_DIR","WRITE_FILE","MAKE_DIR","MAKE_REG",
                       "REMOVE_FILE","REMOVE_DIR","REFER","TRUNCATE")

    # 3) 읽기 전용 경로들
    for d in (os.path.realpath(p) for p in ro_dirs):
        if not os.path.isdir(d): 
            continue
        fd = _open_path_only(d)
        try:
            _syscall(__NR_landlock_add_rule, rs_fd, 1, path_beneath_attr(read_mask, fd), 0)  # 1 = PATH_BENEATH
        finally:
            os.close(fd)

    # 4) 쓰기 허용 루트
    rw_dir = os.path.realpath(rw_dir)
    os.makedirs(rw_dir, exist_ok=True)
    fd = _open_path_only(rw_dir)
    try:
        _syscall(__NR_landlock_add_rule, rs_fd, 1, path_beneath_attr(write_mask, fd), 0)
    finally:
        os.close(fd)

    # 5) 현재 스레드에 적용(자식 상속)
    _syscall(__NR_landlock_restrict_self, rs_fd, 0)
    os.close(rs_fd)

네 기존 메서드 교체본

# sandbox_config.py
from landlock_helper import apply_landlock_rw

def apply_landlock(self, policy: FsPolicy):
    if self._applied:
        return
    # ro_roots, rw_root를 커널에 정확히 등록
    apply_landlock_rw(policy.ro_roots, policy.rw_root)
    self._applied = True
    self.policy = policy
    return {"ok": True, "rw_root": policy.rw_root, "ro_roots": policy.ro_roots}

이 변경으로 “RW 컨트롤을 정확히 어디서 하느냐”가 분명해진다.

  • ro_roots에는 읽기 전용 비트만, rw_root에는 읽기+쓰기 비트를 커널에 고정한다.

  • 이후 temp.py가 rw_root 밖에 쓰려 하면 커널이 EACCES로 거부하고, ro_roots 밖의 경로는 아예 읽기도 거부된다(너가 ro_roots를 얼마나 넓게 잡느냐에 따라 다름).

  • ro_roots와 rw_root가 겹칠 수 있는데, Landlock은 허용 비트를 합집합으로 보므로 rw_root 하위에선 쓰기가 열리고, 그 밖에선 닫힌다. 운영상 문제 없다.

간단 동작 확인

# 설정 단계(LLM 비노출)
cfg = FsPolicy(ro_roots=["/usr","/home/you/data"], rw_root="/home/you/sbx")
SANDBOX.apply_landlock(cfg)

# 테스트
open("/etc/hosts","w").write("x")           # → PermissionError (쓰기 거부)
open("/home/you/data/df.csv","r").read(10)  # → OK (읽기 허용)
open("/home/you/sbx/out.txt","w").write("ok")  # → OK (rw_root 쓰기 허용)

만약 환경 제약으로 위 syscall 방식이 불가하거나 커널 ABI가 낮으면, apply_landlock에서 예외를 받아 “소프트 가드로 폴백”하도록 분기하면 된다. 그래도 질문의 요지였던 “이 코드에서 rw policy를 적용해야 하는가?”에 답하자면, 그렇다. 바로 이 apply_landlock에서 커널에 rw_root용 쓰기 권한 마스크를 명시적으로 추가해야 한다.

아 점점 쓸데 없이 복잡해지는 것 같은데; 정신 차리고 제대로 해보자. 일단 /user/abc 는 읽기 가능 /user/abc/test 안에는 쓰기 가능한 landlock을 하는 가장 간단한 코드 줘봐. 테스트 라인 포함해서

원하는 딱 그거. 리눅스 5.13+ (x86_64 가정)에서 /user/abc는 읽기만, /user/abc/test는 쓰기까지 허용하는 가장 단순한 파이썬 스크립트다. 테스트까지 포함했다. 외부 라이브러리 불필요, ctypes로 커널 UAPI 직접 호출한다.

#!/usr/bin/env python3
import os, sys, ctypes, ctypes.util

# x86_64용 landlock syscall 번호
__NR_landlock_create_ruleset = 444
__NR_landlock_add_rule       = 445
__NR_landlock_restrict_self  = 446

# 권한 비트 (커널 uapi/linux/landlock.h)
class FS:
    EXECUTE     = 1 << 0
    WRITE_FILE  = 1 << 1
    READ_FILE   = 1 << 2
    READ_DIR    = 1 << 3
    REMOVE_DIR  = 1 << 4
    REMOVE_FILE = 1 << 5
    MAKE_CHAR   = 1 << 6
    MAKE_DIR    = 1 << 7
    MAKE_REG    = 1 << 8
    MAKE_SOCK   = 1 << 9
    MAKE_FIFO   = 1 << 10
    MAKE_BLOCK  = 1 << 11
    MAKE_SYM    = 1 << 12
    REFER       = 1 << 13   # ABI v2+
    TRUNCATE    = 1 << 14   # ABI v3+

LANDLOCK_RULE_PATH_BENEATH = 1

libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)

def _syscall(n, *args):
    r = libc.syscall(ctypes.c_long(n), *args)
    if r < 0:
        e = ctypes.get_errno()
        raise OSError(e, os.strerror(e))
    return r

class ruleset_attr(ctypes.Structure):
    _fields_ = [("handled_access_fs", ctypes.c_uint64)]

class path_beneath_attr(ctypes.Structure):
    _fields_ = [
        ("allowed_access", ctypes.c_uint64),
        ("parent_fd", ctypes.c_int),
    ]

def _open_dir_path_only(p: str) -> int:
    return os.open(p, os.O_PATH | os.O_CLOEXEC)

def _try_create_ruleset(handled_mask: int) -> int:
    attr = ruleset_attr(handled_access_fs=handled_mask)
    return _syscall(__NR_landlock_create_ruleset, ctypes.byref(attr), ctypes.sizeof(attr), 0)

def apply_landlock_read_ro_write_rw(ro_dir: str, rw_dir: str):
    ro_dir = os.path.realpath(ro_dir)
    rw_dir = os.path.realpath(rw_dir)
    os.makedirs(rw_dir, exist_ok=True)

    # 가능한 최대 비트로 시도 → 안 되면 v3(=TRUNCATE) 제외 → 그래도 안 되면 REFER도 제외
    handled_all = (FS.EXECUTE | FS.WRITE_FILE | FS.READ_FILE | FS.READ_DIR |
                   FS.REMOVE_DIR | FS.REMOVE_FILE |
                   FS.MAKE_CHAR | FS.MAKE_DIR | FS.MAKE_REG | FS.MAKE_SOCK |
                   FS.MAKE_FIFO | FS.MAKE_BLOCK | FS.MAKE_SYM |
                   FS.REFER | FS.TRUNCATE)
    masks = [handled_all,
             handled_all & ~FS.TRUNCATE,
             handled_all & ~FS.TRUNCATE & ~FS.REFER]
    rs_fd = None
    last_err = None
    for m in masks:
        try:
            rs_fd = _try_create_ruleset(m)
            break
        except OSError as e:
            last_err = e
    if rs_fd is None:
        raise last_err or RuntimeError("landlock_create_ruleset failed")

    read_mask  = (FS.READ_FILE | FS.READ_DIR | FS.EXECUTE)
    write_mask = (FS.READ_FILE | FS.READ_DIR | FS.WRITE_FILE |
                  FS.MAKE_DIR | FS.MAKE_REG | FS.REMOVE_FILE | FS.REMOVE_DIR |
                  (FS.REFER if (handled_all & FS.REFER) else 0) |
                  (FS.TRUNCATE if (handled_all & FS.TRUNCATE) else 0))

    # 읽기 전용: /user/abc
    fd_ro = _open_dir_path_only(ro_dir)
    try:
        ro_rule = path_beneath_attr(allowed_access=read_mask, parent_fd=fd_ro)
        _syscall(__NR_landlock_add_rule, rs_fd, LANDLOCK_RULE_PATH_BENEATH, ctypes.byref(ro_rule), 0)
    finally:
        os.close(fd_ro)

    # 읽기/쓰기 허용: /user/abc/test
    fd_rw = _open_dir_path_only(rw_dir)
    try:
        rw_rule = path_beneath_attr(allowed_access=write_mask, parent_fd=fd_rw)
        _syscall(__NR_landlock_add_rule, rs_fd, LANDLOCK_RULE_PATH_BENEATH, ctypes.byref(rw_rule), 0)
    finally:
        os.close(fd_rw)

    # 현재 스레드(프로세스)에 적용. 이후 자식 프로세스에도 상속됨.
    _syscall(__NR_landlock_restrict_self, rs_fd, 0)
    os.close(rs_fd)

def main():
    ro = "/user/abc"
    rw = "/user/abc/test"

    # 테스트 준비: 디렉터리와 읽기용 파일 생성(제한 걸기 전)
    os.makedirs(rw, exist_ok=True)
    with open(os.path.join(ro, "readonly.txt"), "w") as f:
        f.write("hello\n")

    print("Applying Landlock: RO =", ro, "RW =", rw)
    apply_landlock_read_ro_write_rw(ro, rw)
    print("Landlock applied.")

    # 테스트 1: ro 디렉터리 쓰기 → 실패해야 함
    try:
        with open(os.path.join(ro, "should_fail.txt"), "w") as f:
            f.write("nope\n")
        print("[FAIL] wrote to RO dir (unexpected)")
    except OSError as e:
        print("[OK] RO write blocked:", e)

    # 테스트 2: rw 디렉터리 쓰기 → 성공해야 함
    try:
        with open(os.path.join(rw, "ok.txt"), "w") as f:
            f.write("works\n")
        print("[OK] RW write succeeded")
    except OSError as e:
        print("[FAIL] RW write blocked:", e)

    # 테스트 3: ro 파일 읽기 → 성공해야 함
    try:
        with open(os.path.join(ro, "readonly.txt"), "r") as f:
            print("[OK] RO read:", f.readline().strip())
    except OSError as e:
        print("[FAIL] RO read blocked:", e)

if __name__ == "__main__":
    if sys.platform != "linux":
        print("Linux 전용입니다.")
        sys.exit(1)
    main()

설명 요약

  • 이 스크립트는 현재 프로세스에만 제약을 건다. 한 번 적용되면 해제 불가, 자식 프로세스에 상속된다.

  • /user/abc는 READ_FILE/READ_DIR(+EXECUTE)만, /user/abc/test는 READ/WRITE/CREATE/REMOVE(+가능하면 REFER/TRUNCATE)까지 허용한다.

  • x86_64 기준 syscall 번호를 썼다. 다른 아키텍처면 번호가 다를 수 있다.

  • 커널이 낮아 TRUNCATE/REFER를 지원하지 않으면 자동으로 빼고 재시도한다.

python landlock 패키지 쓰는거보다 ctype로 하는게 더 낫나?

짧게 정리하면 “무엇을 통제하느냐”에 따라 선택이 갈린다.

  1. ctypes 직접 호출이 나은 경우
  • 경로별 권한 비트(READ_FILE/READ_DIR/WRITE_FILE/MAKE__/REMOVE__ /REFER/TRUNCATE 등)를 정확히 지정해야 할 때

  • 패키지 래퍼가 원하는 기능(예: rw 전용 디렉터리)까지 노출하지 않거나 버전마다 API가 달라 흔들릴 때

  • 지금처럼 “/user/abc는 읽기만, /user/abc/test는 쓰기까지” 같은 세밀한 정책이 필수일 때

장점: 커널 UAPI를 그대로 쓰므로 원하는 동작을 1:1로 구현 가능, 예측 가능성 높음
단점: 아키텍처별 syscall 번호, ABI 호환성(REFER/TRUNCATE 지원 여부) 처리 등 주변 코드가 필요

  1. PyPI landlock 패키지가 나은 경우
  • “읽기만 허용할 루트 몇 개 + 기본은 차단” 같은 단순 모델이면 충분할 때

  • 유지보수 용이성, 가독성, 배포 편의성을 우선할 때

  • 커널 기능 추가(ABI v2/v3)까지 세밀 제어가 굳이 필요 없을 때

장점: 코드 간단, 포터블, 업데이트 시 따라가면 됨
단점: 래퍼가 노출하는 추상화 한계 때문에 per-path 쓰기 권한을 정확히 다루지 못할 수 있음

현실적인 권장

  • 네 요구(RO 디렉터리와 RW 디렉터리를 분리)라면 ctypes로 얇은 헬퍼를 두는 쪽이 깔끔하다. 이미 제공한 50~80줄짜리 helper를 모듈화하면 끝이다. 그 위에 “설정은 네가 직접, LLM에는 노출 금지” 원칙만 지키면 된다.

  • 유지보수 부담을 줄이고 싶다면 하이브리드로 가자. 우선 PyPI 패키지로 “읽기 허용 루트”만 잡고, “쓰기 허용 루트”만 ctypes로 추가 규칙을 넣는 방식도 가능하다. 즉, 간단한 건 래퍼에 맡기고, 꼭 필요한 부분만 커널 UAPI로 보강.

결론

  • 세밀한 RW 컨트롤이 필요하면 ctypes가 더 낫다.

  • 단순 접근제어와 운영 편의가 우선이면 패키지가 더 낫다.

  • 너의 현재 설계(세션 유지 + 명시적 RW 분리)는 ctypes 미니 헬퍼가 가장 직관적이고, LLM 툴 레이어와도 잘 맞는다.

아 redhat 8.10은 그런게 안 되네. 그냥 pure 파이썬으로 간단하게 되는 패키지 없나? 해커가 작정하고 하면 뚫릴수도 있지만 LLM이 그런 동작을 안한다고 가정하고

가능한 한 단순하게 갈게. RHEL 8.10에서 Landlock 안 된다면 “완벽 보안” 대신 실수 방지용 가벼운 가드가 현실적이다. 순수 파이썬만으로도 다음 정도는 막을 수 있다.

  1. sys.addaudithook로 파일 열기(open) 이벤트에서 “쓰기 의도”를 가로채기

  2. os/builtins/pathlib의 쓰기 계열 호출을 몽키패치해 우회 시도 최소화

  3. 옵션으로 subprocess/네트워크 차단

audit hook은 파이썬 3.8+에서 표준이며 open 이벤트는 경로, 모드, 플래그를 넘겨준다. 이걸로 “쓰기 의도”를 정밀하게 잡아낼 수 있다. 다만 감사용 훅이지 하드한 보안 경계는 아니라는 점은 공식적으로도 명시돼 있다(그래도 이번 목표에는 충분히 유용). Python documentation+2

아래는 바로 복붙해 쓸 수 있는 “순수 파이썬 미니 패키지” 코드와 테스트다.

# lite_fs_guard.py
# 순수 파이썬 가드: 쓰기는 write_root 아래만 허용, 읽기는 기본 전체 허용(옵션으로 제한 가능)

import os, sys, builtins, pathlib

class LiteFSGuard:
    def __init__(self, write_root: str, read_allow: list[str] | None = None,
                 allow_read_anywhere: bool = True,
                 block_subprocess: bool = True, block_network: bool = False):
        self.write_root = os.path.realpath(write_root)
        self.read_allow = [os.path.realpath(p) for p in (read_allow or [])]
        self.allow_read_anywhere = allow_read_anywhere
        self.block_subprocess = block_subprocess
        self.block_network = block_network
        os.makedirs(self.write_root, exist_ok=True)
        self._installed = False
        # 원본 함수 백업
        self._orig = {
            "open": builtins.open,
            "os_open": os.open,
            "rename": os.rename,
            "replace": os.replace,
            "remove": os.remove,
            "unlink": os.unlink,
            "mkdir": os.mkdir,
            "makedirs": os.makedirs,
            "rmdir": os.rmdir,
            "chmod": os.chmod,
            "path_open": pathlib.Path.open,
            "os_system": os.system
        }

    def _in_write_root(self, p: str) -> bool:
        rp = os.path.realpath(p)
        return rp == self.write_root or rp.startswith(self.write_root + os.sep)

    def _is_read_allowed(self, p: str) -> bool:
        if self.allow_read_anywhere:
            return True
        rp = os.path.realpath(p)
        if self._in_write_root(rp):
            return True
        return any(rp == r or rp.startswith(r + os.sep) for r in self.read_allow)

    # --------- audit hook ----------
    def _audit(self, event, args):
        if event == "open":
            # args: path, mode, flags (문서화됨)
            # https://docs.python.org/3/library/audit_events.html
            path = str(args[0])
            mode = str(args[1]) if len(args) > 1 else "r"
            flags = int(args[2]) if len(args) > 2 else 0
            write_intent = any(ch in mode for ch in ("w","a","+","x")) or \
                (flags & (os.O_WRONLY | os.O_RDWR | os.O_CREAT | os.O_TRUNC | os.O_APPEND))
            if write_intent and not self._in_write_root(path):
                raise PermissionError(f"Write restricted to {self.write_root}: {path}")
            if not write_intent and not self._is_read_allowed(path):
                raise PermissionError(f"Read not allowed here: {path}")
        elif self.block_subprocess and (event.startswith("subprocess.") or event == "os.system"):
            raise PermissionError("Subprocess disabled by LiteFSGuard")
        elif self.block_network and event.startswith("socket."):
            raise PermissionError("Network disabled by LiteFSGuard")

    # --------- monkey patches (소프트 가드) ----------
    def _patches(self):
        def open_guard(file, mode="r", *a, **k):
            write_intent = any(ch in mode for ch in ("w","a","+","x"))
            if write_intent and not self._in_write_root(file):
                raise PermissionError(f"Write restricted to {self.write_root}: {file}")
            if not write_intent and not self._is_read_allowed(file):
                raise PermissionError(f"Read not allowed here: {file}")
            return self._orig["open"](file, mode, *a, **k)

        def os_open_guard(file, flags, *a, **k):
            write_intent = (flags & (os.O_WRONLY | os.O_RDWR | os.O_CREAT | os.O_TRUNC | os.O_APPEND)) != 0
            if write_intent and not self._in_write_root(file):
                raise PermissionError(f"Write restricted to {self.write_root}: {file}")
            if not write_intent and not self._is_read_allowed(file):
                raise PermissionError(f"Read not allowed here: {file}")
            return self._orig["os_open"](file, flags, *a, **k)

        def rename_guard(src, dst, *a, **k):
            if not (self._in_write_root(src) and self._in_write_root(dst)):
                raise PermissionError("Rename restricted within write_root")
            return self._orig["rename"](src, dst, *a, **k)

        def replace_guard(src, dst, *a, **k):
            if not (self._in_write_root(src) and self._in_write_root(dst)):
                raise PermissionError("Replace restricted within write_root")
            return self._orig["replace"](src, dst, *a, **k)

        def remove_guard(path, *a, **k):
            if not self._in_write_root(path):
                raise PermissionError("Delete restricted within write_root")
            return self._orig["remove"](path, *a, **k)

        def unlink_guard(path, *a, **k):
            if not self._in_write_root(path):
                raise PermissionError("Unlink restricted within write_root")
            return self._orig["unlink"](path, *a, **k)

        def mkdir_guard(path, *a, **k):
            if not self._in_write_root(path):
                raise PermissionError("Mkdir restricted within write_root")
            return self._orig["mkdir"](path, *a, **k)

        def makedirs_guard(path, *a, **k):
            if not self._in_write_root(path):
                raise PermissionError("Makedirs restricted within write_root")
            return self._orig["makedirs"](path, *a, **k)

        def rmdir_guard(path, *a, **k):
            if not self._in_write_root(path):
                raise PermissionError("Rmdir restricted within write_root")
            return self._orig["rmdir"](path, *a, **k)

        def chmod_guard(path, *a, **k):
            if not self._in_write_root(path):
                raise PermissionError("Chmod restricted within write_root")
            return self._orig["chmod"](path, *a, **k)

        def path_open_guard(self_path, *args, **kwargs):
            mode = kwargs.get("mode", "r")
            if len(args) >= 1:
                mode = args[0]
            write_intent = any(ch in mode for ch in ("w","a","+","x"))
            p = str(self_path)
            if write_intent and not self._in_write_root(p):
                raise PermissionError(f"Write restricted to {self.write_root}: {p}")
            if not write_intent and not self._is_read_allowed(p):
                raise PermissionError(f"Read not allowed here: {p}")
            return self._orig["path_open"](self_path, *args, **kwargs)

        builtins.open = open_guard
        os.open = os_open_guard
        os.rename = rename_guard
        os.replace = replace_guard
        os.remove = remove_guard
        os.unlink = unlink_guard
        os.mkdir = mkdir_guard
        os.makedirs = makedirs_guard
        os.rmdir = rmdir_guard
        os.chmod = chmod_guard
        pathlib.Path.open = path_open_guard
        if self.block_subprocess:
            os.system = lambda *a, **k: (_ for _ in ()).throw(PermissionError("os.system disabled"))

    def install(self):
        if self._installed:
            return
        sys.addaudithook(self._audit)  # 3.8+ 표준 훅: 이벤트 목록은 공식 표 참조
        self._patches()
        self._installed = True
        print(f"[lite-fs-guard] active. write_root={self.write_root}, read_anywhere={self.allow_read_anywhere}")

간단 테스트 스크립트

# test_lite_fs_guard.py
import os
from lite_fs_guard import LiteFSGuard

WRITE_ROOT = "/tmp/sbx"
guard = LiteFSGuard(write_root=WRITE_ROOT,
                    allow_read_anywhere=True,   # 어디든 읽기 허용
                    block_subprocess=True,      # 원하면 False
                    block_network=False)        # 원하면 True
guard.install()

# 준비
os.makedirs(WRITE_ROOT, exist_ok=True)
print("try write to /etc/hosts (should fail)")
try:
    open("/etc/hosts", "a").write("# nope\n")
    print("FAIL: wrote to /etc/hosts")
except Exception as e:
    print("OK:", e)

print("try write to write_root (should pass)")
open(os.path.join(WRITE_ROOT, "ok.txt"), "w").write("hello\n")
print("OK: wrote to", os.path.join(WRITE_ROOT, "ok.txt"))

print("try read from /etc/hosts (should pass)")
print(open("/etc/hosts").read().splitlines()[0])

이 접근은 어디까지나 “실수 방지용”이다. 핵심 포인트만 다시 적는다.

  • sys.addaudithook는 표준이며 open 이벤트(path, mode, flags)를 준다. 여기서 정확히 쓰기 의도를 판별할 수 있다. Python documentation

  • audit hook은 감사용이므로 완전한 보안 경계는 아니다. 하지만 이번 전제(LLM이 악의적 동작을 하지 않는다고 가정)에서는 충분히 실용적이다. GitHub

  • 더 강한 격리가 필요하면 도커·랜드락·AppArmor 같은 OS 레벨이 답이지만, Red Hat 8.10 제약이라면 위 가드가 가장 단순한 “순수 파이썬 80/20 해법”이다.

  • 참고로 언어 제한만 필요한 경우 RestrictedPython 같은 도구도 있지만 파일시스템 격리를 제공하는 건 아니다. pyfakefs는 테스트용 가짜 FS이지 보안 격리가 아니다. codejail은 AppArmor가 필요해서 이번 조건에 맞지 않는다. restrictedpython.readthedocs.io+3

다음 스텝으로, 이 가드를 너의 커널 세션(또는 서브프로세스) 시작 직후에 한 번 설치하도록 넣으면 된다. 세션 유지/일회성 모두 동일하게 동작한다.

#!/usr/bin/python3.8 #-- coding: utf-8 -- ## QUEST FOR A PYTHON SANDBOX: # 1. You design an script where an exec() command is ran. # 2. The code ran by exec() is supplied by your opponent, trying e.g. to # write something to the './forbidden.txt' file or access # forbidden_object. Your opponent has # full knowledge of the outer script (no security through obscurity). # There should be no trick to break the sandbox whatsoever. Crashing # the subprocess Cpython does not count. # 3. The exec()uted code should however allow as large subset of Python # capabilities as possible, including interaction with variables/ # functions/objects in the outer script, if and only if they are # explicitly listed as accessible. # 4. Elegant, fast, dependency-frugal solution is also appreciated. # 5. The outer script writes something to the './allowed.txt' file # in the end. user_command = """import sys ## Useful code on the figure import numpy x = numpy.linspace(0,10) y = numpy.sin(x) fig.get_axes()[0].plot(x, y) ## Malicious wannabe exploits follow for l in ().class.base.subclasses(): if l.name == 'code': print(l) ## SANDBOXED: has access to the sandboxed python subprocess only, in line with our intention ## to provide Python environment as-rich-as-possible exec("[ i for i in ().class.base.subclasses() if i.name == 'code']0") ## TEST #fig = numpy.array([666]) ## TODO: unexpected class type of returned objects should be detected after being returned from the sandbox #class SpoofFigure(matplotlib.pyplot.Figure): # def savefig(self): print("Malicious action") ## SANDBOXED: modified class won't be serialized if not defined in a module - can not get out of sandbox #fig = SpoofFig() #import XXX ## UNCLEAR: could some compiled module circumwent the file access hook? Which OS calls should be forbidden, too? ## See https://docs.python.org/3/library/audit_events.html to find them. #open('forbidden','w').write('pwned by l33t h4xx0rz') ## SANDBOXED: fails to write on disk thanks to the audit hook #fig.savefig('forbidden.png') ## SANDBOXED: audit hook applies to all python code #forbidden_object.access() ## SANDBOXED: no access to the forbidden object from the outer scope conn.send([fig]) ## all results can be safely passed (unless it is of custom class etc.) """ ## Prepare the testing example import matplotlib.pyplot as plt fig, ax = plt.subplots(nrows=1, ncols=1) ## the user's code will plot here allowed_objects = [fig] ## no other data exposed to the exec()uted script ## —————- The sandbox implementation follows, not to be changed ——————– def sandbox2020(code, allowed_objects): def ran_in_subprocess(conn): from sys import addaudithook def block_mischief(event,arg): if type(event) != str: raise # Security note: Well-designed objects can be passed to this function that could expose the top-level namespace # (through catching error and reading sys.exc_info()[2].tb_frame.f_back.f_globals). This should not enable modifying # variables outside the multiprocessing sandbox, but could give access to some internal sandbox variables. This function # thus should not refer to any 'lock variables'. It is safer to check the results in the main thread. if event=='open' and type(arg[1])==str and arg[1]!='r': print('\taudit:', event, arg) raise IOError('file write forbidden') if event.split('.')[0] in ['subprocess', 'shutil', 'winreg']: print('\taudit:', event, arg) raise IOError('potentially dangerous, filesystem-accessing functions forbidden') addaudithook(block_mischief) del(block_mischief) ## No way to remove or circumwent audit hooks from python. No access to this function. sandbox_locals['conn'] = conn # automatically add the communication pipe to objects accessible from the sandbox exec(user_command, sandbox_locals) print('len sandbox locals after', len(sandbox_locals)) conn.close() from multiprocessing import Process, Pipe parent_conn, child_conn = Pipe() p = Process(target=ran_in_subprocess, args=(child_conn,)) p.start() ## NOTE: The sandbox is probably safe against file writing, as well as against access into the main process. ## Yet the objects returned from it as results could have been manipulated. Asserting the output objects to be ## of expected data types is an extra safety measure. But be careful whenever your main program flow is ## controlled by the returned objects' attributes, e.g. file paths could change. return parent_conn.recv() p.join() ## Run the sandbox! sandbox_locals = dict((key, value) for key,value in locals().items() if value in allowed_objects) # copy.deepcopy() ## are all obj safely serialized? fig_result = sandbox2020(user_command, sandbox_locals) ## Demonstrate that the main process is not affected open('./allowed.txt','w').write('legally saved file from the main process') print(type(fig_result) == type(fig)) fig_result.savefig('allowed.png') 이런 코드는 어때

좋은 출발이야. 다만 지금 코드는 “쓰기 의도” 판별이 부정확하고, os 계열 조작·서브프로세스·네트워크가 그대로 열려 있어 실수 방지라는 목표에도 살짝 모자라. 구조는 유지하되, 최소한만 손봐서 안전성과 예측가능성을 올린 패치안을 줄게.

핵심 문제 요약

  1. audit 훅의 open 검사: mode가 'r'이 아니면 전부 쓰기라고 가정하고 있는데, 'rb', 'r+', 'a', 'x' 등과 flags까지 모두 봐야 한다. os.open()도 잡아야 한다.

  2. 파일 조작류: rename/replace/remove/unlink/mkdir/makedirs/rmdir 같은 조작 이벤트를 막아야 샌드박스 밖 훼손을 피할 수 있다.

  3. subprocess와 os.system, socket 계열은 명시 차단이 필요하다.

  4. 타임아웃과 join 누락: recv() 대기 중 교착, 무한루프 방지 필요. p.join(timeout) 후 미응답이면 종료.

  5. 인자 전달 혼선: sandbox2020(…, allowed_objects) 인자를 쓰지 않고 전역 sandbox_locals를 닫아 사용하고 있다. 함수 인자를 사용하도록 정리.

아래는 네 코드에 최소 변경만 가한 드롭인 패치다. 동작은 그대로이고, 샌드박스 안에서의 파일 쓰기는 현재 작업 디렉터리 기준으로 “./sandbox/” 하위에서만 허용하도록 했다. 필요하면 WRITE_ROOT를 바꿔.

수정본 전체

#!/usr/bin/python3.8
#-*- coding: utf-8 -*-

import os, sys, time
import matplotlib.pyplot as plt

WRITE_ROOT = os.path.abspath("./sandbox")   # 샌드박스 내에서만 쓰기 허용
os.makedirs(WRITE_ROOT, exist_ok=True)

user_command = """import sys, os, numpy
x = numpy.linspace(0,10)
y = numpy.sin(x)
fig.get_axes()[0].plot(x, y)

# 시도: 금지 영역 쓰기
try:
    open('./forbidden.txt','w').write('nope')
except Exception as e:
    print('forbidden write blocked:', type(e).__name__)

# 시도: 허용 영역 쓰기
open(os.path.join(os.environ['WRITE_ROOT'], 'ok.txt'),'w').write('ok')

# 결과 전달
conn.send([fig])
"""

fig, ax = plt.subplots(nrows=1, ncols=1)
allowed_objects = {"fig": fig}   # exec에 노출할 것만 명시

def sandbox2020(code: str, allowed_globals: dict, timeout_sec=3):
    from multiprocessing import Process, Pipe

    def ran_in_subprocess(conn, allowed):
        import os, sys, pathlib
        os.environ['WRITE_ROOT'] = os.environ.get('WRITE_ROOT', os.path.abspath("./sandbox"))

        def _in_write_root(p: str) -> bool:
            rp = os.path.realpath(p)
            wr = os.path.realpath(os.environ['WRITE_ROOT'])
            return rp == wr or rp.startswith(wr + os.sep)

        def _audit(event, args):
            # open / os.open: (path, mode, flags)
            if event in ("open", "os.open"):
                path = str(args[0])
                mode = str(args[1]) if len(args) > 1 else "r"
                flags = int(args[2]) if len(args) > 2 else 0
                write = any(ch in mode for ch in ("w","a","+","x")) or \
                        (flags & (os.O_WRONLY | os.O_RDWR | os.O_CREAT | os.O_TRUNC | os.O_APPEND))
                if write and not _in_write_root(path):
                    raise PermissionError(f"Write restricted to {os.environ['WRITE_ROOT']}: {path}")

            # 파일 조작류
            elif event in ("os.rename","os.replace","os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
                path = str(args[0])
                if not _in_write_root(path):
                    raise PermissionError(f"FS op restricted to {os.environ['WRITE_ROOT']}: {path}")

            # 서브프로세스/시스템 호출
            elif event.startswith("subprocess.") or event == "os.system":
                raise PermissionError("Subprocess disabled")

            # 네트워크(원하면 주석 해제)
            # elif event.startswith("socket."):
            #     raise PermissionError("Network disabled")

        sys.addaudithook(_audit)

        # exec용 전역 딕셔너리 구성
        g = {"__builtins__": __builtins__}
        g.update(allowed)
        g["conn"] = conn

        # 실행
        exec(code, g)

        conn.close()

    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=ran_in_subprocess, args=(child_conn, allowed_globals))
    p.start()

    # 결과 수신 대기 + 타임아웃
    t0 = time.time()
    while True:
        if parent_conn.poll(0.05):
            result = parent_conn.recv()
            break
        if time.time() - t0 > timeout_sec:
            p.terminate()
            p.join(1)
            raise TimeoutError("sandbox execution timed out")

    p.join(1)
    return result

# 샌드박스 실행
result_list = sandbox2020(user_command, allowed_objects)

# 메인 프로세스는 정상 동작
open('./allowed.txt','w').write('legally saved file from the main process')

# 결과 확인
fig_result = result_list[0]
print("same type:", type(fig_result) == type(fig_result))
fig_result.savefig(os.path.join(WRITE_ROOT, 'allowed.png'))
print("done")

설명과 운용 팁

  1. WRITE_ROOT 안에서만 쓰기 허용: open/os.open에서 mode/flags를 모두 검사해 쓰기 의도를 잡고, 조작성 작업(삭제/이동/생성 등)은 경로가 WRITE_ROOT 내부일 때만 허용했다.

  2. 읽기는 어디서나 가능하게 남겨두었고, 필요하면 _audit의 open 분기에서 읽기도 allowlist로 제한할 수 있다.

  3. subprocess, os.system 차단. 네 목적에 맞게 socket.*도 막을 수 있다.

  4. 타임아웃과 안전 종료 추가. 자식이 결과를 안 보내면 terminate 후 TimeoutError.

  5. allowed_objects는 dict로 명시 주입. exec의 globals로만 전달돼, 그 외 바깥 이름은 보이지 않는다.

  6. 이건 “실수 방지용”이다. 악의적 확장(C 확장에서 직접 open 호출, ctypes로 커널 호출 등)은 막지 못한다는 점은 설계상 감수. 그 대신 코드가 짧고 의존성 없고, LLM이 악의적으로 행동하지 않는다는 전제에서는 실용적이다.

여기서 더 단단히 하고 싶다면, 네가 이미 쓰던 커널 유지 구조에 이 가드를 첫 셀로 주입하고, 파일은 copy_to_sandbox로 WRITE_ROOT 밑으로만 들여와 다루게 만드는 정도면 충분히 80/20을 채운다.

근데 기존 코드에서 return을 먼저하고 join을 하는데 말이 안되는거지?

맞아. 그 코드는 return 다음에 p.join()이 있어 실행되지 않아. 결과를 받은 뒤 프로세스를 정리(join/terminate)하고 리소스를 닫은 다음에 return 해야 한다. 그렇지 않으면 좀비 프로세스가 생길 수 있다(특히 POSIX).

안전한 기본 패턴을 아주 간단히 적으면 이렇게 된다.

  1. 자식 시작 → 부모에서 child_conn 닫기

  2. poll로 타임아웃 감시하며 recv

  3. 결과 받으면 p.join()으로 수거

  4. 타임아웃이면 p.terminate() 후 p.join()

  5. 항상 파이프 닫기

최소 수정 예시

def sandbox2020(code, allowed_globals, timeout_sec=3):
    from multiprocessing import Process, Pipe

    def ran_in_subprocess(conn, allowed):
        # ... 내부 실행 로직 ...
        conn.send(result)
        conn.close()           # 자식 쪽 파이프 닫기 후 종료

    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=ran_in_subprocess, args=(child_conn, allowed_globals))
    p.start()
    child_conn.close()          # 부모 쪽에서 child end는 바로 닫기

    try:
        # 타임아웃 감시하면서 결과 수신
        if parent_conn.poll(timeout_sec):
            result = parent_conn.recv()
        else:
            p.terminate()
            p.join(1)
            raise TimeoutError("sandbox execution timed out")

        # 여기서 반드시 조인해서 수거
        p.join(1)
        if p.is_alive():
            # 혹시 아직 살아 있으면 강제 종료 후 재조인
            p.terminate()
            p.join(1)
        return result

    finally:
        parent_conn.close()

한 줄 요약
return을 먼저 하면 그 아래 코드는 죽은 코드가 된다. 결과를 변수에 담고, 프로세스를 수거(p.join)하고, 핸들들을 닫은 뒤에 return 하도록 순서를 바꿔야 한다.

수정할 것: conn 불필요한 것 같은데 그냥 subprocess(with 파일/subprocess sendbox)로 실행하고 stdout 메시지랑 stderr 메시지만 있으면 될 것 같아. 단 에러 난 내용에 대해선 좀 더 자세히 나오면 좋겠어. 어떤 파일을 접근하려다 났는지/어떤 커맨드를 하려다 났는지 <p class="chat--user" markdown="1"> 수정할 것: conn 불필요한 것 같은데 그냥 subprocess(with 파일/subprocess sendbox)로 실행하고 stdout 메시지랑 stderr 메시지만 있으면 될 것 같아. 단 에러 난 내용에 대해선 좀 더 자세히 나오면 좋겠어. 어떤 파일을 접근하려다 났는지/어떤 커맨드를 하려다 났는지 또, async를 활용해서 끝날때까지는 await로 하면 좋겠어

좋아. 파이프(conn) 없이, 별도 파이썬 서브프로세스를 async/await로 실행하고 stdout/stderr만 수집하는 최소 구현을 정리했다. 서브프로세스 안에서는 audit hook으로 “파일 시스템 쓰기”를 WRITE_ROOT 내부로만 제한하고, subprocess/os.system 시도를 차단한다. 에러 메시지는 어떤 파일/커맨드 때문에 막혔는지 자세히 출력된다.

아래 스크립트 하나로 바로 실행해도 되고, run_user_code_async 함수만 가져다 써도 된다.

# sandbox_async.py
# Python 3.8+ 필요. 외부 의존성 없음.
import asyncio, os, sys, tempfile, textwrap, traceback

CHILD_TEMPLATE = r"""
import os, sys

WRITE_ROOT = os.environ.get("WRITE_ROOT", os.path.abspath("./sandbox"))
ALLOW_READ_ANYWHERE = os.environ.get("ALLOW_READ_ANYWHERE", "1") == "1"
READ_ALLOW = os.environ.get("READ_ALLOW", "")
READ_ALLOW = [os.path.realpath(p) for p in READ_ALLOW.split(os.pathsep) if p]

def _real(p): 
    try: return os.path.realpath(p)
    except Exception: return str(p)

def _in_write_root(p):
    rp = _real(p); wr = _real(WRITE_ROOT)
    return rp == wr or rp.startswith(wr + os.sep)

def _is_read_allowed(p):
    if ALLOW_READ_ANYWHERE: 
        return True
    rp = _real(p)
    if _in_write_root(rp): 
        return True
    return any(rp == r or rp.startswith(r + os.sep) for r in READ_ALLOW)

def _audit(event, args):
    # 파일 열기/생성
    if event in ("open", "os.open"):
        path = str(args[0])
        mode = str(args[1]) if len(args) > 1 else "r"
        flags = int(args[2]) if len(args) > 2 else 0
        write_intent = any(ch in mode for ch in ("w","a","+","x")) or \
            (flags & (os.O_WRONLY | os.O_RDWR | os.O_CREAT | os.O_TRUNC | os.O_APPEND))
        if write_intent and not _in_write_root(path):
            msg = f"[AUDIT] blocked: event={event} path={_real(path)} mode={mode} flags={flags} reason=write-outside-WRITE_ROOT({WRITE_ROOT})"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)
        # 선택: 읽기 제한을 두고 싶으면 여기서 검사
        if (not write_intent) and (not _is_read_allowed(path)):
            msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=read-not-allowed"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

    # 파일 조작류 (경로가 첫 인자에 온다고 가정; rename/replace는 src,dst 둘 다 체크)
    elif event in ("os.rename","os.replace"):
        src = str(args[0]); dst = str(args[1])
        if not (_in_write_root(src) and _in_write_root(dst)):
            msg = f"[AUDIT] blocked: event={event} src={_real(src)} dst={_real(dst)} reason=op-outside-WRITE_ROOT({WRITE_ROOT})"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)
    elif event in ("os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
        path = str(args[0])
        if not _in_write_root(path):
            msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=op-outside-WRITE_ROOT({WRITE_ROOT})"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

    # 서브프로세스/시스템 호출 차단
    elif event.startswith("subprocess.") or event == "os.system":
        cmd = None
        try:
            # subprocess.Popen은 첫 인자로 args(커맨드)가 들어오는 경우가 많다
            cmd = args[0]
        except Exception:
            cmd = "<unknown>"
        msg = f"[AUDIT] blocked: event={event} cmd={cmd} reason=subprocess-disabled"
        print(msg, file=sys.stderr, flush=True)
        raise PermissionError(msg)

    # 네트워크를 막고 싶으면 주석 해제
    # elif event.startswith("socket."):
    #     msg = f"[AUDIT] blocked: event={event} reason=network-disabled"
    #     print(msg, file=sys.stderr, flush=True)
    #     raise PermissionError(msg)

# 설치
sys.addaudithook(_audit)

# 사용자 코드 실행
USER_CODE = r\"\"\"{user_code}\"\"\"
g = {"__name__": "__main__"}
try:
    exec(USER_CODE, g, g)
except Exception as e:
    # 에러를 자세히 stderr로
    print("[ERROR] exception while executing user code:", file=sys.stderr)
    traceback.print_exc()
    raise
"""

async def run_user_code_async(
    code: str,
    write_root: str = "./sandbox",
    allow_read_anywhere: bool = True,
    read_allow: list[str] | None = None,
    timeout_sec: float = 10.0,
    python_executable: str | None = None,
):
    """
    user code를 별도 파이썬 서브프로세스에서 실행하고 stdout/stderr만 수집.
    쓰기는 write_root 내부에서만 허용. subprocess/os.system은 차단.
    """
    write_root = os.path.abspath(write_root)
    os.makedirs(write_root, exist_ok=True)
    py = python_executable or sys.executable

    child_code = CHILD_TEMPLATE.format(user_code=code)
    # 임시 파일에 자식 스크립트 저장
    with tempfile.NamedTemporaryFile("w", delete=False, suffix=".py") as f:
        child_path = f.name
        f.write(child_code)

    # 자식 환경
    env = os.environ.copy()
    env["WRITE_ROOT"] = write_root
    env["ALLOW_READ_ANYWHERE"] = "1" if allow_read_anywhere else "0"
    if read_allow:
        env["READ_ALLOW"] = os.pathsep.join(os.path.realpath(p) for p in read_allow)

    try:
        proc = await asyncio.create_subprocess_exec(
            py, child_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout_sec)
        except asyncio.TimeoutError:
            proc.kill()
            try:
                await asyncio.wait_for(proc.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                pass
            return {
                "returncode": 124,
                "stdout": "",
                "stderr": f"[TIMEOUT] exceeded {timeout_sec}s",
            }

        rc = proc.returncode
        return {
            "returncode": rc,
            "stdout": stdout.decode(errors="replace"),
            "stderr": stderr.decode(errors="replace"),
        }
    finally:
        try:
            os.unlink(child_path)
        except Exception:
            pass

# 데모
if __name__ == "__main__":
    async def main():
        user_code = """
import os, sys, numpy
print("hello from user code")
# 허용된 쓰기
open(os.path.join(os.environ['WRITE_ROOT'], 'ok.txt'), 'w').write('ok')
# 금지된 쓰기
try:
    open('/etc/forbidden.txt','w').write('no')
except Exception as e:
    print("caught:", type(e).__name__)
"""
        result = await run_user_code_async(user_code, write_root="./sandbox", timeout_sec=5)
        print("RETURN =", result["returncode"])
        print("STDOUT:")
        print(result["stdout"])
        print("STDERR:")
        print(result["stderr"])
    asyncio.run(main())

핵심 포인트 요약

  1. 별도 파이썬 프로세스에서 실행하므로 세션 유지가 필요 없을 때 가볍고 안전하다. 세션 유지형이 필요하면 프로세스를 길게 붙잡는 래퍼를 만들면 된다.

  2. 파일 쓰기는 WRITE_ROOT 내부만 허용. 금지 시도는 stderr에 [AUDIT] 라인으로 이벤트, 경로, 이유가 찍히고 PermissionError를 일으킨다.

  3. subprocess/os.system은 차단되고, 차단 사유와 시도한 커맨드를 stderr로 남긴다.

  4. await로 결과를 받을 때까지 기다린다. 타임아웃이 나면 returncode 124와 함께 종료한다.

  5. 읽기는 기본적으로 어디서든 허용하지만, allow_read_anywhere=False로 두고 read_allow 리스트를 주면 읽기도 경로 기반 허용으로 좁힐 수 있다.

이제 네 에이전트 툴 쪽에서는 이 함수 하나만 감싸서 호출하면 된다. 툴 인자는 code, write_root, timeout 정도만 노출하고, 정책(읽기 허용 범위)은 서버 측에서만 관리해도 깔끔하다.

추가 구현 요청: read_path(리스트)와 write_path(리스트)를 받도록 하고 싶어. 그리고 시작하는 cwd(Working directory)도 코드 시작할 때 코드랑 같이 입력 하면 좋겠어. 없으면 그냥 None으로 하기. 그런데 cwd를 subprocess 에서 바꿔도 부모 프로세스에는 상관이 없지?

정리 먼저.

  • 요청 반영: read_path(여러 개), write_path(여러 개) 받기, 시작 작업 디렉터리(cwd) 옵션, 비동기(async/await) 실행, 결과는 stdout/stderr만.

  • 서브프로세스 안에서 audit hook으로 파일/서브프로세스 시도 감시. 어떤 파일·커맨드 때문에 막혔는지 stderr에 자세히 씀.

  • 서브프로세스에서 cwd를 바꿔도 부모 프로세스에는 영향 없다(프로세스마다 현재 디렉터리는 독립).

아래는 바로 쓸 수 있는 최소 구현이다. 외부 의존성 없음, Python 3.8+.

# sandbox_async_paths.py
import asyncio, os, sys, tempfile

CHILD_TEMPLATE = r"""
import os, sys, traceback

WRITE_DIRS = [p for p in os.environ.get("WRITE_DIRS","").split(os.pathsep) if p]
READ_DIRS  = [p for p in os.environ.get("READ_DIRS","").split(os.pathsep) if p]
ALLOW_READ_ANYWHERE = os.environ.get("ALLOW_READ_ANYWHERE","1") == "1"

def _real(p):
    try: return os.path.realpath(p)
    except Exception: return str(p)

def _is_under_any(path, roots):
    rp = _real(path)
    for r in roots:
        rr = _real(r)
        if rp == rr or rp.startswith(rr + os.sep):
            return True
    return False

def _audit(event, args):
    try:
        # 파일 열기
        if event in ("open", "os.open"):
            path = str(args[0])
            mode = str(args[1]) if len(args) > 1 else "r"
            flags = int(args[2]) if len(args) > 2 else 0
            write = any(ch in mode for ch in ("w","a","+","x")) or \
                    (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
            if write and not _is_under_any(path, WRITE_DIRS):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} mode={mode} flags={flags} reason=write-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
            if not write and not ALLOW_READ_ANYWHERE and not _is_under_any(path, READ_DIRS+WRITE_DIRS):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=read-not-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 파일 조작류
        elif event in ("os.rename","os.replace"):
            src = str(args[0]); dst = str(args[1])
            if not (_is_under_any(src, WRITE_DIRS) and _is_under_any(dst, WRITE_DIRS)):
                msg = f"[AUDIT] blocked: event={event} src={_real(src)} dst={_real(dst)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
        elif event in ("os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
            path = str(args[0])
            if not _is_under_any(path, WRITE_DIRS):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 서브프로세스/시스템 호출 차단
        elif event.startswith("subprocess.") or event == "os.system":
            cmd = None
            try: cmd = args[0]
            except Exception: cmd = "<unknown>"
            msg = f"[AUDIT] blocked: event={event} cmd={cmd} reason=subprocess-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

        # 네트워크를 막고 싶으면 주석 해제
        # elif event.startswith("socket."):
        #     msg = f"[AUDIT] blocked: event={event} reason=network-disabled"
        #     print(msg, file=sys.stderr, flush=True)
        #     raise PermissionError(msg)

    except Exception:
        raise

sys.addaudithook(_audit)

USER_CODE = r\"\"\"{user_code}\"\"\"
g = {"__name__":"__main__"}
try:
    exec(USER_CODE, g, g)
except Exception:
    print("[ERROR] exception while executing user code:", file=sys.stderr)
    traceback.print_exc()
    raise
"""

async def run_user_code_async(
    code: str,
    *,
    read_paths: list[str] | None = None,
    write_paths: list[str] | None = None,
    cwd: str | None = None,
    allow_read_anywhere: bool | None = None,
    timeout_sec: float = 10.0,
    python_executable: str | None = None,
):
    """
    별도 파이썬 서브프로세스에서 사용자 코드를 실행한다.
    - read_paths: 읽기 허용 경로 목록(하위 포함). None 또는 []면 읽기 어디든 허용(allow_read_anywhere=True).
    - write_paths: 쓰기 허용 경로 목록(하위 포함). None/[]면 쓰기 전부 차단.
    - cwd: 서브프로세스 시작 작업 디렉터리. None이면 부모의 cwd를 상속. 자식이 바꿔도 부모에는 영향 없음.
    - stdout/stderr를 수집해 반환.
    """
    py = python_executable or sys.executable
    rp = [os.path.realpath(p) for p in (read_paths or [])]
    wp = [os.path.realpath(p) for p in (write_paths or [])]
    for d in wp:
        os.makedirs(d, exist_ok=True)
    if allow_read_anywhere is None:
        allow_read_anywhere = (not rp)  # 읽기 경로 미지정이면 어디든 읽기 허용

    child_code = CHILD_TEMPLATE.format(user_code=code)
    with tempfile.NamedTemporaryFile("w", delete=False, suffix=".py") as f:
        child_path = f.name
        f.write(child_code)

    env = os.environ.copy()
    env["READ_DIRS"] = os.pathsep.join(rp)
    env["WRITE_DIRS"] = os.pathsep.join(wp)
    env["ALLOW_READ_ANYWHERE"] = "1" if allow_read_anywhere else "0"

    try:
        proc = await asyncio.create_subprocess_exec(
            py, child_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
            cwd=cwd,  # 자식의 시작 작업 디렉터리. 부모엔 영향 없음.
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout_sec)
        except asyncio.TimeoutError:
            proc.kill()
            try:
                await asyncio.wait_for(proc.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                pass
            return {"returncode":124, "stdout":"", "stderr":f"[TIMEOUT] exceeded {timeout_sec}s"}
        return {
            "returncode": proc.returncode,
            "stdout": stdout.decode(errors="replace"),
            "stderr": stderr.decode(errors="replace"),
        }
    finally:
        try: os.unlink(child_path)
        except Exception: pass

# 데모 실행
if __name__ == "__main__":
    async def main():
        code = r"""
import os, sys, subprocess
print("CWD:", os.getcwd())
# 허용된 쓰기
open(os.path.join("out_dir","ok.txt"), "w").write("ok")  # 상대경로는 cwd 기준
# 금지된 쓰기
try:
    open("/tmp/blocked.txt","w").write("x")
except Exception as e:
    print("blocked write:", type(e).__name__)
# 서브프로세스 시도
try:
    subprocess.check_output(["echo","hi"])
except Exception as e:
    print("blocked subproc:", type(e).__name__)
# 허용된 읽기(예: read_paths가 지정되었거나 allow_read_anywhere=True인 경우)
print("done")
"""
        # 예시: cwd를 ./work로, 쓰기는 ./work/out_dir 아래만, 읽기는 어디든 허용
        os.makedirs("./work/out_dir", exist_ok=True)
        res = await run_user_code_async(
            code,
            read_paths=None,                 # None → 어디든 읽기 허용
            write_paths=["./work/out_dir"],  # 여기만 쓰기 허용
            cwd="./work",                    # 자식의 시작 CWD
            timeout_sec=5,
        )
        print("RC:", res["returncode"])
        print("STDOUT:\n", res["stdout"])
        print("STDERR:\n", res["stderr"])
    asyncio.run(main())

포인트

  • read_paths와 write_paths는 하위 폴더까지 포함해 허용된다. 쓰기는 write_paths 중 하나에 포함될 때만 통과.

  • read_paths를 지정하지 않으면 allow_read_anywhere=True로 동작해 어디든 읽기 허용(요구가 느슨할 때 편함). 읽기도 제한하려면 read_paths를 지정하고 allow_read_anywhere를 생략하면 자동으로 False로 바뀐다.

  • cwd 인자는 서브프로세스에만 적용된다. 자식이 os.chdir로 바꿔도 부모 프로세스에는 아무 영향 없다.

  • 차단 시 stderr에 [AUDIT] 라인으로 이벤트, 경로, 커맨드, 이유를 상세히 남긴다.

{event}가 없대 format 부분에서

맞아. 그 에러는 CHILD_TEMPLATE 전체에 .format(...)을 걸면서 템플릿 안에 있는 f-string의 {event}, {args} 같은 중괄호를 바깥 포맷터가 “키 event/args를 달라”로 오해했기 때문에 생긴 거야.

해결은 셋 중 하나로 하면 깔끔해.

  1. string.Template로 치환하기(가장 간단)

  2. 템플릿 안 모든 중괄호를 이스케이프해서 ``로 바꾸고 .format(user_code=...) 사용

  3. 아예 사용자 코드 위치만 특수 토큰으로 두고 .replace()로 치환

바로 돌아가는 패치(Template 사용)만 보여줄게. 너의 기존 run_user_code_async에서 템플릿 치환 부분만 바꾸면 된다.

수정 전
child_code = CHILD_TEMPLATE.format(user_code=code)

수정 후
from string import Template
child_code = Template(CHILD_TEMPLATE).substitute(USER_CODE=code)

그리고 CHILD_TEMPLATE 안의 자리표시자도 ${USER_CODE}로 바꿔줘야 한다.

예시(핵심 부분만)
CHILD_TEMPLATE = r"""
import os, sys, traceback

… 중략 …

sys.addaudithook(_audit)

USER_CODE = r"""${USER_CODE}"""
g = {"name ":"main "}
try:
exec(USER_CODE, g, g)
except Exception:
print("[ERROR] exception while executing user code:", file=sys.stderr)
traceback.print_exc()
raise
"""

이렇게 바꾸면 템플릿 안의 f-string들(예: f"[AUDIT] blocked: event={event} …")은 손대지 않아도 되고, 바깥 치환기(Template)는 $만 보고 치환하므로 {event}를 건드리지 않는다.

참고로 .format()을 꼭 쓰고 싶다면 템플릿 내부의 모든 중괄호를 ``로 이스케이프해야 한다. 예: f"[AUDIT] blocked: event="처럼. 그런데 그러면 내부 f-string이 더 이상 f-string이 아니게 되니, 그 경우엔 f-string을 포기하고 나중에 .format(event=event, …)를 자식 프로세스에서 호출하는 식으로 바꿔야 해서 코드가 번거로워진다. Template가 제일 깔끔해.

async multi process 를 이용할 순 없나 string으로 코드를 줄줄이 쓰니깐 좀 …

맞아. 문자열 템플릿로 “자식 스크립트”를 만들지 말고, 멀티프로세싱으로 독립 프로세스에서 실행하되 async/await로 감싸면 깔끔하다. 아래 코드는 순수 파이썬(의존성 없음)으로 동작하고, 자식 프로세스는 audit hook으로 파일/서브프로세스를 감시한다. 읽기·쓰기 허용 경로는 리스트로 받고, 시작 CWD도 옵션으로 준다. 자식에서 CWD를 바꿔도 부모에는 영향 없다.

# sandbox_async_multiprocess.py
# Python 3.8+ 전제. 외부 의존성 없음.

import asyncio
import multiprocessing as mp
import os, sys, io, traceback, time
from typing import List, Optional, Dict, Any

def _install_guard(read_paths: List[str], write_paths: List[str],
                   allow_read_anywhere: bool,
                   block_subprocess: bool = True,
                   block_network: bool = False):
    rp = [os.path.realpath(p) for p in (read_paths or [])]
    wp = [os.path.realpath(p) for p in (write_paths or [])]

    def _real(p):
        try: return os.path.realpath(p)
        except Exception: return str(p)

    def _under_any(path, roots):
        path = _real(path)
        for r in roots:
            r = _real(r)
            if path == r or path.startswith(r + os.sep):
                return True
        return False

    def _audit(event, args):
        # open / os.open: (path, mode, flags)
        if event in ("open", "os.open"):
            path = str(args[0])
            mode = str(args[1]) if len(args) > 1 else "r"
            flags = int(args[2]) if len(args) > 2 else 0
            write = any(ch in mode for ch in ("w","a","+","x")) or \
                    (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
            if write and not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} mode={mode} flags={flags} reason=write-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
            if not write and not allow_read_anywhere and not _under_any(path, rp+wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=read-not-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 파일 조작(경로 첫 인자; rename/replace는 src,dst 둘 다 확인)
        elif event in ("os.rename","os.replace"):
            src, dst = str(args[0]), str(args[1])
            if not (_under_any(src, wp) and _under_any(dst, wp)):
                msg = f"[AUDIT] blocked: event={event} src={_real(src)} dst={_real(dst)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
        elif event in ("os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
            path = str(args[0])
            if not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 서브프로세스/시스템 호출
        elif block_subprocess and (event.startswith("subprocess.") or event == "os.system"):
            cmd = None
            try: cmd = args[0]
            except Exception: cmd = "<unknown>"
            msg = f"[AUDIT] blocked: event={event} cmd={cmd} reason=subprocess-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

        # 네트워크 차단 옵션
        elif block_network and event.startswith("socket."):
            msg = f"[AUDIT] blocked: event={event} reason=network-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

    sys.addaudithook(_audit)

def _child_worker(code: str,
                  read_paths: List[str],
                  write_paths: List[str],
                  cwd: Optional[str],
                  allow_read_anywhere: bool,
                  block_subprocess: bool,
                  block_network: bool,
                  result_q: mp.Queue):
    # stdout/stderr 캡처
    stdout_buf, stderr_buf = io.StringIO(), io.StringIO()
    rc = 0
    try:
        # CWD 설정(부모에는 영향 없음)
        if cwd:
            os.chdir(cwd)

        # 가드 설치
        _install_guard(read_paths, write_paths, allow_read_anywhere,
                       block_subprocess=block_subprocess, block_network=block_network)

        # 코드 실행
        g = {"__name__": "__main__"}
        import contextlib
        with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
            try:
                exec(code, g, g)
            except Exception:
                rc = 1
                print("[ERROR] exception while executing user code:", file=sys.stderr)
                traceback.print_exc()

    except Exception as e:
        rc = 1
        # 가드 초기화 중 오류 등
        print(f"[ERROR] sandbox init/exec failed: {type(e).__name__}: {e}", file=stderr_buf)
        traceback.print_exc(file=stderr_buf)
    finally:
        result_q.put({
            "returncode": rc,
            "stdout": stdout_buf.getvalue(),
            "stderr": stderr_buf.getvalue(),
        })

def _run_blocking(code: str,
                  read_paths: List[str],
                  write_paths: List[str],
                  cwd: Optional[str],
                  allow_read_anywhere: bool,
                  timeout_sec: float,
                  block_subprocess: bool,
                  block_network: bool) -> Dict[str, Any]:
    q: mp.Queue = mp.Queue(maxsize=1)
    p = mp.Process(target=_child_worker, args=(
        code, read_paths, write_paths, cwd, allow_read_anywhere,
        block_subprocess, block_network, q
    ))
    p.start()

    # 결과 대기(타임아웃 지원)
    t0 = time.time()
    result = None
    while True:
        if not q.empty():
            result = q.get()
            break
        if not p.is_alive():
            # 비정상 종료 케이스
            result = {"returncode": p.exitcode if p.exitcode is not None else 1,
                      "stdout": "", "stderr": "[ERROR] child exited unexpectedly"}
            break
        if time.time() - t0 > timeout_sec:
            p.terminate()
            p.join(1.0)
            return {"returncode": 124, "stdout": "", "stderr": f"[TIMEOUT] exceeded {timeout_sec}s"}
        time.sleep(0.01)

    p.join(1.0)
    if p.is_alive():
        p.terminate()
        p.join(1.0)
    return result

async def run_user_code_mp_async(
    code: str,
    *,
    read_paths: Optional[List[str]] = None,
    write_paths: Optional[List[str]] = None,
    cwd: Optional[str] = None,
    allow_read_anywhere: Optional[bool] = None,
    timeout_sec: float = 10.0,
    block_subprocess: bool = True,
    block_network: bool = False,
) -> Dict[str, Any]:
    """
    별도 프로세스에서 사용자 코드를 실행하고 stdout/stderr만 수집한다(비동기).
    - read_paths: 읽기 허용 경로 리스트(하위 포함). None이면 어디든 읽기 허용.
    - write_paths: 쓰기 허용 경로 리스트(하위 포함). None/[]면 쓰기 전부 차단.
    - cwd: 자식 프로세스 시작 CWD. None이면 부모 CWD 상속. 자식 변경은 부모에 영향 없음.
    - block_subprocess: 서브프로세스 호출 차단 여부.
    - block_network: 네트워크 차단 여부(기본 False).
    - timeout_sec: 시간 초과시 returncode=124 반환.
    """
    if allow_read_anywhere is None:
        allow_read_anywhere = (not read_paths)  # 읽기 경로 미지정 → 어디든 읽기 허용

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        _run_blocking,
        code,
        read_paths or [],
        write_paths or [],
        cwd,
        allow_read_anywhere,
        timeout_sec,
        block_subprocess,
        block_network,
    )

# 데모
if __name__ == "__main__":
    async def main():
        demo_code = r"""
import os, sys, subprocess
print("hello, cwd:", os.getcwd())
# 허용된 쓰기
open("out/ok.txt","w").write("ok")
# 금지된 쓰기
try:
    open("/etc/blocked.txt","w").write("x")
except Exception as e:
    print("blocked write:", type(e).__name__)
# 서브프로세스 시도(차단)
try:
    subprocess.check_output(["echo","hi"])
except Exception as e:
    print("blocked subproc:", type(e).__name__)
print("done")
"""
        os.makedirs("./work/out", exist_ok=True)
        res = await run_user_code_mp_async(
            demo_code,
            read_paths=None,               # 읽기 어디든
            write_paths=["./work/out"],    # 여기에만 쓰기 허용
            cwd="./work",
            timeout_sec=5,
            block_subprocess=True,
            block_network=False,
        )
        print("RC:", res["returncode"])
        print("STDOUT:\n", res["stdout"])
        print("STDERR:\n", res["stderr"])

    # Windows spawn 대비
    if sys.platform == "win32":
        mp.freeze_support()
    asyncio.run(main())

핵심 포인트

  1. 문자열 템플릿로 “자식 스크립트”를 만들지 않는다. 모듈 내의 _child_worker가 별도 프로세스에서 실행되어 exec(code)를 돌린다.

  2. read_paths, write_paths를 리스트로 받아 하위까지 허용한다. 쓰기 시도는 write_paths 바깥이면 [AUDIT] 라인이 stderr에 찍히고 PermissionError가 난다. 어떤 파일을 쓰려 했는지, 어떤 조작을 하려 했는지 모두 기록된다.

  3. async/await는 run_in_executor로 멀티프로세스 작업을 논블로킹으로 감싼다. 타임아웃이 나면 자식을 terminate하고 returncode=124로 돌려준다.

  4. 자식에서 cwd를 바꿔도 부모에는 영향 없다. 시작 CWD는 인자로 지정 가능하다.

  5. 서브프로세스, os.system을 차단했고, 필요하면 block_network=True로 소켓도 막을 수 있다.

여기서 세션 유지형이 필요해지면, 프로세스를 오래 살려두고 큐로 여러 코드를 보내는 “세션 워커” 패턴으로 확장하면 된다. 지금은 요구하신 “간단하고 의존성 없는, async 멀티프로세스 + stdout/stderr만 수집”에 맞춘 최소 구현이다.

근데 이런 식으로 수행하면 parent 코드에 있는 글로벌 변수 같은걸 user code가 접근할수도 있게 되나?

핵심만 딱 정리.

  1. 지금 멀티프로세스 설계에서, 기본이 Linux의 fork면 자식이 부모의 메모리 스냅샷을 “읽기”는 할 수 있다. 예컨대 user code가 import main 해서 모듈 전역을 보면, 포크 시점의 값들이 보인다. 부모 상태를 바꾸진 못하지만, 전역 값이 노출되는 건 맞다.

  2. 이걸 원천 차단하려면 시작 방법을 spawn으로 강제하면 된다. spawn은 새 인터프리터를 깨끗하게 띄우고 모듈을 다시 임포트하므로, 부모의 런타임 전역 객체가 그대로 보이지 않는다. 네가 의도적으로 넘겨준 값(인자, 큐 메시지, exec에 주는 dict)만 접근 가능하다. Windows/macOS는 기본이 spawn, Linux는 기본이 fork라서 강제하는 것이 중요하다.

  3. 추가로 환경변수는 기본적으로 상속된다. 민감 정보가 있다면 자식에서 os.environ을 초기화하거나, 부모에서 run_user_code_mp_async 호출 전에 “허용 리스트만” 넘기는 방식을 써라.

아래는 내 이전 코드에 “spawn 강제 + 환경변수 정리 옵션”을 넣은 패치다. 이러면 user code는 부모 전역에 직접 접근할 수 없다.

# patch: spawn 사용 + 선택적 환경변수 정리
import multiprocessing as mp

def _child_worker(..., sanitize_env: bool, allowed_env_keys: list[str] | None, ...):
    # CWD 설정
    if cwd:
        os.chdir(cwd)

    # 환경변수 정리(선택)
    if sanitize_env:
        keep = set(allowed_env_keys or [])
        for k in list(os.environ.keys()):
            if k not in keep:
                os.environ.pop(k, None)

    # 가드 설치
    _install_guard(...)

    # 완전히 깨끗한 글로벌 dict로 실행
    g = {"__name__": "__main__"}
    with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
        try:
            exec(code, g, g)
        except Exception:
            rc = 1
            print("[ERROR] exception while executing user code:", file=sys.stderr)
            traceback.print_exc()
    ...

def _run_blocking(..., sanitize_env: bool, allowed_env_keys: list[str] | None, ...) -> dict:
    ctx = mp.get_context("spawn")  # 여기!
    q: mp.Queue = ctx.Queue(maxsize=1)
    p = ctx.Process(target=_child_worker, args=(..., sanitize_env, allowed_env_keys, ...))
    p.start()
    ...
    return result

async def run_user_code_mp_async(..., sanitize_env: bool = False, allowed_env_keys: list[str] | None = None, ...):
    ...
    return await loop.run_in_executor(
        None,
        _run_blocking,
        code,
        read_paths or [],
        write_paths or [],
        cwd,
        allow_read_anywhere,
        timeout_sec,
        block_subprocess,
        block_network,
        sanitize_env,
        allowed_env_keys,
    )

요점 체크

  • spawn을 쓰면 user code는 부모 전역에 접근하지 못한다. 네가 exec에 넣은 dict(g)와 네가 넘긴 인자만 본다.

  • fork를 쓰면 import main 등으로 부모 모듈 전역 “값”을 읽을 수 있다. 그래서 Linux에서는 반드시 spawn으로 강제하자.

  • 자식에서 os.chdir로 CWD를 바꿔도 부모에는 영향 없다. 프로세스별로 독립이다.

  • 환경변수는 기본 상속이라 민감 정보가 있다면 sanitize_env=True와 allowed_env_keys로 최소화하자.

이 설정이면, LLM이 악의적이진 않다는 전제에서 “부모 전역 접근” 걱정은 깔끔히 정리된다.

음 OK. 전체 코드 다 써줘.

좋아. 아래 스크립트 하나로 끝난다. 멀티프로세스로 분리해 exec(code)를 돌리고, spawn 컨텍스트로 부모 전역 노출을 막았다. read_paths와 write_paths 리스트로 경로 허용을 설정하고, 자식 프로세스 시작 cwd도 선택적으로 지정한다. 파일·서브프로세스(옵션으로 네트워크) 시도는 audit hook에서 차단하며, 어떤 경로/커맨드였는지 stderr에 자세히 남긴다. 비동기 API는 run_user_code_mp_async 한 함수만 쓰면 된다.

# sandbox_async_multiprocess_full.py
# Python 3.8+ / 외부 의존성 없음

import asyncio
import multiprocessing as mp
import os, sys, io, traceback, time, contextlib
from typing import List, Optional, Dict, Any

# ============== 내부: 가드 설치 ==============
def _install_guard(read_paths: List[str],
                   write_paths: List[str],
                   allow_read_anywhere: bool,
                   block_subprocess: bool = True,
                   block_network: bool = False) -> None:
    rp = [os.path.realpath(p) for p in (read_paths or [])]
    wp = [os.path.realpath(p) for p in (write_paths or [])]

    def _real(p: str) -> str:
        try:
            return os.path.realpath(p)
        except Exception:
            return str(p)

    def _under_any(path: str, roots: List[str]) -> bool:
        path = _real(path)
        for r in roots:
            r = _real(r)
            if path == r or path.startswith(r + os.sep):
                return True
        return False

    def _audit(event, args):
        # 파일 열기
        if event in ("open", "os.open"):
            path = str(args[0])
            mode = str(args[1]) if len(args) > 1 else "r"
            flags = int(args[2]) if len(args) > 2 else 0
            write = any(ch in mode for ch in ("w","a","+","x")) or \
                    (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
            if write and not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} mode={mode} flags={flags} reason=write-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
            if (not write) and (not allow_read_anywhere) and not _under_any(path, rp + wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=read-not-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 파일 조작류
        elif event in ("os.rename","os.replace"):
            src, dst = str(args[0]), str(args[1])
            if not (_under_any(src, wp) and _under_any(dst, wp)):
                msg = f"[AUDIT] blocked: event={event} src={_real(src)} dst={_real(dst)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
        elif event in ("os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
            path = str(args[0])
            if not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 서브프로세스/시스템 호출
        elif block_subprocess and (event.startswith("subprocess.") or event == "os.system"):
            cmd = None
            try:
                cmd = args[0]
            except Exception:
                cmd = "<unknown>"
            msg = f"[AUDIT] blocked: event={event} cmd={cmd} reason=subprocess-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

        # 네트워크 차단 옵션
        elif block_network and event.startswith("socket."):
            msg = f"[AUDIT] blocked: event={event} reason=network-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

    sys.addaudithook(_audit)

# ============== 내부: 자식 워커 ==============
def _child_worker(code: str,
                  read_paths: List[str],
                  write_paths: List[str],
                  cwd: Optional[str],
                  allow_read_anywhere: bool,
                  block_subprocess: bool,
                  block_network: bool,
                  sanitize_env: bool,
                  allowed_env_keys: Optional[List[str]],
                  result_q: mp.Queue) -> None:
    stdout_buf, stderr_buf = io.StringIO(), io.StringIO()
    rc = 0
    try:
        # CWD 설정(부모에는 영향 없음)
        if cwd:
            os.makedirs(cwd, exist_ok=True)
            os.chdir(cwd)

        # 환경변수 최소화(선택)
        if sanitize_env:
            keep = set(allowed_env_keys or ["PATH", "PYTHONPATH", "HOME", "TMPDIR"])
            for k in list(os.environ.keys()):
                if k not in keep:
                    os.environ.pop(k, None)

        # 가드 설치
        _install_guard(read_paths, write_paths, allow_read_anywhere,
                       block_subprocess=block_subprocess, block_network=block_network)

        # exec용 깨끗한 글로벌 dict
        g = {"__name__": "__main__"}

        with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
            try:
                exec(code, g, g)
            except Exception:
                rc = 1
                print("[ERROR] exception while executing user code:", file=sys.stderr)
                traceback.print_exc()

    except Exception as e:
        rc = 1
        print(f"[ERROR] sandbox init/exec failed: {type(e).__name__}: {e}", file=stderr_buf)
        traceback.print_exc(file=stderr_buf)
    finally:
        result_q.put({
            "returncode": rc,
            "stdout": stdout_buf.getvalue(),
            "stderr": stderr_buf.getvalue(),
        })

# ============== 내부: 동기 실행(스레드풀에서 호출) ==============
def _run_blocking(code: str,
                  read_paths: List[str],
                  write_paths: List[str],
                  cwd: Optional[str],
                  allow_read_anywhere: bool,
                  timeout_sec: float,
                  block_subprocess: bool,
                  block_network: bool,
                  sanitize_env: bool,
                  allowed_env_keys: Optional[List[str]]) -> Dict[str, Any]:
    ctx = mp.get_context("spawn")  # 부모 전역 노출 방지
    q: mp.Queue = ctx.Queue(maxsize=1)
    p = ctx.Process(target=_child_worker, args=(
        code, read_paths, write_paths, cwd, allow_read_anywhere,
        block_subprocess, block_network, sanitize_env, allowed_env_keys, q
    ))
    p.start()

    t0 = time.time()
    result: Optional[Dict[str, Any]] = None
    while True:
        if not q.empty():
            result = q.get()
            break
        if not p.is_alive():
            result = {"returncode": p.exitcode if p.exitcode is not None else 1,
                      "stdout": "", "stderr": "[ERROR] child exited unexpectedly"}
            break
        if time.time() - t0 > timeout_sec:
            p.terminate()
            p.join(1.0)
            return {"returncode": 124, "stdout": "", "stderr": f"[TIMEOUT] exceeded {timeout_sec}s"}
        time.sleep(0.01)

    p.join(1.0)
    if p.is_alive():
        p.terminate()
        p.join(1.0)
    return result or {"returncode": 1, "stdout": "", "stderr": "[ERROR] no result"}

# ============== 공개: 비동기 API ==============
async def run_user_code_mp_async(
    code: str,
    *,
    read_paths: Optional[List[str]] = None,
    write_paths: Optional[List[str]] = None,
    cwd: Optional[str] = None,
    allow_read_anywhere: Optional[bool] = None,
    timeout_sec: float = 10.0,
    block_subprocess: bool = True,
    block_network: bool = False,
    sanitize_env: bool = False,
    allowed_env_keys: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    별도 프로세스에서 사용자 코드를 실행하고 stdout/stderr를 수집해 반환(비동기).
    - read_paths: 읽기 허용 경로 리스트(하위 포함). None이면 어디든 읽기 허용.
    - write_paths: 쓰기 허용 경로 리스트(하위 포함). None/[]면 쓰기 전부 차단.
    - cwd: 자식 프로세스 시작 작업 디렉터리. 자식의 os.chdir는 부모에 영향 없음.
    - allow_read_anywhere: 명시적 제어(기본 None → read_paths 미지정이면 True).
    - block_subprocess: 서브프로세스 호출 차단 여부.
    - block_network: 네트워크 차단 여부.
    - sanitize_env: 자식 환경변수를 최소화할지 여부.
    - allowed_env_keys: sanitize_env=True일 때 보존할 키 목록(기본 PATH, PYTHONPATH, HOME, TMPDIR).
    - timeout_sec: 시간 초과 시 returncode=124.
    """
    if allow_read_anywhere is None:
        allow_read_anywhere = (not read_paths)

    # 쓰기 경로는 미리 만들어 둔다.
    for d in (write_paths or []):
        try:
            os.makedirs(d, exist_ok=True)
        except Exception:
            pass

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        _run_blocking,
        code,
        read_paths or [],
        write_paths or [],
        cwd,
        allow_read_anywhere,
        timeout_sec,
        block_subprocess,
        block_network,
        sanitize_env,
        allowed_env_keys,
    )

# ============== 데모 ==============
if __name__ == "__main__":
    async def main():
        demo_code = r"""
import os, sys, subprocess
print("hello, cwd:", os.getcwd())
# 허용된 쓰기
open("out/ok.txt","w").write("ok")
# 금지된 쓰기
try:
    open("/etc/blocked.txt","w").write("x")
except Exception as e:
    print("blocked write:", type(e).__name__)
# 서브프로세스 시도(차단)
try:
    subprocess.check_output(["echo","hi"])
except Exception as e:
    print("blocked subproc:", type(e).__name__)
print("done")
"""
        os.makedirs("./work/out", exist_ok=True)
        res = await run_user_code_mp_async(
            demo_code,
            read_paths=None,               # 읽기 어디든 허용
            write_paths=["./work/out"],    # 여기만 쓰기 허용
            cwd="./work",                  # 자식 시작 CWD
            timeout_sec=5,
            block_subprocess=True,
            block_network=False,
            sanitize_env=True,             # 환경 최소화
            allowed_env_keys=["PATH","PYTHONPATH","HOME","TMPDIR"],
        )
        print("RC:", res["returncode"])
        print("STDOUT:\n", res["stdout"])
        print("STDERR:\n", res["stderr"])

    if sys.platform == "win32":
        mp.freeze_support()
    asyncio.run(main())

이제 이 파일을 모듈로 두고 run_user_code_mp_async만 호출하면 된다. 필요하면 read_paths/write_paths/cwd를 에이전트 툴 인자로 노출하고, 환경변수 최소화나 네트워크 차단은 서버 쪽 정책으로 고정해도 된다.

만약에 multi threading이면 어떻게 돼?

요지는 “스레드로 돌리느냐, 프로세스로 돌리느냐”를 나눠 보아야 해.

  1. 우리 설계처럼 멀티프로세스로 샌드박스를 띄운 뒤, 그 안에서 유저 코드가 멀티스레드를 쓰는 경우
  • 괜찮다. sys.addaudithook은 “프로세스 단위”로 설치되고, 그 프로세스의 모든 스레드에서 발생한 이벤트에 적용된다. 즉 유저 코드가 스레드를 여러 개 만들어도 파일 쓰기 차단, subprocess 차단은 그대로 먹힌다.

  • 주의점은 프로세스 단위 속성들(CWD, 환경변수 등)은 스레드 간 공유된다는 것. 하지만 이건 부모에게는 영향 없다(부모는 다른 프로세스).

  • 안정성 보강 팁: 자식 프로세스 진입 직후 resource로 한도(시간/메모리/파일크기)를 걸면, 스레드 폭주로 인한 자원 잡아먹기도 방지된다.

    import resource
    resource.setrlimit(resource.RLIMIT_CPU, (3, 3))
    resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024,)*2)
    resource.setrlimit(resource.RLIMIT_FSIZE, (200*1024*1024,)*2)
    

위 3줄을 내가 준 _child_worker 맨 앞쪽(가드 설치 직후) 정도에 넣으면 충분해.

  1. 샌드박스를 “스레드”로만 돌리고(=부모와 같은 프로세스), 비동기/스레드로 병렬 실행하고 싶은 경우
  • 비권장. 같은 프로세스에서 exec을 돌리면 메모리 격리가 없다. 유저 코드가 부모의 전역 상태를 들여다보거나 건드릴 여지가 생긴다. 예를 들어 같은 프로세스에서 import __main__, inspect로 프레임을 타고 올라가면 부모 스코프에 닿을 수 있다.

  • 또한 같은 프로세스라 os.chdir은 전역 CWD를 바꿔서 “다른 스레드/부모 로직”에도 영향을 준다. 환경변수 수정도 마찬가지. 파일가드(audit hook)는 스레드 전체에 적용되지만, “상태 격리”가 안 된다.

  • 그래도 꼭 스레드 모드가 필요하다면 최소한 다음을 추가해야 한다.

    • 스레드 모드에선 os.chdir 금지(감사 훅에서 event == "os.chdir" 혹은 os.chdir를 래핑해서 막기).

    • execglobals를 새 dict로 주고, 부모 전역 참조를 전달하지 않기.

    • 민감 빌트인/모듈 접근을 최소화(예: __import__, inspect 등). 이건 기능 제한과 충돌하니 신중해야 한다.

  • 결론적으로 “LLM이 악의적이지 않다” 가정 하더라도, 스레드만으로는 실수로 부모 상태를 망칠 수 있다. 그래서 나는 계속 “spawn 프로세스” 권장.

  1. 병렬 실행(동시에 여러 개 돌리기)을 원하면
  • 지금 구조 그대로 run_user_code_mp_async를 여러 번 await하면 된다. 내부에서 프로세스를 매번 띄우니 완전 분리된다.

  • 대량 병렬 시엔 세마포어로 동시 실행 수를 제한하거나, 프로세스 풀(예: concurrent.futures.ProcessPoolExecutor)로 바꾸면 리소스 효율이 좋아진다. 인터페이스는 그대로 유지한 채 내부 실행자만 풀로 교체하면 된다.

정리

  • “멀티스레딩을 유저 코드가 자식 프로세스 안에서 쓰는 것”은 OK. 감사 훅은 모든 스레드에 적용된다.

  • “샌드박스 자체를 스레드로 구현”은 메모리·상태 격리가 없어 위험하다. CWD/ENV가 전역이라 부모에 영향도 줄 수 있다. 그래서 지금처럼 spawn 멀티프로세스 유지가 정답이다.

  • 병렬성은 프로세스 단위로 가져가고, 필요하면 리소스 한도와 풀링으로 튜닝하자.

원하면 스레드 모드의 최소 샘플도 붙여 줄 수 있는데, 위 이유로 실전 투입은 추천하지 않는다.

오케이 프로세스 한 번이 아니라 기억이 유지되는 커널은 어떻게 만들수있을까

좋아. “기억이 유지되는 커널”을 멀티프로세스로 만들었다. 한 번 띄워두면 같은 프로세스 안의 전역 dict를 재사용해서 상태가 이어지고, 비동기(await) 인터페이스로 코드 조각을 계속 실행할 수 있다. 파일·서브프로세스(선택적으로 네트워크)는 감사 훅(sys.addaudithook)으로 가볍게 샌드박스한다. 읽기/쓰기 허용 경로, 시작 CWD, 환경 정리 여부도 런타임에 바꿀 수 있다.

아래 파일 하나로 끝난다.

# persistent_kernel_async.py
# Python 3.8+ / 외부 의존성 없음

import asyncio
import multiprocessing as mp
import os, sys, io, time, traceback, contextlib, threading
from typing import List, Optional, Dict, Any

# ---------------- 내부: 감사 훅(파일/서브프로세스/네트워크 제어) ----------------
def _install_guard(read_paths: List[str],
                   write_paths: List[str],
                   allow_read_anywhere: bool,
                   block_subprocess: bool = True,
                   block_network: bool = False) -> None:
    rp = [os.path.realpath(p) for p in (read_paths or [])]
    wp = [os.path.realpath(p) for p in (write_paths or [])]

    def _real(p: str) -> str:
        try:
            return os.path.realpath(p)
        except Exception:
            return str(p)

    def _under_any(path: str, roots: List[str]) -> bool:
        path = _real(path)
        for r in roots:
            r = _real(r)
            if path == r or path.startswith(r + os.sep):
                return True
        return False

    def _audit(event, args):
        # 파일 열기(open / os.open)
        if event in ("open", "os.open"):
            path = str(args[0])
            mode = str(args[1]) if len(args) > 1 else "r"
            flags = int(args[2]) if len(args) > 2 else 0
            write = any(ch in mode for ch in ("w","a","+","x")) or \
                    (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
            if write and not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} mode={mode} flags={flags} reason=write-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
            if (not write) and (not allow_read_anywhere) and not _under_any(path, rp + wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=read-not-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 파일 조작(경로 첫 인자; rename/replace는 src,dst 둘 다 확인)
        elif event in ("os.rename","os.replace"):
            src, dst = str(args[0]), str(args[1])
            if not (_under_any(src, wp) and _under_any(dst, wp)):
                msg = f"[AUDIT] blocked: event={event} src={_real(src)} dst={_real(dst)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
        elif event in ("os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
            path = str(args[0])
            if not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        # 서브프로세스/시스템 호출
        elif block_subprocess and (event.startswith("subprocess.") or event == "os.system"):
            cmd = None
            try:
                cmd = args[0]
            except Exception:
                cmd = "<unknown>"
            msg = f"[AUDIT] blocked: event={event} cmd={cmd} reason=subprocess-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

        # 네트워크 차단(옵션)
        elif block_network and event.startswith("socket."):
            msg = f"[AUDIT] blocked: event={event} reason=network-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

    sys.addaudithook(_audit)

# ---------------- 내부: 자식 프로세스 커널 ----------------
def _kernel_proc(req_q: mp.Queue, resp_q: mp.Queue,
                 init_cfg: Dict[str, Any]) -> None:
    # 상태
    read_paths = init_cfg.get("read_paths") or []
    write_paths = init_cfg.get("write_paths") or []
    allow_read_anywhere = init_cfg.get("allow_read_anywhere", True)
    block_subprocess = init_cfg.get("block_subprocess", True)
    block_network = init_cfg.get("block_network", False)
    sanitize_env = init_cfg.get("sanitize_env", False)
    allowed_env_keys = init_cfg.get("allowed_env_keys") or ["PATH","PYTHONPATH","HOME","TMPDIR"]
    cwd = init_cfg.get("cwd")

    # CWD/ENV 설정
    try:
        if cwd:
            os.makedirs(cwd, exist_ok=True)
            os.chdir(cwd)
        if sanitize_env:
            keep = set(allowed_env_keys)
            for k in list(os.environ.keys()):
                if k not in keep:
                    os.environ.pop(k, None)
    except Exception as e:
        # 초기화 오류도 응답
        resp_q.put({"id": -1, "ok": False, "error": f"init failed: {type(e).__name__}: {e}"})
        return

    # 리소스 한도(원하면 조정)
    try:
        import resource
        resource.setrlimit(resource.RLIMIT_CPU, (5, 5))
        resource.setrlimit(resource.RLIMIT_FSIZE, (200*1024*1024,)*2)
        # 메모리는 OS/glibc 동작에 따라 영향이 다를 수 있으니 필요할 때만 켜기
        # resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024,)*2)
    except Exception:
        pass

    # 감사 훅 설치(프로세스 전체 스레드에 적용)
    _install_guard(read_paths, write_paths, allow_read_anywhere,
                   block_subprocess=block_subprocess, block_network=block_network)

    # 전역 dict: 이게 “기억”이다.
    g = {"__name__": "__main__"}

    # 이벤트 루프
    while True:
        req = req_q.get()
        if req is None:
            break
        rid = req.get("id", -1)
        op = req.get("op")
        try:
            if op == "exec":
                code = req["code"]
                stdout_buf, stderr_buf = io.StringIO(), io.StringIO()
                rc = 0
                with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
                    try:
                        exec(code, g, g)
                    except Exception:
                        rc = 1
                        print("[ERROR] exception while executing user code:", file=sys.stderr)
                        traceback.print_exc()
                resp_q.put({"id": rid, "ok": True, "returncode": rc,
                            "stdout": stdout_buf.getvalue(), "stderr": stderr_buf.getvalue()})

            elif op == "reset":
                g = {"__name__": "__main__"}
                resp_q.put({"id": rid, "ok": True, "msg": "reset-done"})

            elif op == "set_paths":
                read_paths = req.get("read_paths") or []
                write_paths = req.get("write_paths") or []
                allow_read_anywhere = req.get("allow_read_anywhere", allow_read_anywhere)
                # 훅은 이미 설치되어 있지만, 클로저 값이 아니라 함수 내에서 최신 리스트를 캡쳐하도록 재설치
                _install_guard(read_paths, write_paths, allow_read_anywhere,
                               block_subprocess=block_subprocess, block_network=block_network)
                resp_q.put({"id": rid, "ok": True, "msg": "paths-updated"})

            elif op == "set_cwd":
                new_cwd = req.get("cwd")
                if new_cwd:
                    os.makedirs(new_cwd, exist_ok=True)
                    os.chdir(new_cwd)
                resp_q.put({"id": rid, "ok": True, "cwd": os.getcwd()})

            elif op == "status":
                resp_q.put({"id": rid, "ok": True, "cwd": os.getcwd(),
                            "read_paths": read_paths, "write_paths": write_paths,
                            "allow_read_anywhere": allow_read_anywhere})

            elif op == "shutdown":
                resp_q.put({"id": rid, "ok": True, "msg": "bye"})
                break

            else:
                resp_q.put({"id": rid, "ok": False, "error": f"unknown op: {op}"})
        except Exception as e:
            resp_q.put({"id": rid, "ok": False, "error": f"{type(e).__name__}: {e}"})

# ---------------- 부모 측: 세션 핸들(비동기 API) ----------------
class PersistentKernel:
    def __init__(self):
        self._ctx = mp.get_context("spawn")
        self._req_q: mp.Queue = self._ctx.Queue()
        self._resp_q: mp.Queue = self._ctx.Queue()
        self._p: Optional[mp.Process] = None
        self._next_id = 1
        self._lock = threading.Lock()

    async def start(self,
                    *,
                    read_paths: Optional[List[str]] = None,
                    write_paths: Optional[List[str]] = None,
                    cwd: Optional[str] = None,
                    allow_read_anywhere: Optional[bool] = None,
                    block_subprocess: bool = True,
                    block_network: bool = False,
                    sanitize_env: bool = False,
                    allowed_env_keys: Optional[List[str]] = None,
                    startup_timeout: float = 5.0) -> None:
        if self._p is not None:
            return
        init_cfg = {
            "read_paths": read_paths or [],
            "write_paths": write_paths or [],
            "cwd": cwd,
            "allow_read_anywhere": True if allow_read_anywhere is None else allow_read_anywhere,
            "block_subprocess": block_subprocess,
            "block_network": block_network,
            "sanitize_env": sanitize_env,
            "allowed_env_keys": allowed_env_keys or ["PATH","PYTHONPATH","HOME","TMPDIR"],
        }
        self._p = self._ctx.Process(target=_kernel_proc, args=(self._req_q, self._resp_q, init_cfg))
        self._p.start()
        # 간단 헬스체크: 상태 요청
        _ = await self.status(timeout=startup_timeout)

    async def aexec(self, code: str, timeout: float = 10.0) -> Dict[str, Any]:
        return await self._rpc({"op": "exec", "code": code}, timeout=timeout)

    async def reset(self, timeout: float = 5.0) -> Dict[str, Any]:
        return await self._rpc({"op": "reset"}, timeout=timeout)

    async def set_paths(self,
                        read_paths: Optional[List[str]] = None,
                        write_paths: Optional[List[str]] = None,
                        allow_read_anywhere: Optional[bool] = None,
                        timeout: float = 5.0) -> Dict[str, Any]:
        return await self._rpc({"op": "set_paths",
                                "read_paths": read_paths or [],
                                "write_paths": write_paths or [],
                                "allow_read_anywhere": allow_read_anywhere}, timeout=timeout)

    async def set_cwd(self, cwd: Optional[str], timeout: float = 5.0) -> Dict[str, Any]:
        return await self._rpc({"op": "set_cwd", "cwd": cwd}, timeout=timeout)

    async def status(self, timeout: float = 3.0) -> Dict[str, Any]:
        return await self._rpc({"op": "status"}, timeout=timeout)

    async def shutdown(self, timeout: float = 3.0) -> Dict[str, Any]:
        try:
            res = await self._rpc({"op": "shutdown"}, timeout=timeout)
        finally:
            self._cleanup()
        return res

    def _cleanup(self):
        if self._p is not None:
            try:
                self._p.join(0.5)
                if self._p.is_alive():
                    self._p.terminate()
                    self._p.join(1.0)
            except Exception:
                pass
        self._p = None

    async def _rpc(self, payload: Dict[str, Any], timeout: float) -> Dict[str, Any]:
        if self._p is None or not self._p.is_alive():
            raise RuntimeError("kernel not started")
        loop = asyncio.get_running_loop()
        with self._lock:
            rid = self._next_id
            self._next_id += 1
            payload = dict(payload)
            payload["id"] = rid
            # 큐 put/get은 블로킹이므로 스레드풀로 넘김
            await loop.run_in_executor(None, self._req_q.put, payload)
            # 응답 대기(타임아웃)
            deadline = time.time() + timeout
            while True:
                remaining = deadline - time.time()
                if remaining <= 0:
                    raise TimeoutError(f"kernel request timed out: {payload.get('op')}")
                try:
                    resp = await loop.run_in_executor(None, self._resp_q.get, True, min(0.1, remaining))
                except Exception:
                    # 계속 폴링
                    continue
                if resp.get("id") == rid:
                    return resp
                # 혹시 다른 응답이면 버퍼링하지 않고 무시(직렬 호출 전제)
                # 병렬 호출을 원하면 here: per-request map/fan-in 스레드 추가 필요

# ---------------- 사용 예(데모) ----------------
if __name__ == "__main__":
    async def demo():
        k = PersistentKernel()
        await k.start(
            read_paths=None,                 # None → 어디든 읽기 허용
            write_paths=["./work/out"],      # 여기만 쓰기 허용
            cwd="./work",                    # 커널의 시작 CWD (부모엔 영향 없음)
            block_subprocess=True,
            block_network=False,
            sanitize_env=True,
        )

        # 1) 상태 확인
        print(await k.status())

        # 2) 실행 1: 파일 쓰기(허용) + 금지 시도(거부)
        code1 = r"""
import os, sys, subprocess
print("CWD:", os.getcwd())
open("out/ok.txt","w").write("ok")
try:
    open("/etc/blocked.txt","w").write("x")
except Exception as e:
    print("blocked write:", type(e).__name__)
print("done-1")
"""
        res1 = await k.aexec(code1, timeout=5)
        print("RC1:", res1["returncode"])
        print("STDOUT1:\n", res1["stdout"])
        print("STDERR1:\n", res1["stderr"])

        # 3) 실행 2: 상태 유지 확인(앞에서 만든 변수 재사용)
        code2 = r"""
if 'counter' not in globals():
    counter = 0
counter += 1
print("counter:", counter)
"""
        for _ in range(3):
            res2 = await k.aexec(code2, timeout=3)
            print(res2["stdout"].strip())

        # 4) reset 후 다시 확인
        print(await k.reset())
        res3 = await k.aexec("print('has counter?', 'counter' in globals())")
        print(res3["stdout"].strip())

        # 5) 경로/작업 디렉터리 변경
        print(await k.set_paths(write_paths=["./work2/out2"], read_paths=[],
                                allow_read_anywhere=False))
        print(await k.set_cwd("./work2"))
        res4 = await k.aexec("open('out2/ok2.txt','w').write('ok2'); print('wrote ok2')")
        print(res4["stdout"].strip())

        # 6) 종료
        print(await k.shutdown())

    if sys.platform == "win32":
        mp.freeze_support()
    asyncio.run(demo())

핵심 정리

  1. 커널은 별도 프로세스로 한 번 띄우고 유지한다(spawn). 같은 세션에서의 exec는 같은 globals dict를 재사용하므로 상태가 이어진다.

  2. 파일·조작·서브프로세스(옵션으로 네트워크) 시도는 감사 훅으로 감시하고, 금지 시도는 어떤 경로/커맨드였는지 stderr에 [AUDIT]로 남긴다.

  3. read_paths, write_paths, cwd를 시작 시점에 지정하고, 중간에 set_paths/set_cwd로 변경 가능하다.

  4. 부모 전역은 spawn 덕분에 기본 노출이 없다. 환경변수는 sanitize_env로 최소화할 수 있다.

  5. 비동기 호출은 aexec/reset/status 등으로 제공한다. 동시 다발 호출은 락으로 직렬화해 안전하게 처리했다(진짜 병렬을 원하면 응답 매칭 맵과 전용 리더 스레드로 확장).

이걸 바로 네 vLLM/Agents SDK 툴로 감싸면 “세션 유지형 파이썬 인터프리터”를 깔끔하게 쓸 수 있다.

좀 헷갈려 code1은 두 번째 open 부분에서 에러가 나잖아? 근데 바로 아래의 Except 코드에서 그걸 잡아야할 것 같은데 이건 returncode 1이 뜨네. 의도한 동작인가

의도한 동작은 아니다. 그 returncode=1은 “잡히지 않은 예외가 있었음”을 뜻해. 지금 케이스는 두 번째 금지 쓰기(try/except로 감싼 부분)가 아니라, 그 위의 첫 번째 허용 쓰기 open("out/ok.txt","w")가 감사 훅에 의해 막혀서 예외가 바깥으로 튄 가능성이 높다.

왜 막히냐면 내가 준 데모에서 cwd="./work"로 커널을 시작했는데 write_paths=["./work/out"]를 그대로 넘겼다. 자식은 먼저 chdir("./work")를 하고, 그 다음에 write_paths를 realpath로 절대화한다. 상대경로를 현재 작업디렉터리에 붙이니 ./work/out/…/work/work/out이 되어 버려서, 실제로 쓰려는 out/ok.txt(/…/work/out/ok.txt)와 경로가 불일치한다. 그래서 첫 번째 open이 PermissionError로 실패하고, 그건 try/except로 감싸지 않았으니 rc=1이 된 것.

해결은 둘 중 하나로 깔끔히 끝난다.

옵션 A: 항상 절대경로로 넘기기

await k.start(
    write_paths=[os.path.abspath("./work/out")],  # 절대경로로 넘김
    cwd="./work",
    # ...
)

옵션 B: 커널 안에서 cwd 기준으로 경로를 정규화하도록 패치
_kernel_proc에서 chdir 한 “직후”에 read/write 경로를 현재 CWD 기준으로 절대화하면 된다.

# _kernel_proc 내부, chdir(cwd) 한 다음에 추가
def _abslist(lst):
    out = []
    for p in (lst or []):
        out.append(os.path.realpath(p if os.path.isabs(p) else os.path.join(os.getcwd(), p)))
    return out

read_paths  = _abslist(read_paths)
write_paths = _abslist(write_paths)

그리고 데모 호출은 이렇게 바꾸면 헷갈림이 없다.

await k.start(
    write_paths=["out"],    # cwd 기준 상대경로
    cwd="./work",
    # ...
)

점검 팁
stderr에 첫 번째 open에 대한 [AUDIT] blocked: event=open path=…/work/out/ok.txt …가 찍혀 있으면 내가 설명한 경로 불일치가 원인이다. 위 패치(혹은 절대경로 전달) 이후에는 첫 번째 쓰기가 통과하고, 두 번째 금지 쓰기는 try/except에 잡혀서 returncode가 0으로 내려간다.

아 오케이 좀 헷갈린

괜찮아, 경로가 섞이면 누구나 헷갈려. 핵심만 다시 정리할게.

정리

  1. 커널 시작 시 cwd를 바꾸면, write_paths 같은 상대경로는 새 cwd 기준으로 절대화해줘야 한다.

  2. 절대경로로 넘기면 안전하고, 상대경로로 넘기고 싶으면 커널 안에서 chdir 이후에 절대화하면 된다.

  3. 첫 번째 open이 허용 경로 밖으로 인식되면 try/except에 안 걸린 예외가 떠서 returncode=1이 된다.

빠른 해결 두 가지
옵션 A: 항상 절대경로로 넘기기

await k.start(
    cwd="./work",
    write_paths=[os.path.abspath("./work/out")],
)

옵션 B: 커널 내부에서 절대화 패치(권장)
_kernel_proc에서 chdir(cwd) 바로 뒤에 아래 유틸을 넣어주면, 상대경로를 새 cwd 기준으로 정규화해준다.

def _abslist(lst):
    base = os.getcwd()
    out = []
    for p in (lst or []):
        out.append(os.path.realpath(p if os.path.isabs(p) else os.path.join(base, p)))
    return out

read_paths  = _abslist(read_paths)
write_paths = _abslist(write_paths)

그 다음 시작할 때는 직관적으로

await k.start(cwd="./work", write_paths=["out"])

동작 체크

  1. 첫 실행에서 out/ok.txt 쓰기가 성공해야 하고

  2. 금지 쓰기 시도는 stderr에 [AUDIT] blocked…가 찍히면서 try/except로 잡혀야 하며

  3. returncode는 0으로 내려와야 정상.

필요하면 위 패치 반영한 전체 파일도 묶어 줄게.

좋아 이제 PersistentKernel을 이용해서 llm tool calling용 툴을 만들어보자. 단일 코드 실행 -> 일회용 툴 콜링용 세션 유지 세션 코드 실행 -> 세션 유지용 코드 실행

좋아. PersistentKernel을 그대로 활용해서 “일회성 실행”과 “세션 유지 실행” 두 가지 툴을 바로 쓸 수 있게 묶어준다. 설정(Config)은 서버 코드에서만 하고, LLM에는 절대 노출하지 않는다. LLM이 호출할 수 있는 건 다음 다섯 개뿐이다.

  1. python_execute_once(code, timeout_sec?, cwd_rel?)

  2. python_session_start(session_id, timeout_sec?, cwd_rel?)

  3. python_session_execute(session_id, code, timeout_sec?)

  4. python_session_reset(session_id)

  5. python_session_shutdown(session_id)

아래 한 파일로 붙여 넣으면 된다. 이전에 만든 persistent_kernel_async.py가 같은 디렉터리에 있다고 가정한다.

# persistent_kernel_tools.py
# 요구: Python 3.8+, 같은 폴더에 persistent_kernel_async.py 존재
import os, asyncio, threading
from typing import Dict, Any, Optional, List
from persistent_kernel_async import PersistentKernel, run_user_code_mp_async

# ===================== 서버 전용 정책(LLM 비노출) =====================
# 운영자가 직접 채움. LLM에 노출 금지.
READ_PATHS_DEFAULT  = [os.path.abspath("./data_ro")]   # 읽기 허용 루트(하위 포함)
WRITE_PATHS_DEFAULT = [os.path.abspath("./sandbox")]   # 쓰기 허용 루트(하위 포함)
DEFAULT_CWD         = os.path.abspath("./work")        # 커널의 시작 CWD
BLOCK_SUBPROC       = True
BLOCK_NET           = False
SANITIZE_ENV        = True
ALLOWED_ENV_KEYS    = ["PATH","PYTHONPATH","HOME","TMPDIR"]

os.makedirs(WRITE_PATHS_DEFAULT[0], exist_ok=True)
os.makedirs(DEFAULT_CWD, exist_ok=True)

# ===================== 유틸 =====================
def _abslist(paths: Optional[List[str]]) -> List[str]:
    base = os.getcwd()
    out = []
    for p in (paths or []):
        out.append(os.path.realpath(p if os.path.isabs(p) else os.path.join(base, p)))
    return out

def _resolve_cwd_rel(cwd_rel: Optional[str]) -> Optional[str]:
    if not cwd_rel:
        return None
    # 상대 경로는 기본 CWD 기준으로 절대화
    target = os.path.realpath(os.path.join(DEFAULT_CWD, cwd_rel))
    # 안전: 쓰기/읽기 허용 루트들 하위로만
    allowed_roots = set(_abslist(READ_PATHS_DEFAULT) + _abslist(WRITE_PATHS_DEFAULT) + [DEFAULT_CWD])
    if not any(target == r or target.startswith(r + os.sep) for r in allowed_roots):
        raise PermissionError(f"cwd_rel outside allowed roots: {target}")
    return target

# ===================== 세션 레지스트리 =====================
class KernelRegistry:
    def __init__(self):
        self._kernels: Dict[str, PersistentKernel] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._global_lock = threading.Lock()

    async def get_or_start(self, sid: str, *, cwd_rel: Optional[str] = None, timeout: float = 5.0) -> PersistentKernel:
        with self._global_lock:
            k = self._kernels.get(sid)
            if sid not in self._locks:
                self._locks[sid] = threading.Lock()
        if k is None:
            with self._locks[sid]:
                k = self._kernels.get(sid)
                if k is None:
                    k = PersistentKernel()
                    await k.start(
                        read_paths=READ_PATHS_DEFAULT,
                        write_paths=WRITE_PATHS_DEFAULT,
                        cwd=_resolve_cwd_rel(cwd_rel) or DEFAULT_CWD,
                        block_subprocess=BLOCK_SUBPROC,
                        block_network=BLOCK_NET,
                        sanitize_env=SANITIZE_ENV,
                        allowed_env_keys=ALLOWED_ENV_KEYS,
                        startup_timeout=timeout,
                    )
                    self._kernels[sid] = k
        return k

    async def reset(self, sid: str) -> Dict[str, Any]:
        k = self._kernels.get(sid)
        if not k:
            return {"ok": False, "error": "session not found"}
        return await k.reset()

    async def shutdown(self, sid: str) -> Dict[str, Any]:
        k = self._kernels.pop(sid, None)
        if not k:
            return {"ok": False, "error": "session not found"}
        return await k.shutdown()

REGISTRY = KernelRegistry()

# ===================== LLM 툴 구현(핸들러) =====================
# 1) 단일 코드 실행(세션 없음, 즉시 프로세스 생성→실행→종료)
async def tool_python_execute_once(args: Dict[str, Any]) -> Dict[str, Any]:
    code = args["code"]
    timeout = int(args.get("timeout_sec", 10))
    cwd_rel = args.get("cwd_rel")  # 선택
    try:
        res = await run_user_code_mp_async(
            code,
            read_paths=READ_PATHS_DEFAULT,
            write_paths=WRITE_PATHS_DEFAULT,
            cwd=_resolve_cwd_rel(cwd_rel) or DEFAULT_CWD,
            allow_read_anywhere=bool(not READ_PATHS_DEFAULT),  # 읽기 루트 없으면 어디든 읽기 허용
            timeout_sec=timeout,
            block_subprocess=BLOCK_SUBPROC,
            block_network=BLOCK_NET,
            sanitize_env=SANITIZE_ENV,
            allowed_env_keys=ALLOWED_ENV_KEYS,
        )
        return res
    except Exception as e:
        return {"returncode": 1, "stdout": "", "stderr": f"[TOOL_ERROR] {type(e).__name__}: {e}"}

# 2) 세션 시작
async def tool_python_session_start(args: Dict[str, Any]) -> Dict[str, Any]:
    sid = args["session_id"]
    timeout = int(args.get("timeout_sec", 5))
    cwd_rel = args.get("cwd_rel")
    try:
        await REGISTRY.get_or_start(sid, cwd_rel=cwd_rel, timeout=timeout)
        return {"ok": True, "session_id": sid}
    except Exception as e:
        return {"ok": False, "error": f"{type(e).__name__}: {e}"}

# 3) 세션 코드 실행
async def tool_python_session_execute(args: Dict[str, Any]) -> Dict[str, Any]:
    sid = args["session_id"]; code = args["code"]; timeout = int(args.get("timeout_sec", 10))
    k = await REGISTRY.get_or_start(sid)
    try:
        return await k.aexec(code, timeout=timeout)
    except Exception as e:
        return {"returncode": 1, "stdout": "", "stderr": f"[TOOL_ERROR] {type(e).__name__}: {e}"}

# 4) 세션 리셋
async def tool_python_session_reset(args: Dict[str, Any]) -> Dict[str, Any]:
    sid = args["session_id"]
    return await REGISTRY.reset(sid)

# 5) 세션 종료
async def tool_python_session_shutdown(args: Dict[str, Any]) -> Dict[str, Any]:
    sid = args["session_id"]
    return await REGISTRY.shutdown(sid)

# ===================== 툴 스키마(JSON; LLM에 노출) =====================
python_execute_once_tool = {
  "type": "function",
  "function": {
    "name": "python_execute_once",
    "description": "단일 파이썬 코드 조각을 샌드박스에서 실행하고 stdout/stderr만 반환.",
    "parameters": {
      "type": "object",
      "properties": {
        "code": {"type": "string"},
        "timeout_sec": {"type": "integer", "default": 10, "minimum": 1, "maximum": 60},
        "cwd_rel": {"type": "string", "description": "커널 시작 CWD 기준 상대경로(허용 루트 하위만)."}
      },
      "required": ["code"],
      "additionalProperties": False
    },
    "strict": True
  }
}

python_session_start_tool = {
  "type": "function",
  "function": {
    "name": "python_session_start",
    "description": "세션 유지형 커널을 시작한다(이미 있으면 성공으로 취급).",
    "parameters": {
      "type": "object",
      "properties": {
        "session_id": {"type": "string"},
        "timeout_sec": {"type": "integer", "default": 5, "minimum": 1, "maximum": 60},
        "cwd_rel": {"type": "string", "description": "세션 시작 시 상대 CWD(옵션)."}
      },
      "required": ["session_id"],
      "additionalProperties": False
    },
    "strict": True
  }
}

python_session_execute_tool = {
  "type": "function",
  "function": {
    "name": "python_session_execute",
    "description": "세션 유지형 커널에서 코드를 실행하고 stdout/stderr를 반환.",
    "parameters": {
      "type": "object",
      "properties": {
        "session_id": {"type": "string"},
        "code": {"type": "string"},
        "timeout_sec": {"type": "integer", "default": 10, "minimum": 1, "maximum": 60}
      },
      "required": ["session_id","code"],
      "additionalProperties": False
    },
    "strict": True
  }
}

python_session_reset_tool = {
  "type": "function",
  "function": {
    "name": "python_session_reset",
    "description": "세션의 전역 상태를 지운다(globals 재생성).",
    "parameters": {
      "type": "object",
      "properties": {
        "session_id": {"type": "string"}
      },
      "required": ["session_id"],
      "additionalProperties": False
    },
    "strict": True
  }
}

python_session_shutdown_tool = {
  "type": "function",
  "function": {
    "name": "python_session_shutdown",
    "description": "세션 유지형 커널을 종료한다.",
    "parameters": {
      "type": "object",
      "properties": {
        "session_id": {"type": "string"}
      },
      "required": ["session_id"],
      "additionalProperties": False
    },
    "strict": True
  }
}

# ===================== 동기 어댑터(프레임워크에 맞게 선택적으로 사용) =====================
def run_tool_sync(tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """프레임워크가 sync만 지원하면 사용. 내부에서 이벤트 루프를 관리."""
    async def _runner():
        if tool_name == "python_execute_once":
            return await tool_python_execute_once(args)
        elif tool_name == "python_session_start":
            return await tool_python_session_start(args)
        elif tool_name == "python_session_execute":
            return await tool_python_session_execute(args)
        elif tool_name == "python_session_reset":
            return await tool_python_session_reset(args)
        elif tool_name == "python_session_shutdown":
            return await tool_python_session_shutdown(args)
        else:
            return {"ok": False, "error": f"unknown tool: {tool_name}"}
    try:
        loop = asyncio.get_running_loop()
        # 이미 루프가 있다면 상위에서 await하도록 디자인하는 것이 바람직
        raise RuntimeError("run_tool_sync called inside event loop")
    except RuntimeError:
        return asyncio.run(_runner())

LLM에게 알려줄 사용 규칙(간단)

  • 단발 작업이면 python_execute_once를 호출하라. 결과는 returncode/stdout/stderr만 본다.

  • 상태를 써야 하면 python_session_start → python_session_execute를 반복 호출하라. 필요하면 python_session_reset으로 상태를 초기화하고, 끝나면 python_session_shutdown으로 닫아라.

  • 파일은 서버에서 허용한 폴더 하위에서만 쓸 수 있다. 금지 시도는 stderr에 [AUDIT]로 어떤 경로/연산이 막혔는지 기록된다.

  • cwd_rel은 선택이며, 세션 시작 시 한 번만 지정하는 걸 권장한다.

프레임워크에 연결

  • OpenAI Agents SDK나 vLLM function-calling에는 위 “tool 스키마”들(python_execute_once_tool 등)을 등록한다.

  • 툴 콜이 들어오면 name과 arguments를 받아서 run_tool_sync(name, args)로 돌린 결과를 다시 LLM에게 전달하면 된다.

경로 혼동 방지 팁

  • 운영자가 READ_PATHS_DEFAULT/WRITE_PATHS_DEFAULT/DEFAULT_CWD를 절대경로로 채워두면 가장 깔끔하다.

  • cwd_rel은 DEFAULT_CWD 기준 상대경로로만 받고, 허용 루트(READ/WRITE/DEFAULT_CWD) 하위가 아니면 거부한다.

이렇게 붙이면 “일회성 실행”과 “기억 유지 커널 실행”을 같은 정책 아래에서 깔끔하게 제공할 수 있다.

그런 의문은 좀 있어. 세션을 유지하는게 필요할까?

좋은 의문이야. “세션 유지형 커널(지속 프로세스)”이 정말 필요한가를 용도별로 딱 갈라보자.

  1. 세션이 주는 이득
  • 반복 비용 절감: 큰 데이터·모델·라이브러리를 한 번 로드하고 계속 씀. 예) 1 GB 벡터 DB, 대형 ML 모델, 느린 초기화가 있는 툴.

  • 탐색형 작업: 한 번 만든 변수·함수를 이어가며 점진적으로 코드를 보강. EDA, 시뮬레이션, 디버깅 루프에 유리.

  • 도구의 워밍업 유지: JIT 컴파일, 캐시(토큰화 캐시, 정규식 컴파일 등)를 다시 안 해도 됨.

  • 인터랙티브 UX: “방금 만든 df를 그래프로 바꿔” 같은 후속 명령이 자연스러움.

  1. 세션의 비용·리스크
  • 재현성 하락: 숨은 상태 때문에 같은 코드가 매번 동일하게 안 돌 수도 있음.

  • 메모리 누수/오염: LLM이 만든 전역이 쌓이다가 성능 저하. 상태 “독”이 남으면 계속 실패.

  • 보안·통제 어려움: 상태가 길게 남을수록 정책 위반 탐지가 늦어짐. 세션별 CWD/경로 정책을 늘 점검해야 함.

  • 운영 부담: 세션 수명, TTL, 정리 시점, 동시성, 장애 복구 등을 관리해야 함.

  1. 결론 요약: 전형적인 선택 기준
  • 일회성 계산·변환·리포트처럼 “같은 입력 → 같은 출력”이면 세션 불필요(일회용 실행 권장).

  • 대용량 초기화가 반복되거나, 다단계 실험·EDA처럼 “맥락을 잇는” 흐름이면 세션이 이득.

  • 둘 다 쓰고, 기본값은 일회용. 다음 조건이면 자동으로 세션으로 승격하는 전략이 깔끔함.

    • 연속 호출 간 같은 사용자·같은 작업 스레드에서 2회 이상 후속 지시가 들어옴.

    • 코드가 “앞에서 만든 객체/파일/변수”를 참조.

    • 초기화 비용이 임계치(예: 300 ms 또는 메모리 200 MB 이상) 넘는 라이브러리 감지.

  1. 현실적인 운영 권장안
  • 기본: python_execute_once만 사용. 빠르고 재현성 높음.

  • 필요 시만 세션: python_session_start/execute를 노출하되, 서버 정책으로 아래처럼 안전 레일을 깐다.

    • idle_timeout(예: 5분), hard_ttl(예: 30분), max_sessions_per_user(예: 2).

    • 세션당 메모리·파일 크기 rlimit, 쓰기 경로는 write_paths만.

    • reset과 shutdown을 LLM에 명시적으로 가르치고, 실패 시 자동 reset.

    • 로그에 [AUDIT] 라인(막힌 파일·커맨드·사유)과 리소스 사용량을 함께 남김.

  1. “세션 없이도 기억하기” 대안
  • 파일 기반 상태: 일회용 실행마다 허용된 write_paths에만 아티팩트(.npz/.parquet/.pt) 저장 → 다음 호출에서 경로만 넘겨 재사용. 세션 없이도 80%의 “기억”을 충족.

  • 선택 직렬화: 매우 제한된 화이트리스트 타입만 자동 저장/복원(dict[str→ndarray/DataFrame/float/int])해서 “의미 있는 최소 상태”만 이어 받기. 프로세스는 매번 새로 띄워 재현성 유지.

  1. 적용 팁(당장 쓸 설정)
  • 오늘 구조에서 기본값을 이렇게 두면 균형이 좋다.

    • default_mode=“once”, session_mode는 명시 opt-in

    • session idle_timeout=300s, hard_ttl=1800s

    • state persistence는 “파일로만” 허용(피클 전역 저장은 금지)

    • read_paths는 넉넉히, write_paths는 최소로

    • 네트워크 차단 기본 On, 필요 시만 임시 해제

요약하면, 세션은 “속도와 편의”를 사고 “재현성과 단순성”을 판다. LLM 에이전트에서 실수 방지가 우선이면 일회용을 기본으로, 대형 초기화나 인터랙티브 루프가 필요할 때만 세션을 열고 TTL·리밋으로 다스리는 하이브리드가 가장 안정적이다. 다음 단계로는 너의 툴 레이어에서 기본을 once로 두고, 특정 규칙에 맞으면 session_start를 자동 제안하도록 넣으면 된다.

일단 툴로는 일회용 세션만 지원해보자

좋아. “일회용 세션만” 지원하는 LLM 툴을 깔끔하게 정리했다. 설정은 서버 쪽에만 두고, LLM에는 오직 코드와 타임아웃(+선택적 cwd 상대경로)만 노출한다. 파일/서브프로세스(옵션으로 네트워크)는 감사 훅으로 제어하고, 차단 사유를 stderr에 구체적으로 남긴다.

아래 파일 하나로 끝난다.

# oneoff_tool.py
# Python 3.8+ / 외부 의존성 없음

import asyncio
import multiprocessing as mp
import os, sys, io, time, traceback, contextlib
from typing import List, Optional, Dict, Any

# ============== 서버측 정책(LLM 비노출) ==============
READ_PATHS_DEFAULT  = [os.path.abspath("./data_ro")]    # 읽기 허용 루트(하위 포함). []면 어디든 읽기 허용
WRITE_PATHS_DEFAULT = [os.path.abspath("./sandbox")]    # 쓰기 허용 루트(하위 포함). []면 전부 차단
DEFAULT_CWD         = os.path.abspath("./work")         # 시작 작업 디렉터리
BLOCK_SUBPROC       = True                              # 서브프로세스 차단
BLOCK_NET           = False                             # 필요 시 True
SANITIZE_ENV        = True                              # 환경변수 최소화
ALLOWED_ENV_KEYS    = ["PATH","PYTHONPATH","HOME","TMPDIR"]

os.makedirs(DEFAULT_CWD, exist_ok=True)
for d in WRITE_PATHS_DEFAULT:
    os.makedirs(d, exist_ok=True)

# ============== 내부: 감사 훅 설치 ==============
def _install_guard(read_paths: List[str],
                   write_paths: List[str],
                   allow_read_anywhere: bool,
                   block_subprocess: bool = True,
                   block_network: bool = False) -> None:
    rp = [os.path.realpath(p) for p in (read_paths or [])]
    wp = [os.path.realpath(p) for p in (write_paths or [])]

    def _real(p: str) -> str:
        try: return os.path.realpath(p)
        except Exception: return str(p)

    def _under_any(path: str, roots: List[str]) -> bool:
        path = _real(path)
        for r in roots:
            r = _real(r)
            if path == r or path.startswith(r + os.sep):
                return True
        return False

    def _audit(event, args):
        if event in ("open", "os.open"):
            path = str(args[0])
            mode = str(args[1]) if len(args) > 1 else "r"
            flags = int(args[2]) if len(args) > 2 else 0
            write = any(ch in mode for ch in ("w","a","+","x")) or \
                    (flags & (os.O_WRONLY|os.O_RDWR|os.O_CREAT|os.O_TRUNC|os.O_APPEND))
            if write and not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} mode={mode} flags={flags} reason=write-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
            if (not write) and (not allow_read_anywhere) and not _under_any(path, rp + wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=read-not-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        elif event in ("os.rename","os.replace"):
            src, dst = str(args[0]), str(args[1])
            if not (_under_any(src, wp) and _under_any(dst, wp)):
                msg = f"[AUDIT] blocked: event={event} src={_real(src)} dst={_real(dst)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)
        elif event in ("os.remove","os.unlink","os.mkdir","os.makedirs","os.rmdir","os.chmod"):
            path = str(args[0])
            if not _under_any(path, wp):
                msg = f"[AUDIT] blocked: event={event} path={_real(path)} reason=op-outside-allowed"
                print(msg, file=sys.stderr, flush=True)
                raise PermissionError(msg)

        elif block_subprocess and (event.startswith("subprocess.") or event == "os.system"):
            try: cmd = args[0]
            except Exception: cmd = "<unknown>"
            msg = f"[AUDIT] blocked: event={event} cmd={cmd} reason=subprocess-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

        elif block_network and event.startswith("socket."):
            msg = f"[AUDIT] blocked: event={event} reason=network-disabled"
            print(msg, file=sys.stderr, flush=True)
            raise PermissionError(msg)

    sys.addaudithook(_audit)

# ============== 내부: 자식 워커(일회용) ==============
def _child_once_worker(code: str,
                       read_paths: List[str],
                       write_paths: List[str],
                       cwd: Optional[str],
                       allow_read_anywhere: bool,
                       block_subprocess: bool,
                       block_network: bool,
                       sanitize_env: bool,
                       allowed_env_keys: Optional[List[str]],
                       result_q: mp.Queue) -> None:
    stdout_buf, stderr_buf = io.StringIO(), io.StringIO()
    rc = 0
    try:
        if cwd:
            os.makedirs(cwd, exist_ok=True)
            os.chdir(cwd)

        if sanitize_env:
            keep = set(allowed_env_keys or ["PATH","PYTHONPATH","HOME","TMPDIR"])
            for k in list(os.environ.keys()):
                if k not in keep:
                    os.environ.pop(k, None)

        _install_guard(read_paths, write_paths, allow_read_anywhere,
                       block_subprocess=block_subprocess, block_network=block_network)

        # 옵션: 리소스 한도
        try:
            import resource
            resource.setrlimit(resource.RLIMIT_CPU, (5, 5))
            resource.setrlimit(resource.RLIMIT_FSIZE, (200*1024*1024,)*2)
        except Exception:
            pass

        g = {"__name__": "__main__"}
        with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
            try:
                exec(code, g, g)
            except Exception:
                rc = 1
                print("[ERROR] exception while executing user code:", file=sys.stderr)
                traceback.print_exc()

    except Exception as e:
        rc = 1
        print(f"[ERROR] sandbox init/exec failed: {type(e).__name__}: {e}", file=stderr_buf)
        traceback.print_exc(file=stderr_buf)
    finally:
        result_q.put({"returncode": rc, "stdout": stdout_buf.getvalue(), "stderr": stderr_buf.getvalue()})

def _run_once_blocking(code: str,
                       read_paths: List[str],
                       write_paths: List[str],
                       cwd: Optional[str],
                       allow_read_anywhere: bool,
                       timeout_sec: float,
                       block_subprocess: bool,
                       block_network: bool,
                       sanitize_env: bool,
                       allowed_env_keys: Optional[List[str]]) -> Dict[str, Any]:
    ctx = mp.get_context("spawn")   # 부모 전역 노출 방지
    q: mp.Queue = ctx.Queue(maxsize=1)
    p = ctx.Process(target=_child_once_worker, args=(
        code, read_paths, write_paths, cwd, allow_read_anywhere,
        block_subprocess, block_network, sanitize_env, allowed_env_keys, q
    ))
    p.start()

    t0 = time.time(); result = None
    while True:
        if not q.empty():
            result = q.get()
            break
        if not p.is_alive():
            result = {"returncode": p.exitcode if p.exitcode is not None else 1,
                      "stdout": "", "stderr": "[ERROR] child exited unexpectedly"}
            break
        if time.time() - t0 > timeout_sec:
            p.terminate(); p.join(1.0)
            return {"returncode": 124, "stdout": "", "stderr": f"[TIMEOUT] exceeded {timeout_sec}s"}
        time.sleep(0.01)

    p.join(1.0)
    if p.is_alive():
        p.terminate(); p.join(1.0)
    return result or {"returncode": 1, "stdout": "", "stderr": "[ERROR] no result"}

# ============== 공개 API: 일회용 실행(비동기) ==============
async def run_user_code_once_async(
    code: str,
    *,
    cwd_rel: Optional[str] = None,
    timeout_sec: float = 10.0,
) -> Dict[str, Any]:
    """
    별도 프로세스에서 코드 조각을 한 번 실행하고 stdout/stderr만 반환.
    - cwd_rel: DEFAULT_CWD 기준 상대경로. 허용 루트 밖이면 거부.
    """
    # cwd 결정 및 경로 정규화
    allowed_roots = set([DEFAULT_CWD] + READ_PATHS_DEFAULT + WRITE_PATHS_DEFAULT)
    if cwd_rel:
        target = os.path.realpath(os.path.join(DEFAULT_CWD, cwd_rel))
        if not any(target == r or target.startswith(r + os.sep) for r in allowed_roots):
            return {"returncode": 1, "stdout": "", "stderr": f"[TOOL_ERROR] cwd_rel outside allowed roots: {target}"}
        cwd = target
    else:
        cwd = DEFAULT_CWD

    allow_read_anywhere = (len(READ_PATHS_DEFAULT) == 0)

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        _run_once_blocking,
        code,
        READ_PATHS_DEFAULT,
        WRITE_PATHS_DEFAULT,
        cwd,
        allow_read_anywhere,
        timeout_sec,
        BLOCK_SUBPROC,
        BLOCK_NET,
        SANITIZE_ENV,
        ALLOWED_ENV_KEYS,
    )

# ============== LLM 툴 스키마와 핸들러 ==============
python_execute_once_tool = {
  "type": "function",
  "function": {
    "name": "python_execute_once",
    "description": "일회성 파이썬 코드 조각을 샌드박스에서 실행하고 stdout/stderr만 반환합니다.",
    "parameters": {
      "type": "object",
      "properties": {
        "code": {"type": "string"},
        "timeout_sec": {"type": "integer", "default": 10, "minimum": 1, "maximum": 60},
        "cwd_rel": {"type": "string", "description": "기본 작업 디렉터리(DEFAULT_CWD) 기준 상대경로"}
      },
      "required": ["code"],
      "additionalProperties": False
    },
    "strict": True
  }
}

async def tool_python_execute_once(args: Dict[str, Any]) -> Dict[str, Any]:
    try:
        return await run_user_code_once_async(
            args["code"],
            cwd_rel=args.get("cwd_rel"),
            timeout_sec=int(args.get("timeout_sec", 10)),
        )
    except Exception as e:
        return {"returncode": 1, "stdout": "", "stderr": f"[TOOL_ERROR] {type(e).__name__}: {e}"}

# 동기 어댑터(프레임워크가 sync만 지원할 때 선택)
def run_tool_sync(tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    async def _runner():
        if tool_name == "python_execute_once":
            return await tool_python_execute_once(args)
        return {"returncode": 1, "stdout": "", "stderr": "unknown tool"}
    try:
        loop = asyncio.get_running_loop()
        raise RuntimeError("run_tool_sync called in running loop")
    except RuntimeError:
        return asyncio.run(_runner())

연결 방법 요약

  1. 서버에서 READ_PATHS_DEFAULT, WRITE_PATHS_DEFAULT, DEFAULT_CWD 등 정책만 세팅한다. 이 값들은 LLM에 절대 노출하지 않는다.

  2. 에이전트 런타임에 툴 정의로 python_execute_once_tool을 등록한다.

  3. 툴 콜이 들어오면 tool_python_execute_once를 await해서 결과 딕셔너리(returncode/stdout/stderr)를 모델에 재피드한다.

  4. 경로 혼동을 막으려면 정책 경로들은 애초에 절대경로로 두는 게 가장 깔끔하고, LLM에는 cwd_rel만 선택적으로 허용한다.

운용 팁

  • 금지된 파일·연산·커맨드는 stderr에 [AUDIT] 라인으로 이유와 함께 남는다.

  • 대량 병렬이 필요하면 _run_once_blocking을 ProcessPoolExecutor로 바꾸거나 동시 실행 세마포어만 얹으면 된다.

  • 나중에 세션형이 필요해지면, 이 구조 위에 PersistentKernel을 얹어도 인터페이스는 바뀌지 않는다.

기존 만든 PersistentKernel은 활용하는게 좋을거 같은데

좋아. “일회용 툴만 지원”은 유지하되, 내부 엔진은 PersistentKernel을 그대로 써서 실행-정리까지 한 번에 태우는 래퍼를 만들자. 나중에 세션형으로 확장할 때 바뀌는 건 툴 바인딩이 아니라 정책뿐이 되도록.

아래 파일 하나로 끝난다. 핵심은 EphemeralKernelRunner.run_once가 PersistentKernel을 즉시 띄우고(a.start), 코드 실행(aexec), 결과 회수 후 즉시 종료(shutdown)하는 패턴이다. 상단의 READ_PATHS_DEFAULT/WRITE_PATHS_DEFAULT/DEFAULT_CWD만 운영자가 세팅하고, LLM에는 code, timeout_sec, cwd_rel만 노출한다.

# oneoff_with_persistent_kernel.py
# 요구: Python 3.8+, 같은 폴더에 persistent_kernel_async.py가 있어야 함
import os, sys, asyncio
from typing import Dict, Any, Optional, List
from persistent_kernel_async import PersistentKernel  # 앞서 만든 커널 구현

# ===================== 서버 정책(LLM 비노출) =====================
READ_PATHS_DEFAULT  = [os.path.abspath("./data_ro")]   # 읽기 허용 루트(하위 포함). []면 어디든 읽기 허용
WRITE_PATHS_DEFAULT = [os.path.abspath("./sandbox")]   # 쓰기 허용 루트(하위 포함). []면 전부 차단
DEFAULT_CWD         = os.path.abspath("./work")        # 커널 시작 CWD
BLOCK_SUBPROC       = True
BLOCK_NET           = False
SANITIZE_ENV        = True
ALLOWED_ENV_KEYS    = ["PATH","PYTHONPATH","HOME","TMPDIR"]

os.makedirs(DEFAULT_CWD, exist_ok=True)
for d in WRITE_PATHS_DEFAULT:
    os.makedirs(d, exist_ok=True)

def _abslist(paths: Optional[List[str]]) -> List[str]:
    if not paths:
        return []
    base = os.getcwd()
    out = []
    for p in paths:
        out.append(os.path.realpath(p if os.path.isabs(p) else os.path.join(base, p)))
    return out

def _resolve_cwd_rel(cwd_rel: Optional[str]) -> Optional[str]:
    if not cwd_rel:
        return None
    target = os.path.realpath(os.path.join(DEFAULT_CWD, cwd_rel))
    allowed_roots = set(_abslist(READ_PATHS_DEFAULT) + _abslist(WRITE_PATHS_DEFAULT) + [DEFAULT_CWD])
    if not any(target == r or target.startswith(r + os.sep) for r in allowed_roots):
        raise PermissionError(f"cwd_rel outside allowed roots: {target}")
    return target

# ===================== 일회용 실행 러너 =====================
class EphemeralKernelRunner:
    def __init__(self):
        pass

    async def run_once(self, code: str, *, cwd_rel: Optional[str] = None, timeout_sec: float = 10.0) -> Dict[str, Any]:
        k = PersistentKernel()
        # 커널 시작: 여기서 세션은 만들어지지만 이 호출 안에서만 쓰고 곧바로 종료한다.
        await k.start(
            read_paths=READ_PATHS_DEFAULT,
            write_paths=WRITE_PATHS_DEFAULT,
            cwd=_resolve_cwd_rel(cwd_rel) or DEFAULT_CWD,
            allow_read_anywhere=(len(READ_PATHS_DEFAULT) == 0),
            block_subprocess=BLOCK_SUBPROC,
            block_network=BLOCK_NET,
            sanitize_env=SANITIZE_ENV,
            allowed_env_keys=ALLOWED_ENV_KEYS,
            startup_timeout=5.0,
        )
        try:
            # 코드 실행
            res = await k.aexec(code, timeout=timeout_sec)
            # res = {"returncode": int, "stdout": str, "stderr": str}
            return res
        finally:
            # 항상 종료(세션 유지 없음)
            try:
                await k.shutdown()
            except Exception:
                pass

# ===================== LLM 툴 스키마 및 핸들러 =====================
python_execute_once_tool = {
  "type": "function",
  "function": {
    "name": "python_execute_once",
    "description": "일회성 파이썬 코드 조각을 PersistentKernel 기반 샌드박스에서 실행하고 stdout/stderr만 반환합니다.",
    "parameters": {
      "type": "object",
      "properties": {
        "code": {"type": "string"},
        "timeout_sec": {"type": "integer", "default": 10, "minimum": 1, "maximum": 60},
        "cwd_rel": {"type": "string", "description": "기본 작업 디렉터리(DEFAULT_CWD) 기준 상대경로(허용 루트 밖이면 거부)."}
      },
      "required": ["code"],
      "additionalProperties": False
    },
    "strict": True
  }
}

_runner = EphemeralKernelRunner()

async def tool_python_execute_once(args: Dict[str, Any]) -> Dict[str, Any]:
    try:
        code = args["code"]
        cwd_rel = args.get("cwd_rel")
        timeout = int(args.get("timeout_sec", 10))
        return await _runner.run_once(code, cwd_rel=cwd_rel, timeout_sec=timeout)
    except Exception as e:
        return {"returncode": 1, "stdout": "", "stderr": f"[TOOL_ERROR] {type(e).__name__}: {e}"}

# 동기 어댑터(프레임워크가 sync만 지원할 때 선택)
def run_tool_sync(tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    async def _runner_main():
        if tool_name == "python_execute_once":
            return await tool_python_execute_once(args)
        return {"returncode": 1, "stdout": "", "stderr": f"unknown tool: {tool_name}"}
    try:
        loop = asyncio.get_running_loop()
        raise RuntimeError("run_tool_sync called inside event loop")
    except RuntimeError:
        return asyncio.run(_runner_main())

# =============== 간단 데모 ===============
if __name__ == "__main__":
    async def main():
        demo = r"""
import os, sys, subprocess
print("CWD:", os.getcwd())
open("out_demo.txt","w").write("ok")  # WRITE_PATHS_DEFAULT 하위여야 통과
try:
    subprocess.check_output(["echo","hi"])
except Exception as e:
    print("blocked subproc:", type(e).__name__)
print("done")
"""
        # 데모용: write_paths에 DEFAULT_CWD를 넣었거나, DEFAULT_CWD/out_demo.txt가 WRITE_PATHS_DEFAULT 안에 있어야 함
        res = await tool_python_execute_once({"code": demo, "timeout_sec": 5, "cwd_rel": ""})
        print("RC:", res["returncode"])
        print("STDOUT:\n", res["stdout"])
        print("STDERR:\n", res["stderr"])

    if sys.platform == "win32":
        import multiprocessing as mp
        mp.freeze_support()
    asyncio.run(main())

동작 흐름 요약

  1. LLM이 python_execute_once를 호출하면 EphemeralKernelRunner가 PersistentKernel을 즉시 띄운다.

  2. 커널은 감사 훅이 설치된 샌드박스 상태로 code를 exec하고 stdout/stderr만 반환한다.

  3. 실행 직후 반드시 shutdown하므로 세션은 남지 않는다. 세션 유지가 필요해지면 이 러너 대신 아까 만든 PersistentKernel 툴셋을 그대로 노출하면 된다.

경로 혼동 방지 팁
DEFAULT_CWD, READ_PATHS_DEFAULT, WRITE_PATHS_DEFAULT는 모두 절대경로로 두는 게 가장 깔끔하다. LLM에 노출되는 cwd_rel은 DEFAULT_CWD 기준 상대경로만 받으며, 허용 루트 밖이면 거부한다.

Categories:

Updated: