Python MCP 연결 방법
너는 LLM Agent 전문가야. MCP 두 개 이상(이건 입력으로 커맨드를 여러개 받을 수 있어)을 띄우고 모든 MCP 툴을 LLM에게 전달 및 실행하고 싶어.
좋아, 여러 개의 MCP 서버를 동시에 붙여서 “모든 MCP 툴을 LLM이 한 번에 보고 실행”하게 만드는 방법을 두 가지 경로로 정리해줄게. 첫째는 Anthropic Messages API의 MCP 커넥터를 그대로 쓰는 방법(가장 간단). 둘째는 직접 MCP 클라이언트를 만들어 여러 서버에 붙고, 툴 카탈로그를 합쳐서 LLM에게 제공하는 방법(커스텀 제어). 단계별로 바로 따라하면 된다. 인용구문은 쓰지 않는다.
1) 가장 쉬운 길: Claude Messages API의 MCP 커넥터로 여러 서버 붙이기
핵심은 messages.create 호출에 mcp_servers 배열을 넣어 여러 서버를 동시에 선언하는 것. 서버마다 이름을 부여하면 충돌 없이 툴을 구분할 수 있고, 모델은 각 서버에서 노출된 MCP 툴을 필요할 때 호출한다. 베타 헤더가 필요하다.
요청 바디 예시(둘 이상의 서버 연결):
{
"model": "claude-sonnet-4-5",
"max_tokens": 1000,
"messages": [
{ "role": "user", "content": "두 서버의 툴을 모두 활용해서 작업을 끝내줘" }
],
"mcp_servers": [
{
"type": "url",
"url": "https://mcp.example1.com/sse",
"name": "mcp-server-1",
"authorization_token": "TOKEN1",
"tool_configuration": {
"enabled": true
}
},
{
"type": "url",
"url": "https://mcp.example2.com/sse",
"name": "mcp-server-2",
"authorization_token": "TOKEN2"
}
]
}
이 방식의 포인트
-
한 요청에서 여러 MCP 서버의 툴을 모두 노출할 수 있다.
-
서버 이름(
name)이 툴 호출 블록에 함께 들어가서 충돌을 피한다. -
현재는 “MCP의 툴 호출”만 지원(리소스/프롬프트 등은 제외)되고, HTTP로 공개된 원격 서버만 직접 연결된다(로컬 stdio는 불가). docs.anthropic.com
문서 근거와 스펙
• MCP 커넥터 사용법과 멀티 서버 예시는 Claude 공식 문서의 “MCP connector”와 “Multiple MCP servers” 섹션에 그대로 있다. docs.anthropic.com
• MCP 자체 개념과 스펙은 프로토콜 사이트와 스펙 문서 참고. 모델 컨텍스트 프로토콜+1
이 경로로 가면 “LLM에게 MCP 툴 전체를 그대로 전달하고, 실행은 모델이 알아서 서버별로 라우팅”해준다. 운영에서 OAuth 토큰이 필요한 서버는 authorization_token 필드로 전달하면 된다. docs.anthropic.com
2) 더 유연한 길: 직접 MCP 클라이언트를 만들어 “멀티 서버 → 단일 툴 레지스트리”로 합치기
로컬/사내용 툴까지 다 엮고 싶거나, 툴 노출/로그/권한을 섬세하게 통제하려면 직접 클라이언트를 만드는 편이 좋다. 공식 SDK가 있다(타입스크립트, 파이썬 등). GitHub+1
아키텍처 개요
-
각 MCP 서버와 별도의 세션으로 연결(stdio 또는 HTTP).
-
모든 서버에서
list_tools를 받아 합친다. 이때 충돌을 막기 위해serverName:toolName같은 네임스페이스를 붙인다. -
합쳐진 툴 목록을 LLM의 “도구(Function/Tool Use)”로 등록한다.
-
LLM이
serverName:toolName을 호출하면, 해당 서버 세션으로 라우팅해서call_tool실행 → 결과를 모델에tool_result로 되돌린다.
예시 코드(타입스크립트, Node). 여러 서버에서 툴을 모아 네임스페이스화하고, 호출 시 라우팅한다. SDK는 공식 TS SDK를 사용한다. GitHub
ts
// package.json: @modelcontextprotocol/sdk, zod, anthropic 등 설치 가정
// npm i @modelcontextprotocol/sdk zod anthropic
import { ClientSession } from '@modelcontextprotocol/sdk/client/session.js';
import { StdioClientTransport, StdioServerParameters } from '@modelcontextprotocol/sdk/client/stdio.js';
import Anthropic from '@anthropic-ai/sdk';
// 서버 설정
type ServerCfg = { name: string; command: string; args: string[] };
const SERVERS: ServerCfg[] = [
{ name: 'files', command: 'node', args: ['./servers/files/index.js'] },
{ name: 'db', command: 'python', args: ['./servers/db/server.py'] },
];
type Connected = {
name: string;
session: ClientSession;
};
const connected: Connected[] = [];
async function connectAll() {
for (const s of SERVERS) {
const params: StdioServerParameters = { command: s.command, args: s.args };
const transport = new StdioClientTransport(params);
await transport.start();
const session = new ClientSession(transport);
await session.initialize();
connected.push({ name: s.name, session });
console.log(`[MCP] connected: ${s.name}`);
}
}
type ToolDesc = {
name: string; // namespaced: `${server}:${tool}`
description?: string;
input_schema: any; // JSON Schema
};
// 모든 서버의 툴을 수집하여 네임스페이스 부여
async function collectTools(): Promise<ToolDesc[]> {
const all: ToolDesc[] = [];
for (const { name: serverName, session } of connected) {
const { tools } = await session.listTools();
for (const t of tools) {
all.push({
name: `${serverName}:${t.name}`,
description: t.description,
input_schema: t.inputSchema,
});
}
}
return all;
}
// LLM이 namespaced 툴을 호출하면 해당 서버로 라우팅
async function callNamespacedTool(nsName: string, args: any) {
const [serverName, toolName] = nsName.split(':');
const target = connected.find(c => c.name === serverName);
if (!target) throw new Error(`Server not found: ${serverName}`);
return target.session.callTool(toolName, args);
}
async function main() {
await connectAll();
const tools = await collectTools();
const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY! });
// 대화 루프의 한 턴 예시
let messages: any[] = [{ role: 'user', content: 'CSV를 DB에 적재하고 요약해줘' }];
// Anthropic의 Tool Use 포맷에 맞춰 툴 전달
const toolDefs = tools.map(t => ({
name: t.name,
description: t.description,
input_schema: t.input_schema,
}));
let resp = await anthropic.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1000,
messages,
tools: toolDefs,
});
// tool_use 처리 루프
while (resp.content.some((c: any) => c.type === 'tool_use')) {
const assistantContent: any[] = [];
const toolResults: any[] = [];
for (const c of resp.content) {
if (c.type === 'text') assistantContent.push(c);
if (c.type === 'tool_use') {
const result = await callNamespacedTool(c.name, c.input);
toolResults.push({
type: 'tool_result',
tool_use_id: c.id,
content: result.content, // MCP 표준 content 배열
});
}
}
messages.push({ role: 'assistant', content: assistantContent.concat(resp.content.filter((c: any) => c.type === 'tool_use')) });
messages.push({ role: 'user', content: toolResults });
resp = await anthropic.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1000,
messages,
tools: toolDefs,
});
}
console.log(resp.content.map((c: any) => (c.type === 'text' ? c.text : '')).join('\n'));
}
main().catch(e => { console.error(e); process.exit(1); });
이 패턴의 장점
• 어떤 LLM이든(Anthropic/OpenAI/로컬) 표준 “툴 호출” 인터페이스만 있으면 같은 레지스트리를 재활용할 수 있다.
• 서버별 권한, 로깅, 레이트 리밋, 입력 검증을 중앙에서 통제 가능.
• 네임스페이스 전략으로 툴 이름 충돌을 확실히 방지.
• stdio 로컬 서버와 원격 HTTP 서버를 섞어서 붙일 수 있다(직접 클라이언트 방식이므로). SDK가 stdio/HTTP 모두 지원한다. GitHub
설치·참고
• 공식 TS SDK와 예제 서버/클라이언트, Streamable HTTP/stdio 전송계층은 깃허브 README와 예제에 상세. GitHub
• “Build an MCP client” 튜토리얼(파이썬/노드 포함)도 멀티 서버로 확장하기 쉽다. 위 예시 로직은 그 흐름을 일반화한 것. 모델 컨텍스트 프로토콜
운영 팁과 함정(실무 체크리스트)
네임스페이스 규칙
• ${server}:${tool}로 고정하면 라우팅이 단순해진다. 툴 설명에도 서버명을 적어 LLM의 선택을 돕는다.
• 모델이 “비슷한 두 툴”을 혼동할 수 있으니 설명에 차이를 명확히 쓰고 스키마에 제약을 넣는다.
권한과 안전장치
• 서버별 allowed_tools로 노출 툴을 제한하거나, 자체 클라이언트라면 툴 화이트리스트를 적용한다. docs.anthropic.com
• 파괴적 툴(삭제/결제 등)은 “dry_run” 플래그를 스키마에 추가해 2단계 확인을 강제.
• 프롬프트 인젝션 방지: 서버 측에서 입력 검증과 URL/경로 화이트리스트를 둔다.
성능/안정성
• 초기에 모든 서버에 list_tools를 한 번만 호출하고 캐싱하라.
• 툴 호출은 타임아웃과 재시도를 둔다. 실패 시 에러 메시지와 복구 힌트를 모델에게 짧게 제공하면 후속 계획을 모델이 스스로 조정한다.
• 결과가 큰 경우(파일/표)는 리소스 링크를 반환하도록 MCP 쪽에서 설계하면 토큰 압박을 줄일 수 있다(리소스는 커넥터 경로에 따라 제한될 수 있음).
배포 토막 가이드
• 원격 연결 위주면 Claude의 MCP 커넥터가 가장 빠른 온보딩이다(단, HTTP 공개 필요). docs.anthropic.com
• 사내망·로컬 혼합이면 직접 클라이언트 구성이 유리하다. TS SDK의 streamable HTTP + stdio를 혼용하면 된다. GitHub
다음 확장 아이디어
• “자동 라우터”: 툴 메타데이터에 domain 태그를 두고, 사용자 요청을 분류해 관련 서버 툴만 모델에 노출(토큰/혼란 감소).
• “관측성”: 모든 tool_use/tool_result를 이벤트로 로그 수집해 실패율/지연/입력 분포를 대시보드로 본다.
• “권한 승격”: 고위험 툴 호출은 사람 확인을 요구하는 인가 훅을 중앙 클라이언트에 추가.
원하면 네가 쓰려는 서버 목록과 전송 방식(stdio/HTTP), 붙일 모델(Claude/OpenAI 등)을 알려줘. 거기에 맞춰 완성 스캐폴딩을 바로 내줄게.
타입스크립트 말고 파이썬 mcp 로 부탁해 그리고 커스텀 엔드포인트 LLM을 사용할거야.
좋아. 파이썬으로 MCP 다중 서버를 붙이고, 커스텀 LLM 엔드포인트(자체 호스팅 모델)에 “툴 콜링”을 돌리는 전체 오케스트레이터 스캐폴딩을 바로 줄게. 핵심 아이디어는 두 가지다. 첫째, 여러 MCP 서버에 각각 세션을 열고 툴 목록을 합쳐서 네임스페이스(server:tool)로 노출. 둘째, 커스텀 LLM의 /chat API에 툴 스키마를 전달하고, 응답 내 tool_calls를 받아 해당 MCP 서버로 라우팅해 실행 결과를 다시 모델에 피드백하는 루프를 돈다.
아래 코드는 바로 실행 가능한 단일 파일 예시다. 커스텀 LLM의 API 포맷은 OpenAI 함수콜과 유사한 형태를 가정했지만, 클래스 한 군데만 바꾸면 어떤 엔드포인트든 맞출 수 있도록 어댑터를 분리해두었다.
설치 가정
pip install mcp requests pydantic orjson
main.py
import asyncio
import json
import os
from typing import Any, Dict, List, Optional, Tuple
import orjson
import requests
from pydantic import BaseModel, Field
# MCP Python SDK
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
# 선택: SSE(원격 HTTP) 서버를 붙일 때
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
#######################################################################
# 0) 데이터 모델: LLM과 툴콜 인터페이스(커스텀 엔드포인트 어댑터용)
#######################################################################
class ChatMessage(BaseModel):
role: str
content: str
class ToolDef(BaseModel):
# OpenAI style에 가깝게 노멀라이즈해 커스텀 LLM에 전달
type: str = "function"
function: Dict[str, Any] # {name, description, parameters(JSON Schema)}
class ToolCall(BaseModel):
id: str
type: str = "function"
function: Dict[str, Any] # {name, arguments(JSON str)}
class ChatResponse(BaseModel):
content: str = ""
tool_calls: List[ToolCall] = Field(default_factory=list)
# 필요 시 stop_reason 등 추가
#######################################################################
# 1) MCP 연결 유틸: 여러 서버 연결하고 툴 합치기(네임스페이스)
#######################################################################
class MCPServerCfg(BaseModel):
name: str
# stdio 또는 sse
kind: str # "stdio" | "sse"
# stdio
command: Optional[str] = None
args: Optional[List[str]] = None
# sse
url: Optional[str] = None
headers: Optional[Dict[str, str]] = None
class ConnectedServer(BaseModel):
name: str
session: ClientSession
class MCPMux:
def __init__(self, servers: List[MCPServerCfg]):
self._servers_cfg = servers
self._sessions: List[ConnectedServer] = []
async def connect_all(self):
for cfg in self._servers_cfg:
if cfg.kind == "stdio":
params = StdioServerParameters(command=cfg.command, args=cfg.args or [])
transport = StdioClientTransport(params)
await transport.start()
session = ClientSession(transport)
await session.initialize()
elif cfg.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 모듈이 설치되어야 SSE 연결을 사용할 수 있습니다.")
transport = SSEClientTransport(url=cfg.url, headers=cfg.headers or {})
await transport.start()
session = ClientSession(transport)
await session.initialize()
else:
raise ValueError(f"unknown server kind: {cfg.kind}")
self._sessions.append(ConnectedServer(name=cfg.name, session=session))
async def list_tools(self) -> List[Dict[str, Any]]:
merged = []
for srv in self._sessions:
tools_resp = await srv.session.list_tools()
for t in tools_resp.tools:
# t.name, t.description, t.input_schema
merged.append({
"server": srv.name,
"name": t.name,
"description": getattr(t, "description", None),
"parameters": getattr(t, "input_schema", {}) # JSON Schema
})
return merged
async def call(self, namespaced: str, args: Dict[str, Any]) -> Dict[str, Any]:
# namespaced = "server:tool"
try:
server_name, tool_name = namespaced.split(":", 1)
except ValueError:
raise ValueError(f"툴 이름은 'server:tool' 형식이어야 합니다: {namespaced}")
for srv in self._sessions:
if srv.name == server_name:
result = await srv.session.call_tool(tool_name, args or {})
# MCP 표준 content를 단일 문자열 또는 JSON으로 정리
# MCP content는 list of {type: "text"|"json"|"blob", ...}
# 여기서는 간단히 text/json만 지원 예시
packed: Dict[str, Any] = {"content": []}
for c in result.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
else:
# blob/resource 등은 필요 시 확장
packed["content"].append({"type": c.type})
return packed
raise KeyError(f"서버를 찾을 수 없음: {server_name}")
#######################################################################
# 2) 커스텀 LLM 어댑터: 너의 사설 /chat 엔드포인트에 맞춰 조정
#######################################################################
class CustomLLMClient:
"""
너의 커스텀 엔드포인트 규약을 한 곳에 캡슐화.
아래는 예시 규약:
POST {base_url}/chat
json: {
"model": "...",
"messages": [{"role": "...", "content": "..."}],
"tools": [{"type":"function","function":{"name":..., "description":..., "parameters":{...}}}]
}
응답:
{
"content": "assistant text...",
"tool_calls": [
{"id":"...", "type":"function", "function":{"name":"server:tool","arguments":"{\"k\":\"v\"}"}}
]
}
필요한 경우 이 클래스만 수정하면 된다.
"""
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, extra_headers: Optional[Dict[str, str]] = None):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.extra_headers = extra_headers or {}
def _headers(self) -> Dict[str, str]:
h = {"content-type": "application/json"}
if self.api_key:
h["authorization"] = f"Bearer {self.api_key}"
h.update(self.extra_headers)
return h
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse:
payload = {
"model": self.model,
"messages": [m.model_dump() for m in messages],
"tools": [t.model_dump() for t in tools] if tools else []
}
r = requests.post(f"{self.base_url}/chat", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
data = r.json()
return ChatResponse(**data)
#######################################################################
# 3) 오케스트레이터: 툴 병합 → LLM 대화 루프 → MCP 라우팅
#######################################################################
def to_llm_tools(merged_tools: List[Dict[str, Any]]) -> List[ToolDef]:
defs: List[ToolDef] = []
for t in merged_tools:
namespaced = f"{t['server']}:{t['name']}"
fn = {
"name": namespaced,
"description": t.get("description") or f"MCP tool {namespaced}",
"parameters": t.get("parameters") or {"type": "object", "properties": {}}
}
defs.append(ToolDef(function=fn))
return defs
def coerce_arguments(args_raw: Any) -> Dict[str, Any]:
if args_raw is None:
return {}
if isinstance(args_raw, dict):
return args_raw
if isinstance(args_raw, str):
if args_raw.strip() == "":
return {}
try:
return json.loads(args_raw)
except Exception:
# 모델이 JSON이 아닌 걸 내는 경우 방어
return {"_raw": args_raw}
return {"_raw": args_raw}
async def run_conversation(
mux: MCPMux,
llm: CustomLLMClient,
user_prompt: str,
system_prompt: Optional[str] = None,
max_rounds: int = 8
):
tools_catalog = await mux.list_tools()
llm_tools = to_llm_tools(tools_catalog)
messages: List[ChatMessage] = []
if system_prompt:
messages.append(ChatMessage(role="system", content=system_prompt))
messages.append(ChatMessage(role="user", content=user_prompt))
for _ in range(max_rounds):
resp = llm.send(messages, llm_tools)
# 우선 어시스턴트 텍스트가 있으면 추가
assistant_text = resp.content or ""
if assistant_text:
messages.append(ChatMessage(role="assistant", content=assistant_text))
# 툴콜 없으면 종료
if not resp.tool_calls:
break
# MCP 라우팅 → 결과를 tool_result로 user에 피드백
tool_results_acc: List[str] = []
for call in resp.tool_calls:
name = call.function.get("name", "")
args = coerce_arguments(call.function.get("arguments"))
try:
result = await mux.call(name, args)
# 결과를 다시 모델에게 피드백: 단순화를 위해 JSON 텍스트로 합침
tool_results_acc.append(orjson.dumps({
"tool_call_id": call.id,
"tool_name": name,
"arguments": args,
"result": result
}).decode())
except Exception as e:
tool_results_acc.append(orjson.dumps({
"tool_call_id": call.id,
"tool_name": name,
"arguments": args,
"error": str(e)
}).decode())
# 다수 툴 호출 결과를 하나의 user 메시지로 재주입
feedback_text = "\n".join(tool_results_acc)
messages.append(ChatMessage(role="user", content=f"TOOL_RESULTS\n{feedback_text}"))
return messages
#######################################################################
# 4) 샘플 엔트리포인트
#######################################################################
async def main():
# 4-1) 붙일 MCP 서버들 정의
servers = [
# 로컬 stdio 예시
MCPServerCfg(name="files", kind="stdio", command="node", args=["./servers/files/index.js"]),
MCPServerCfg(name="db", kind="stdio", command="python", args=["./servers/db/server.py"]),
# 원격 SSE 예시(있다면)
# MCPServerCfg(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"authorization":"Bearer X"}),
]
mux = MCPMux(servers)
await mux.connect_all()
# 4-2) 커스텀 LLM 엔드포인트 설정
llm = CustomLLMClient(
base_url=os.environ.get("LLM_BASE_URL", "http://localhost:8000"),
model=os.environ.get("LLM_MODEL", "my-model"),
api_key=os.environ.get("LLM_API_KEY"), # 필요 시
extra_headers={"x-tenant": "demo"} # 필요 시
)
# 4-3) 대화 한 턴 예시
system = "너는 MCP 툴을 활용해 일을 끝내는 에이전트다. 위험 작업은 dry_run을 선호하고, 명시적으로 승인받기 전에는 파괴적 변경을 하지 않는다."
user = "CSV를 files 서버의 ingest_csv 툴로 적재하고, db 서버의 sql_summary로 요약해줘. 파일 경로는 ./data/sales.csv"
messages = await run_conversation(mux, llm, user_prompt=user, system_prompt=system, max_rounds=8)
print("=== 최종 대화 로그 ===")
for m in messages:
print(f"[{m.role}] {m.content[:2000]}")
if __name__ == "__main__":
asyncio.run(main())
동작 흐름 설명
1단계. MCPMux가 각 서버에 세션을 연다. stdio 서버는 command+args로 프로세스를 실행하고, sse 서버는 HTTP SSE로 연결한다.
2단계. list_tools를 모든 서버에서 모아서 네임스페이스(server:tool)로 변환한다.
3단계. 커스텀 LLM 엔드포인트에 messages와 tools를 넘긴다. tools는 OpenAI식 function 포맷으로 정규화해서 전달한다.
4단계. LLM이 tool_calls를 반환하면, 각 call의 function.name이 server:tool이므로 해당 서버로 라우팅해서 call_tool을 실행한다.
5단계. 결과를 TOOL_RESULTS 텍스트로 user 역할 메시지에 합쳐서 모델에 다시 주입한다. 모델은 추가적인 tool_calls를 내거나, 최종 답변을 낸다.
6단계. tool_calls가 더 이상 없으면 루프 종료.
운영 팁
네임스페이스 고정. server:tool 네이밍을 스키마 description에도 반복해서 LLM의 선택 오류를 줄인다.
권한 경계. 삭제·결제·배포 같은 툴에는 parameters에 dry_run(boolean, default true) 같은 필드를 추가해 2단계 승인을 유도하라.
타임아웃·재시도. SSE나 stdio 모두 외부 프로세스/네트워크에 민감하니 call_tool에는 서버 측 타임아웃과 재시도를 둔다. 필요 시 MCPMux.call에 재시도를 추가하라.
대용량 결과. 파일·표·바이너리 결과는 MCP 리소스(파일 핸들)나 외부 저장소 URL을 반환하고, LLM에는 메타데이터만 보내 토큰을 아끼는 설계를 추천한다.
적응층. CustomLLMClient를 너의 실제 /chat 규격에 맞게 수정한다. 예컨대 문서화된 JSON 키 이름이 다르면 ChatResponse 파싱만 바꾸면 된다.
테스트 방법
가짜 서버부터. servers 디렉터리에 echo 수준의 간단한 MCP 서버 두 개(files, db)를 만들어 list_tools와 call_tool만 최소 구현한다.
단위 테스트. CustomLLMClient.send를 모킹해 tool_calls를 강제로 내보내고, MCPMux.call이 올바른 서버로 라우팅되는지 확인한다.
엔드투엔드. 실제 LLM 엔드포인트로 교체해 CSV → SQL 요약 같은 워크플로를 한 번에 돌려본다.
확장 아이디어
자동 라우팅. 도메인 태그(files/db/search 등)를 보고 관련 툴만 tools 배열에 노출해 토큰과 혼란을 줄인다.
감사 로그. 모든 tool_call 입력·출력을 JSONL로 파일에 적재해 지연, 실패율, 위험 호출을 모니터링한다.
휴먼 게이트. 위험한 툴콜이 감지되면 CustomLLMClient로 바로 보내지 말고, 별도 승인 UI 혹은 CLI에서 1회 승인 후 재주입한다.
원하는 /chat 규격(JSON 예시 스펙)이나 실제 엔드포인트 샘플을 알려주면, 위 어댑터를 거기에 딱 맞춰 구체화해서 리라이트해줄 수 있다.
질문이 있어. 1. 그냥 툴을 그대로 보내면 오류가 더 많을까? 2. 일반 툴 콜링과 섞여 있는 예시도 보여주면 좋겠어.
좋아, 두 가지 모두 딱 꽂히게 정리해줄게. 결론 먼저, 그다음 바로 실행 가능한 파이썬 예시로 마무리한다.
- 그냥 “서버에서 받은 툴을 전부” 모델에 그대로 보여주면 오류가 더 많을까?
짧은 답: 보통 그렇다. 이유는 예측 가능하다.
-
과적재(too many tools). 툴이 많아질수록 선택 오류가 늘고, 모델이 비슷한 툴을 혼동한다. 비슷한 이름, 비슷한 설명은 특히 위험하다.
-
스키마 느슨함. parameters(JSON Schema)에 타입·필수 필드·enum 제약이 약하면 arguments 형식 오류가 증가한다.
-
문맥 미스매치. 현재 턴과 관계없는 툴까지 모두 보이면 “근처에 있는 도구” 편향 때문에 엉뚱한 걸 집는다.
-
네임 충돌·의도 충돌. 서로 다른 서버에 같은 이름, 유사 기능이 있으면 라우팅·선택 모두 흔들린다.
-
부작용 위험. 삭제/결제/배포 같은 툴을 아무 맥락 없이 노출하면 모델이 성급히 실행하려 한다(모델은 과감하다).
-
레이턴시·실패 전파. 원격/SSE 툴 다수는 타임아웃과 429가 뒤섞여 에러 증폭을 만든다.
현업에서 에러를 줄이는 실전 처방
-
턴별 툴 소출(Selective exposure). 유저 입력을 기준으로 관련 서버/도메인의 툴만 노출한다. 단순 키워드 라우팅만 넣어도 실패율이 뚝 떨어진다.
-
네임스페이스 필수. server:tool 이름으로 고정하고, description에도 서버/도메인을 반복 명시한다.
-
스키마를 엄격하게. type/enum/minItems/format까지 빡빡하게 걸어라. 가능하면 예제(example) 필드까지 넣는다.
-
고위험 툴에 세이프티 플래그. dry_run 기본값 true, confirmation string 같은 이중 잠금.
-
응답 정규화. MCP tool_result를 항상 같은 JSON 포맷으로 묶어 모델에 재주입해 학습된 기대 형태를 유지한다.
-
타임아웃/재시도/백오프 표준화. 서버별 기본값이 달라도 클라이언트에서 상한을 통일한다.
-
툴 캡. 한 턴에 최대 10~20개 정도만 노출(경험상 8~12에서 가장 안정적).
- “일반 툴 콜링”과 “MCP 툴 콜링”이 섞여 있는 예시
아래는 이전 파이썬 스캐폴딩을 얇게 확장한 버전이다. 핵심은 디스패처에 로컬(일반) 툴 네임스페이스 local.* 를 추가하고, LLM이 같은 턴에서 local.* 과 server:tool(MCP)을 섞어서 호출해도 자연스럽게 라우팅되는 구조다.
필요 패키지
pip install mcp requests pydantic orjson
mixed_tools.py
import asyncio
import json
from typing import Any, Dict, List, Optional
import orjson
import requests
from pydantic import BaseModel, Field
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
# ======== 공용 데이터 모델 ========
class ChatMessage(BaseModel):
role: str
content: str
class ToolDef(BaseModel):
type: str = "function"
function: Dict[str, Any]
class ToolCall(BaseModel):
id: str
type: str = "function"
function: Dict[str, Any]
class ChatResponse(BaseModel):
content: str = ""
tool_calls: List[ToolCall] = Field(default_factory=list)
# ======== MCP 멀티 서버 MUX ========
class MCPServerCfg(BaseModel):
name: str
command: str
args: List[str] = Field(default_factory=list)
class ConnectedServer(BaseModel):
name: str
session: ClientSession
class MCPMux:
def __init__(self, servers: List[MCPServerCfg]):
self._cfgs = servers
self._sessions: List[ConnectedServer] = []
async def connect_all(self):
for cfg in self._cfgs:
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args))
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
self._sessions.append(ConnectedServer(name=cfg.name, session=sess))
async def list_tools(self) -> List[Dict[str, Any]]:
merged = []
for srv in self._sessions:
tools = await srv.session.list_tools()
for t in tools.tools:
merged.append({
"server": srv.name,
"name": t.name,
"description": getattr(t, "description", f"MCP {srv.name}:{t.name}"),
"parameters": getattr(t, "input_schema", {"type": "object"})
})
return merged
async def call(self, namespaced: str, args: Dict[str, Any]) -> Dict[str, Any]:
server, tool = namespaced.split(":", 1)
for srv in self._sessions:
if srv.name == server:
res = await srv.session.call_tool(tool, args or {})
packed = {"content": []}
for c in res.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
else:
packed["content"].append({"type": c.type})
return packed
raise KeyError(f"unknown server {server}")
# ======== 로컬(일반) 툴: 예시 구현 ========
def local_tools_catalog() -> List[Dict[str, Any]]:
return [
{
"name": "local.math_eval",
"description": "안전한 사칙연산/간단식 평가기. 입력: {expr: string}",
"parameters": {
"type": "object",
"properties": {"expr": {"type": "string"}},
"required": ["expr"],
"additionalProperties": False
}
},
{
"name": "local.top_k_keywords",
"description": "텍스트에서 자주 등장하는 단어 상위 K를 뽑아준다. 입력: {text: string, k: integer}",
"parameters": {
"type": "object",
"properties": {
"text": {"type": "string"},
"k": {"type": "integer", "minimum": 1, "maximum": 50}
},
"required": ["text", "k"],
"additionalProperties": False
}
}
]
def run_local_tool(name: str, args: Dict[str, Any]) -> Dict[str, Any]:
if name == "local.math_eval":
# 아주 보수적인 계산기(숫자, + - * / () 만 허용)
expr = args.get("expr", "")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error": "expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__": {}}, {})
return {"content": [{"type": "json", "json": {"value": val}}]}
except Exception as e:
return {"error": str(e)}
if name == "local.top_k_keywords":
text = args.get("text", "")
k = int(args.get("k", 5))
import re, collections
toks = re.findall(r"[A-Za-z0-9가-힣]+", text)
cnt = collections.Counter([t.lower() for t in toks])
top = cnt.most_common(k)
return {"content": [{"type": "json", "json": {"top": top}}]}
return {"error": f"unknown local tool {name}"}
# ======== 커스텀 LLM 어댑터(예시) ========
class CustomLLMClient:
def __init__(self, base_url: str, model: str):
self.base_url = base_url.rstrip("/")
self.model = model
def _headers(self):
return {"content-type": "application/json"}
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse:
payload = {
"model": self.model,
"messages": [m.model_dump() for m in messages],
"tools": [t.model_dump() for t in tools]
}
r = requests.post(f"{self.base_url}/chat", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
return ChatResponse(**r.json())
# ======== 툴 노출 전략(간단 소출 예시) ========
def selective_expose(user_text: str, all_mcp: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
keep: List[Dict[str, Any]] = []
text = user_text.lower()
# 키워드 기반 라우팅(아주 얕은 예시)
if any(k in text for k in ["csv", "파일", "ingest", "load"]):
keep += [t for t in all_mcp if t["server"] == "files"]
if any(k in text for k in ["sql", "쿼리", "db", "데이터베이스", "요약", "summary"]):
keep += [t for t in all_mcp if t["server"] == "db"]
# 항상 노출할 로컬 도구
local_basics = local_tools_catalog()
# 중복 제거 및 이름 네임스페이스 적용
def to_tooldef(t: Dict[str, Any]) -> ToolDef:
if t.get("server"):
namespaced = f"{t['server']}:{t['name']}"
fn = {"name": namespaced, "description": t["description"], "parameters": t["parameters"]}
else:
fn = {"name": t["name"], "description": t["description"], "parameters": t["parameters"]}
return ToolDef(function=fn)
uniq = {}
for t in keep:
uniq[(t["server"], t["name"])] = t
tooldefs = [to_tooldef(t) for t in uniq.values()]
tooldefs += [ToolDef(function={"name": x["name"], "description": x["description"], "parameters": x["parameters"]}) for x in local_basics]
return tooldefs
# ======== 대화 루프 ========
def coerce_args(x: Any) -> Dict[str, Any]:
if x is None:
return {}
if isinstance(x, dict):
return x
if isinstance(x, str):
try:
return json.loads(x)
except Exception:
return {"_raw": x}
return {"_raw": x}
async def chat_once(mux: MCPMux, llm: CustomLLMClient, system: str, user: str, max_rounds: int = 6):
await mux.connect_all()
all_mcp = await mux.list_tools()
messages = [ChatMessage(role="system", content=system), ChatMessage(role="user", content=user)]
for _ in range(max_rounds):
tools_exposed = selective_expose(user, all_mcp) # 턴별 소출
resp = llm.send(messages, tools_exposed)
if resp.content:
messages.append(ChatMessage(role="assistant", content=resp.content))
if not resp.tool_calls:
break
# 섞여 있는 툴콜 처리: local.* ↔ server:tool
results_texts: List[str] = []
for call in resp.tool_calls:
name = call.function.get("name", "")
args = coerce_args(call.function.get("arguments"))
try:
if name.startswith("local."):
result = run_local_tool(name, args)
else:
result = await mux.call(name, args)
results_texts.append(orjson.dumps({"tool": name, "args": args, "result": result}).decode())
except Exception as e:
results_texts.append(orjson.dumps({"tool": name, "args": args, "error": str(e)}).decode())
messages.append(ChatMessage(role="user", content="TOOL_RESULTS\n" + "\n".join(results_texts)))
return messages
# ======== 실행 예시 ========
async def main():
mux = MCPMux([
MCPServerCfg(name="files", command="node", args=["./servers/files/index.js"]),
MCPServerCfg(name="db", command="python", args=["./servers/db/server.py"]),
])
llm = CustomLLMClient(base_url="http://localhost:8000", model="my-model")
system = "너는 안전하고 신중한 에이전트다. 파괴적 작업은 dry_run을 우선한다."
user = "CSV를 적재하고(sql 요약까지). 수치 확인은 계산기로 한번 검증해줘. 파일은 ./data/sales.csv 이야."
msgs = await chat_once(mux, llm, system, user)
print("=== LOG ===")
for m in msgs:
print(f"[{m.role}] {m.content[:1000]}")
if __name__ == "__main__":
asyncio.run(main())
이 코드에서 확인할 수 있는 것
-
한 턴에 LLM이 local.math_eval(일반 툴)과 files:ingest_csv(MCP) / db:sql_summary(MCP)를 섞어 호출해도 디스패처가 각각 맞는 경로로 라우팅한다.
-
selective_expose가 턴별 툴 소출을 해준다. 유저 요청에 필요 없는 서버 툴은 숨겨서 혼란과 에러를 줄인다.
-
로컬 툴은 파이썬 함수로 바로 처리하므로 레이턴시가 적고, 간단한 검증·후처리(예: 계산으로 sanity-check)를 빠르게 수행할 수 있다.
마지막으로, 에러를 더 줄이려면
-
툴 설명에 “언제 쓰는지/언제 쓰지 말아야 하는지”를 한 문장씩 명시한다.
-
위험 툴의 parameters에 confirm 문자열(예: "I understand this deletes data")을 required로 추가한다.
-
TOOL_RESULTS 포맷을 팀 공통 JSON 스키마로 고정해 모델이 예측 가능한 후속 계획을 세우도록 한다.
-
각 서버/툴별 타임아웃·재시도·서킷브레이커를 공통 미들웨어로 묶는다.
여기까지면 커스텀 LLM 엔드포인트에서도 “일반+MCP 혼합 툴콜”을 안정적으로 돌릴 수 있다. 다음 단계는 네가 쓰는 /chat 스펙에 맞춰 CustomLLMClient의 직렬화/역직렬화 부분만 미세조정하고, 실제 서버 두 개(files/db) 최소 구현으로 E2E를 한 번 굴려보는 것이다.
오호. 다음은 어려운 문제야. 이걸 openai agent python sdk에 붙일 수 있을까?
가능하다. 게다가 “여러 MCP 서버”와 “커스텀 LLM 엔드포인트”를 동시에 쓰는 구성이 OpenAI Agents Python SDK에서 정석으로 지원된다. 요점만 단계별로 정리하고, 바로 돌아가는 최소 예시를 준다.
핵심 정리
-
Agents SDK는 MCP를 4가지 전송으로 붙인다: Hosted MCP(Responses API가 원격서버를 직접 호출), Streamable HTTP, HTTP+SSE, stdio(로컬 프로세스). 서버 여러 개를 mcp_servers 리스트에 넣으면 한 에이전트가 전부를 본다. OpenAI GitHub
-
모델은 OpenAI 모델 말고도 LiteLLM 확장을 통해 “OpenAI 호환 REST”로 노출된 임의의 엔드포인트를 모델처럼 쓸 수 있다. base_url만 네 엔드포인트로 바꾸면 된다. OpenAI GitHub
-
Hosted MCP를 쓰면 OpenAI 인프라가 원격 MCP 서버를 직접 호출하므로 파이썬 쪽 콜백이 필요 없다(공개 접근 가능한 원격 서버에 적합). 반대로 사내망/로컬은 stdio·SSE 등 “직접 붙이기”가 맞다. OpenAI GitHub
-
Agents SDK에는 MCP 전용 클래스들이 이미 들어있다(MCPServerStdio, MCPServerSse, MCPServerStreamableHttp, HostedMCPTool 등). OpenAI GitHub+1
빠른 스캐폴딩(파이썬)
설치
pip install openai-agents mcp
예시 1) 커스텀 LLM(base_url) + stdio/SSE MCP 다중 서버 한 번에 붙이기
아래는 파일시스템 MCP를 로컬(stdio)로, 사내용 검색 MCP를 SSE로 붙이고, 모델은 너의 커스텀 /v1 Chat 호환 엔드포인트를 씀.
import asyncio
from agents import Agent, Runner
from agents.model_settings import ModelSettings
from agents.mcp import MCPServerStdio, MCPServerSse
from agents.extensions.models.litellm_model import LitellmModel # 커스텀 엔드포인트용
async def main():
# 1) MCP 서버들을 context manager로 기동/연결
async with MCPServerStdio(
name="files",
params={"command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem", "./data"]},
cache_tools_list=True,
) as files_server, MCPServerSse(
name="search",
params={"url": "https://mcp.example.com/sse", "headers": {"Authorization": "Bearer XYZ"}},
cache_tools_list=True,
) as search_server:
# 2) 커스텀 LLM 엔드포인트(OpenAI 호환) 모델로 지정
model = LitellmModel(
model="my-custom-model", # 엔진/이름
base_url="http://localhost:8000/v1", # 네 엔드포인트
api_key="sk-your-key", # 필요시
)
# 3) 에이전트 구성: 여러 MCP 서버를 한 번에 노출
agent = Agent(
name="Orchestrator",
instructions="MCP 툴을 안전하게 사용하고, 필요할 때만 호출한다.",
model=model,
mcp_servers=[files_server, search_server],
model_settings=ModelSettings(tool_choice="auto"), # 필요 시 "required"로 강제
)
# 4) 실행
result = await Runner.run(agent, "data 폴더의 CSV를 읽고, search 서버로 관련 문서 찾아 요약해줘")
print(result.final_output)
asyncio.run(main())
예시 2) Hosted MCP를 섞는 변형
원격에 공개된 MCP 서버는 HostedMCPTool 하나만 tools에 넣으면 모델이 OpenAI 인프라에서 직접 목록조회/호출까지 한다. 파이썬 쪽 콜백이 거의 없다.
from agents import Agent, Runner, HostedMCPTool
agent = Agent(
name="HostedMCPUser",
tools=[HostedMCPTool(tool_config={
"type": "mcp",
"server_label": "gitmcp",
"server_url": "https://gitmcp.io/openai/codex",
"require_approval": "never",
})],
)
print(Runner.run_sync(agent, "이 저장소의 주요 언어를 알려줘").final_output)
위 두 방식은 공식 문서의 사용법과 동일한 흐름이다. Hosted MCP, stdio/SSE/HTTP 서버, 툴 필터링, 승인 훅까지 포함된 가이드를 참고해 세부옵션을 맞추면 된다. OpenAI GitHub
툴 소출 전략(오류 줄이는 팁)
-
툴 필터링: 서버 생성자에 tool_filter를 걸거나 정적/동적 필터를 써서 턴별로 필요한 툴만 노출하라. SDK가 도우미를 제공한다. OpenAI GitHub
-
위험 툴 승인: HostedMCPTool은 require_approval 및 on_approval_request 콜백으로 민감 작업을 승인 플로로 감싼다. OpenAI GitHub
-
캐싱과 재시도: cache_tools_list, max_retry_attempts 등 옵션으로 지연과 실패 확률을 줄일 수 있다. OpenAI GitHub
한 줄 결론
OpenAI Agents Python SDK에 네가 만든 “멀티 MCP + 커스텀 LLM 엔드포인트”를 그대로 붙일 수 있다. 모델은 LiteLLMModel로 커스텀 REST를 쓰고, MCP는 mcp_servers에 여러 개를 열거하거나 HostedMCPTool을 병용하면 된다. 필요하면 너의 엔드포인트 스펙에 맞춰 모델 어댑터를 더 촘촘하게 설정하는 코드도 만들어주겠다. OpenAI GitHub+2
아니 난 너가 만든 코드를 그대로 쓰고 싶어. 왜냐면 이렇게 만든 인터페이스를 다른 Chat 에도 쓰고 싶거든. Agent만을 위한 다른 코드가 분기 되는 것이 싫어. Tool만 입력해서 동작하게 하는 방법 없어?
좋아. “Agent 전용 분기 없이” 너가 만든 인터페이스를 어떤 Chat에도 재사용하려면, 입력은 오직 툴(=프로바이더)과 LLM 엔드포인트만 주고 돌아가는 얇은 오케스트레이터가 필요하다. 아래는 그 목적에 맞춘 단일 파일 스캐폴딩이다. 핵심은 세 가지 인터페이스다.
-
ChatBackend: 어떤 채팅 모델이든 붙일 수 있는 백엔드 어댑터.
-
ToolProvider: 툴 카탈로그를 노출하고 호출을 처리하는 표준 인터페이스.
-
Orchestrator: messages + tools만으로 루프를 돌린다. 어떤 에이전트 SDK 분기도 없다.
설치
pip install mcp requests pydantic orjson
tools_only.py
import asyncio
import json
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
import orjson
import requests
from pydantic import BaseModel, Field
# ========== 공통 데이터 모델 ==========
class ChatMessage(BaseModel):
role: str
content: str
class ToolDef(BaseModel):
# OpenAI/함수콜 호환 포맷으로 통일
type: str = "function"
function: Dict[str, Any] # {name, description, parameters(JSON Schema)}
class ToolCall(BaseModel):
id: str
type: str = "function"
function: Dict[str, Any] # {name, arguments(str or dict)}
class ChatResponse(BaseModel):
content: str = ""
tool_calls: List[ToolCall] = Field(default_factory=list)
def _coerce_args(x: Any) -> Dict[str, Any]:
if x is None:
return {}
if isinstance(x, dict):
return x
if isinstance(x, str):
try:
return json.loads(x)
except Exception:
return {"_raw": x}
return {"_raw": x}
# ========== 1) ChatBackend 인터페이스 ==========
@runtime_checkable
class ChatBackend(Protocol):
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse: ...
class OpenAICompatibleBackend:
"""
OpenAI Chat Completions 호환(또는 유사) 엔드포인트용 백엔드.
/v1/chat/completions 또는 /chat 같은 곳에 맞춰 최소한의 어댑터만 제공.
"""
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, headers: Optional[Dict[str,str]] = None, path: str = "/chat"):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.headers = headers or {}
self.path = path # "/v1/chat/completions"로 바꿔도 됨
def _headers(self) -> Dict[str,str]:
h = {"content-type": "application/json"}
if self.api_key:
h["authorization"] = f"Bearer {self.api_key}"
h.update(self.headers)
return h
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse:
# 서버 규격에 맞게 필요한 키만 담는다. 필요시 여기만 수정.
payload = {
"model": self.model,
"messages": [m.model_dump() for m in messages],
"tools": [t.model_dump() for t in tools]
}
r = requests.post(f"{self.base_url}{self.path}", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
data = r.json()
# data를 ChatResponse로 매핑; 너의 엔드포인트 규격에 맞게 여기만 조정하면 된다.
return ChatResponse(**{
"content": data.get("content","") or data.get("choices",[{}])[0].get("message",{}).get("content",""),
"tool_calls": data.get("tool_calls", []) or data.get("choices",[{}])[0].get("message",{}).get("tool_calls", []),
})
# ========== 2) ToolProvider 인터페이스 ==========
@runtime_checkable
class ToolProvider(Protocol):
def list_tools(self) -> List[ToolDef]: ...
async def call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: ...
# ---- 2-1) MCP 프로바이더 (여러 서버 포함) ----
# mcp 설치 필요
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
class MCPServerCfg(BaseModel):
name: str # 네임스페이스
kind: str # "stdio" | "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
headers: Optional[Dict[str,str]] = None
class _Connected(BaseModel):
name: str
session: ClientSession
class MCPProvider(ToolProvider):
def __init__(self, servers: List[MCPServerCfg], cache: bool = True):
self._cfgs = servers
self._sessions: List[_Connected] = []
self._tools_cache: Optional[List[ToolDef]] = None
self._cache = cache
self._started = False
async def _ensure_started(self):
if self._started:
return
for cfg in self._cfgs:
if cfg.kind == "stdio":
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args or []))
elif cfg.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 모듈이 필요합니다.")
transport = SSEClientTransport(url=cfg.url, headers=cfg.headers or {})
else:
raise ValueError(f"unknown kind: {cfg.kind}")
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
self._sessions.append(_Connected(name=cfg.name, session=sess))
self._started = True
def _to_tooldef(self, server: str, t: Any) -> ToolDef:
namespaced = f"{server}:{t.name}"
return ToolDef(function={
"name": namespaced,
"description": getattr(t, "description", f"MCP {namespaced}"),
"parameters": getattr(t, "input_schema", {"type":"object","properties":{}})
})
def list_tools(self) -> List[ToolDef]:
# 동기 인터페이스 요구 때문에 캐시 기반. 최초 호출 전에 warmup()을 호출하거나,
# Orchestrator가 응답 전 await ensure_ready()를 한 번 수행.
if self._tools_cache is None:
raise RuntimeError("MCPProvider not ready. Call await provider.ensure_ready() first.")
return self._tools_cache
async def ensure_ready(self):
await self._ensure_started()
# list_tools는 async라 여기서 미리 모은다.
merged: List[ToolDef] = []
for c in self._sessions:
resp = await c.session.list_tools()
for t in resp.tools:
merged.append(self._to_tooldef(c.name, t))
self._tools_cache = merged
async def call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
# tool_name은 "server:tool"이어야 함
server, tool = tool_name.split(":", 1)
for c in self._sessions:
if c.name == server:
res = await c.session.call_tool(tool, arguments or {})
packed = {"content": []}
for it in res.content:
if it.type == "text":
packed["content"].append({"type":"text","text": it.text})
elif it.type == "json":
packed["content"].append({"type":"json","json": it.json})
else:
packed["content"].append({"type": it.type})
return packed
return {"error": f"unknown server namespace: {server}"}
# ---- 2-2) 로컬 툴 프로바이더(선택) ----
class LocalToolsProvider(ToolProvider):
def __init__(self):
self._defs = [
ToolDef(function={
"name": "local.math_eval",
"description": "안전 사칙연산. 입력: {expr: string}",
"parameters": {
"type":"object",
"properties":{"expr":{"type":"string"}},
"required":["expr"],
"additionalProperties": False
}
})
]
def list_tools(self) -> List[ToolDef]:
return self._defs
async def call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
if tool_name == "local.math_eval":
expr = arguments.get("expr","")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error":"expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__":{}}, {})
return {"content":[{"type":"json","json":{"value":val}}]}
except Exception as e:
return {"error": str(e)}
return {"error": f"unknown local tool {tool_name}"}
# ========== 3) Orchestrator (도구만 입력해서 동작) ==========
class Orchestrator:
"""
목적: 툴만 입력해서 동작. 어떤 Chat(백엔드)에도 재사용.
- providers: ToolProvider들의 리스트
- backend: ChatBackend 구현체 1개
- expose_fn: 선택적으로 '툴 소출' 전략을 주입 (기본값: 전부 노출)
"""
def __init__(self, providers: List[ToolProvider], backend: ChatBackend, expose_fn=None, max_rounds: int = 8):
self.providers = providers
self.backend = backend
self.max_rounds = max_rounds
self.expose_fn = expose_fn or (lambda _messages, tools: tools)
async def ensure_ready(self):
# MCPProvider 같은 async 준비가 필요한 프로바이더를 워밍업
for p in self.providers:
if hasattr(p, "ensure_ready") and callable(getattr(p, "ensure_ready")):
await p.ensure_ready()
def _catalog(self) -> List[ToolDef]:
tools: List[ToolDef] = []
for p in self.providers:
tools.extend(p.list_tools())
return tools
async def _dispatch_call(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
# 각 프로바이더가 책임지는 네임스페이스로 라우팅
for p in self.providers:
# 빠른 판별: MCP는 "server:" 네임스페이스, 로컬은 "local." 같은 규칙 추천
if isinstance(p, MCPProvider):
if ":" in name: # server:tool
return await p.call(name, args)
else:
if name.startswith("local."):
return await p.call(name, args)
# 마지막 fallback: 모든 provider에 시도
for p in self.providers:
try:
return await p.call(name, args)
except Exception:
pass
return {"error": f"no provider could handle tool {name}"}
async def run(self, user_text: str, system_text: Optional[str] = None) -> List[ChatMessage]:
await self.ensure_ready()
messages = []
if system_text:
messages.append(ChatMessage(role="system", content=system_text))
messages.append(ChatMessage(role="user", content=user_text))
for _ in range(self.max_rounds):
# 툴 카탈로그를 수집하고 소출 전략 적용
all_tools = self._catalog()
exposed = self.expose_fn(messages, all_tools)
resp = self.backend.send(messages, exposed)
if resp.content:
messages.append(ChatMessage(role="assistant", content=resp.content))
if not resp.tool_calls:
break
results: List[str] = []
for call in resp.tool_calls:
name = call.function.get("name","")
args = _coerce_args(call.function.get("arguments"))
try:
result = await self._dispatch_call(name, args)
results.append(orjson.dumps({"tool":name, "args":args, "result":result}).decode())
except Exception as e:
results.append(orjson.dumps({"tool":name, "args":args, "error":str(e)}).decode())
# 모델에게 도구결과를 일괄 재주입
messages.append(ChatMessage(role="user", content="TOOL_RESULTS\n" + "\n".join(results)))
return messages
# ========== 실행 예시 ==========
async def main():
# 1) MCP 서버들: 필요 서버만 나열
mcp = MCPProvider([
MCPServerCfg(name="files", kind="stdio", command="npx", args=["-y","@modelcontextprotocol/server-filesystem","./data"]),
# MCPServerCfg(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization":"Bearer X"}),
])
# 2) 로컬 툴(선택)
local = LocalToolsProvider()
# 3) 어떤 Chat이든 붙일 백엔드. base_url과 path만 맞추면 끝.
backend = OpenAICompatibleBackend(
base_url="http://localhost:8000",
path="/chat", # 필요에 따라 "/v1/chat/completions" 등으로 변경
model="my-model",
api_key=None,
)
# 4) 오케스트레이터: 입력은 “툴(프로바이더)”뿐
orch = Orchestrator(
providers=[mcp, local],
backend=backend,
# 간단 소출: 파일/SQL 관련 단어가 없으면 MCP를 숨기고 로컬만 노출
expose_fn=lambda msgs, tools: [
t for t in tools
if ("local." in t.function["name"]) or any(k in (msgs[-1].content.lower()) for k in ["csv","파일","sql","db","쿼리"])
],
max_rounds=8
)
messages = await orch.run(
user_text="CSV를 읽고 합계를 계산한 뒤 수치가 맞는지 계산기로 검증해줘. 파일은 ./data/sales.csv",
system_text="너는 MCP 툴을 신중히 사용한다. 파괴적 작업은 dry_run을 우선한다."
)
print("=== 최종 로그 ===")
for m in messages:
print(f"[{m.role}] {m.content[:1500]}")
if __name__ == "__main__":
asyncio.run(main())
설명과 사용법 요약
입력은 툴(프로바이더)만 넘긴다. 오케스트레이터는 툴 카탈로그를 합치고, LLM 응답의 tool_calls를 받아 적절한 프로바이더로 라우팅한다. 특정 에이전트 SDK 분기가 전혀 없다. 다른 Chat 백엔드로 바꾸고 싶으면 OpenAICompatibleBackend만 교체하거나 send 메소드가 동일한 시그니처를 가지는 새 클래스를 하나 추가하면 된다.
운영 팁
툴 소출 함수(expose_fn)로 턴별 필터링을 꼭 넣자. 단순 키워드만으로도 선택 오류와 불필요 호출이 크게 준다.
네임스페이스를 일관되게 유지하자. MCP는 server:tool, 로컬은 local.*처럼 구분하면 충돌이 없다.
고위험 툴은 parameters에 dry_run, confirm 문자열 같은 제약을 넣어야 한다.
대용량 결과는 MCP 서버가 파일 핸들(리소스)로 반환하고, 모델에는 요약/메타데이터만 재주입해 토큰을 아끼자.
이 구조면 같은 Orchestrator를 그대로 두고, 어떤 Chat UI/엔진에도 쉽게 이식할 수 있다. 필요하면 네 엔드포인트의 정확한 JSON 규격을 알려줘. OpenAICompatibleBackend의 매핑만 10줄쯤 손보면 바로 물릴 수 있게 맞춰줄 수 있다.
이건 또 openai agent는 안 쓰는 것 같은데
맞아, 방금 준 스캐폴딩은 “오픈AI Agents SDK”를 쓰지 않는 순수 오케스트레이터였지. 너가 원하는 건 그 코드를 그대로 유지하면서, 백엔드만 갈아끼워 “OpenAI Agents Python SDK”를 LLM 백엔드로 사용할 수 있게 하는 것. 아래처럼 백엔드 어댑터 하나만 추가하면 된다. 오케스트레이터·툴 프로바이더·라우팅은 그대로 재사용하고, 에이전트 SDK는 단지 “텍스트+툴콜 JSON을 뱉는 모델 백엔드”로만 쓴다. 즉, Agents SDK에게 MCP 실행을 맡기지 않고, 우리가 만든 라우터가 계속 MCP를 호출한다.
핵심 아이디어
에이전트 SDK에게 도구를 등록하지 않는다(tool_choice 없이). 대신 우리가 넘긴 툴 카탈로그를 프롬프트에 JSON으로 주고, 에이전트가 반드시 다음 포맷의 JSON만 출력하도록 강제한다. 그러면 우리 오케스트레이터가 그 JSON을 파싱해 실제 MCP/로컬 툴로 라우팅한다. 이러면 Agent 전용 코드 분기가 생기지 않는다. 백엔드를 바꿔 끼우는 것만으로 같은 오케스트레이터가 그대로 동작한다.
설치
pip install openai-agents mcp requests pydantic orjson
agents_backend.py
아래 파일은 네가 이전에 받은 tools_only.py의 구조를 그대로 따르되, ChatBackend 구현체로 AgentsBackend를 추가한 버전이다. 나머지는 동일하니 이 파일 하나로 교체해도 되고, 필요한 부분만 가져다 써도 된다.
import asyncio
import json
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
import orjson
import requests
from pydantic import BaseModel, Field
# ========== 공통 데이터 모델 ==========
class ChatMessage(BaseModel):
role: str
content: str
class ToolDef(BaseModel):
type: str = "function"
function: Dict[str, Any]
class ToolCall(BaseModel):
id: str
type: str = "function"
function: Dict[str, Any] # {"name": ..., "arguments": str|dict}
class ChatResponse(BaseModel):
content: str = ""
tool_calls: List[ToolCall] = Field(default_factory=list)
def _coerce_args(x: Any) -> Dict[str, Any]:
if x is None:
return {}
if isinstance(x, dict):
return x
if isinstance(x, str):
try:
return json.loads(x)
except Exception:
return {"_raw": x}
return {"_raw": x}
# ========== 1) ChatBackend 인터페이스 ==========
@runtime_checkable
class ChatBackend(Protocol):
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse: ...
class OpenAICompatibleBackend:
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, headers: Optional[Dict[str,str]] = None, path: str = "/chat"):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.headers = headers or {}
self.path = path
def _headers(self) -> Dict[str,str]:
h = {"content-type": "application/json"}
if self.api_key:
h["authorization"] = f"Bearer {self.api_key}"
h.update(self.headers)
return h
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse:
payload = {"model": self.model, "messages": [m.model_dump() for m in messages], "tools": [t.model_dump() for t in tools]}
r = requests.post(f"{self.base_url}{self.path}", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
data = r.json()
return ChatResponse(**{
"content": data.get("content","") or data.get("choices",[{}])[0].get("message",{}).get("content",""),
"tool_calls": data.get("tool_calls", []) or data.get("choices",[{}])[0].get("message",{}).get("tool_calls", []),
})
# ========== 1-b) OpenAI Agents SDK 백엔드 어댑터 ==========
# Agents SDK는 내부적으로 도구 실행까지 맡길 수 있지만,
# 여기서는 "텍스트+툴콜 JSON을 생성하는 모델"로만 사용한다.
# 프롬프트 강제 형식으로 tool_calls를 JSON으로 출력하게 만든다.
from agents import Agent, Runner
from agents.model_settings import ModelSettings
from agents.extensions.models.litellm_model import LitellmModel
class AgentsBackend(ChatBackend):
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, headers: Optional[Dict[str,str]] = None):
# LiteLLM 모델 확장을 사용해 임의의 OpenAI 호환 엔드포인트를 붙일 수 있다.
kwargs = {}
if api_key:
kwargs["api_key"] = api_key
if headers:
kwargs["headers"] = headers
self._model = LitellmModel(model=model, base_url=base_url.rstrip("/") + "/v1", **kwargs)
# 도구는 에이전트에 등록하지 않는다. 우리가 직접 라우팅할 것이기 때문.
self._agent = Agent(
name="LLMOnly",
instructions="",
model=self._model,
model_settings=ModelSettings(tool_choice="none") # 절대 자동 도구 호출 금지
)
@staticmethod
def _tools_to_schema_snippet(tools: List[ToolDef]) -> str:
# 툴 목록을 JSON으로 주고, 정확한 출력 포맷을 강제한다.
tools_payload = []
for t in tools:
fn = t.function
tools_payload.append({
"name": fn["name"],
"description": fn.get("description",""),
"parameters": fn.get("parameters", {"type":"object","properties":{}})
})
return json.dumps(tools_payload, ensure_ascii=False)
def _build_prompt(self, messages: List[ChatMessage], tools: List[ToolDef]) -> str:
history = []
for m in messages:
history.append(f"{m.role.upper()}: {m.content}")
tools_json = self._tools_to_schema_snippet(tools)
guide = (
"위는 대화 내역이다. 아래는 사용 가능한 도구 목록이다.\n"
f"TOOLS_JSON={tools_json}\n\n"
"반드시 다음 JSON 포맷으로만 답하라. 다른 텍스트는 절대 출력하지 말라.\n"
"{\n"
' "content": "<최종 자연어 응답(있으면)>" ,\n'
' "tool_calls": [\n'
' {"id":"id-1","type":"function","function":{"name":"<toolName>","arguments":{...}}},\n'
' {"id":"id-2","type":"function","function":{"name":"<toolName>","arguments":{...}}}\n'
" ]\n"
"}\n"
"여러 도구가 필요하면 tool_calls 배열에 순서대로 넣어라. 도구가 필요 없으면 빈 배열을 넣어라."
)
return "\n".join(history + ["", guide])
def send(self, messages: List[ChatMessage], tools: List[ToolDef]) -> ChatResponse:
prompt = self._build_prompt(messages, tools)
# Developer 메시지에 강제 포맷 가이드를 넣어 JSON만 출력하게 한다.
out = Runner.run_sync(self._agent, prompt, developer_instructions="JSON 포맷만 출력")
text = out.final_output.strip()
try:
data = json.loads(text)
except Exception:
# JSON 파싱 실패 시, 일반 텍스트만 채워서 반환
return ChatResponse(content=text, tool_calls=[])
# 표준 ChatResponse로 맵
tc_raw = data.get("tool_calls", [])
# arguments가 dict가 아닐 수도 있으니 그대로 통과(오케스트레이터 쪽에서 정규화)
return ChatResponse(
content=data.get("content",""),
tool_calls=[ToolCall(**tc) for tc in tc_raw] if isinstance(tc_raw, list) else []
)
# ========== 2) ToolProvider (MCP/로컬) ==========
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
class MCPServerCfg(BaseModel):
name: str
kind: str # "stdio" | "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
headers: Optional[Dict[str,str]] = None
class _Connected(BaseModel):
name: str
session: ClientSession
class MCPProvider:
def __init__(self, servers: List[MCPServerCfg]):
self._cfgs = servers
self._sessions: List[_Connected] = []
self._tools_cache: Optional[List[ToolDef]] = None
self._started = False
async def _ensure_started(self):
if self._started:
return
for cfg in self._cfgs:
if cfg.kind == "stdio":
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args or []))
elif cfg.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 모듈이 필요합니다.")
transport = SSEClientTransport(url=cfg.url, headers=cfg.headers or {})
else:
raise ValueError(f"unknown kind: {cfg.kind}")
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
self._sessions.append(_Connected(name=cfg.name, session=sess))
self._started = True
async def ensure_ready(self):
await self._ensure_started()
merged: List[ToolDef] = []
for c in self._sessions:
resp = await c.session.list_tools()
for t in resp.tools:
namespaced = f"{c.name}:{t.name}"
merged.append(ToolDef(function={
"name": namespaced,
"description": getattr(t,"description", f"MCP {namespaced}"),
"parameters": getattr(t,"input_schema", {"type":"object","properties":{}})
}))
self._tools_cache = merged
def list_tools(self) -> List[ToolDef]:
if self._tools_cache is None:
raise RuntimeError("MCPProvider not ready. Call await ensure_ready() first.")
return self._tools_cache
async def call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
server, tool = tool_name.split(":", 1)
for c in self._sessions:
if c.name == server:
res = await c.session.call_tool(tool, arguments or {})
packed = {"content": []}
for it in res.content:
if it.type == "text":
packed["content"].append({"type":"text","text": it.text})
elif it.type == "json":
packed["content"].append({"type":"json","json": it.json})
else:
packed["content"].append({"type": it.type})
return packed
return {"error": f"unknown server: {server}"}
class LocalToolsProvider:
def __init__(self):
self._defs = [ToolDef(function={
"name": "local.math_eval",
"description": "안전 사칙연산. 입력 {expr:string}",
"parameters": {
"type":"object",
"properties":{"expr":{"type":"string"}},
"required":["expr"],
"additionalProperties": False
}
})]
def list_tools(self) -> List[ToolDef]:
return self._defs
async def call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
if tool_name == "local.math_eval":
expr = arguments.get("expr","")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error":"expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__":{}}, {})
return {"content":[{"type":"json","json":{"value":val}}]}
except Exception as e:
return {"error": str(e)}
return {"error": f"unknown local tool {tool_name}"}
# ========== 3) Orchestrator (변경 없음) ==========
class Orchestrator:
def __init__(self, providers, backend: ChatBackend, expose_fn=None, max_rounds: int = 8):
self.providers = providers
self.backend = backend
self.max_rounds = max_rounds
self.expose_fn = expose_fn or (lambda _messages, tools: tools)
async def ensure_ready(self):
for p in self.providers:
if hasattr(p, "ensure_ready") and callable(getattr(p, "ensure_ready")):
await p.ensure_ready()
def _catalog(self) -> List[ToolDef]:
tools: List[ToolDef] = []
for p in self.providers:
tools.extend(p.list_tools())
return tools
async def _dispatch_call(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
for p in self.providers:
# 네임스페이스 단서
if isinstance(p, MCPProvider) and ":" in name:
return await p.call(name, args)
if isinstance(p, LocalToolsProvider) and name.startswith("local."):
return await p.call(name, args)
# 폴백
for p in self.providers:
try:
return await p.call(name, args)
except Exception:
pass
return {"error": f"no provider can handle {name}"}
async def run(self, user_text: str, system_text: Optional[str] = None) -> List[ChatMessage]:
await self.ensure_ready()
messages: List[ChatMessage] = []
if system_text:
messages.append(ChatMessage(role="system", content=system_text))
messages.append(ChatMessage(role="user", content=user_text))
for _ in range(self.max_rounds):
all_tools = self._catalog()
exposed = self.expose_fn(messages, all_tools)
resp = self.backend.send(messages, exposed)
if resp.content:
messages.append(ChatMessage(role="assistant", content=resp.content))
if not resp.tool_calls:
break
results: List[str] = []
for call in resp.tool_calls:
name = call.function.get("name","")
args = _coerce_args(call.function.get("arguments"))
try:
result = await self._dispatch_call(name, args)
results.append(orjson.dumps({"tool":name,"args":args,"result":result}).decode())
except Exception as e:
results.append(orjson.dumps({"tool":name,"args":args,"error":str(e)}).decode())
messages.append(ChatMessage(role="user", content="TOOL_RESULTS\n" + "\n".join(results)))
return messages
# ========== 실행 예시 ==========
async def main():
# MCP 서버 예시
mcp = MCPProvider([
MCPServerCfg(name="files", kind="stdio", command="npx", args=["-y","@modelcontextprotocol/server-filesystem","./data"]),
# MCPServerCfg(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization":"Bearer X"}),
])
local = LocalToolsProvider()
# A) OpenAI Agents SDK를 백엔드로 사용
backend = AgentsBackend(
base_url="http://localhost:8000", # OpenAI 호환 엔드포인트(LiteLLM도 가능)
model="gpt-4o-mini", # 혹은 네 임의 모델명
api_key=None
)
# B) 필요하면 기존 OpenAICompatibleBackend로 교체 가능
# backend = OpenAICompatibleBackend(base_url="http://localhost:8000", path="/chat", model="my-model")
orch = Orchestrator(
providers=[mcp, local],
backend=backend,
expose_fn=lambda msgs, tools: [
t for t in tools
if ("local." in t.function["name"]) or any(k in msgs[-1].content.lower() for k in ["csv","파일","sql","db","쿼리"])
],
max_rounds=8
)
msgs = await orch.run(
user_text="CSV를 읽고 합계를 구한 뒤 계산기로 수치 검증까지 해줘. 파일은 ./data/sales.csv",
system_text="너는 MCP 툴을 신중히 사용한다. 파괴적 작업은 dry_run을 우선한다."
)
print("=== 최종 로그 ===")
for m in msgs:
print(f"[{m.role}] {m.content[:1500]}")
if __name__ == "__main__":
asyncio.run(main())
작동 방식 정리
1단계. AgentsBackend는 OpenAI Agents Python SDK의 Agent를 모델 백엔드로만 쓴다. 도구는 등록하지 않는다.
2단계. 우리가 노출할 툴 목록을 JSON으로 프롬프트에 넣고, 에이전트가 content와 tool_calls 배열을 정확히 담은 JSON만 출력하게 강제한다.
3단계. 오케스트레이터는 그 tool_calls를 받아 기존과 똑같이 MCP/로컬 툴로 라우팅한다.
4단계. 즉, 같은 오케스트레이터 코드를 유지한 채, 백엔드만 갈아끼워 Agents SDK 또는 일반 OpenAI 호환 엔드포인트를 자유롭게 선택할 수 있다.
주의점과 팁
JSON 강제 프롬프트가 중요하다. 모델이 장식 텍스트를 섞지 않도록 developer_instructions로 “JSON만 출력”을 반복해서 고정하라.
툴 스키마를 엄격하게. required, enum, format을 충분히 넣으면 arguments 파싱 실패가 줄어든다.
성능. AgentsBackend는 Runner.run_sync를 한 턴마다 호출한다. 대화 길어지면 토큰·시간이 늘어나므로 expose_fn으로 툴을 턴별로 소출하고, 결과는 요약해서 재주입하라.
대체. 에이전트 SDK의 모델 대신 OpenAICompatibleBackend로 바꾸면 바로 동일 코드가 다른 Chat에도 붙는다. 목적에 맞춰 백엔드만 교체하면 된다.
이렇게 하면 에이전트 SDK를 “백엔드 모델”로만 쓰면서, 너의 툴 중심 인터페이스와 라우터는 그대로 유지된다. 다음에는 네 실제 엔드포인트의 출력 JSON 예시를 알려줘. AgentsBackend의 JSON 파싱 부분을 거기에 맞춰 2~3줄만 조정하면 된다.
아 복잡해 그냥 function_tool로 각 tool을 등록하면 어떨까
좋지. “그냥 function_tool로 각 툴을 등록”하는 가장 깔끔한 방법은, MCP 서버들에서 list_tools를 긁어와서 각각을 Agents SDK의 FunctionTool로 동적으로 래핑하는 거야. 그러면 Agent 쪽은 전혀 커스텀 분기 없이 표준 툴만 본다. 아래 코드가 바로 그 패턴이고, 커스텀 LLM도 LiteLLM 모델 어댑터로 붙인다.
핵심 개념 한 줄 요약
MCP의 tool 메타데이터(name, description, input_schema)를 Agents SDK의 FunctionTool(name, description, params_json_schema, on_invoke_tool)에 그대로 매핑하고, on_invoke_tool에서 해당 MCP 세션으로 call_tool만 라우팅한다. FunctionTool/@function_tool 사용법과 JSON 스키마 포맷은 Agents 문서 그대로다. OpenAI GitHub 또한 모델은 LiteLLMModel로 너의 베이스 URL을 모델처럼 쓸 수 있다. OpenAI GitHub
설치
pip install "openai-agents[litellm]" mcp pydantic orjson
mcp_as_function_tools.py
import asyncio
import json
from typing import Any, Dict, List, Optional
import orjson
from pydantic import BaseModel
from agents import Agent, Runner, FunctionTool
from agents.extensions.models.litellm_model import LitellmModel
# MCP Python SDK
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
class MCPServerCfg(BaseModel):
name: str # 네임스페이스(툴 이름 prefix에 들어감)
kind: str # "stdio" | "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
headers: Optional[Dict[str, str]] = None
class Connected(BaseModel):
name: str
session: ClientSession
async def connect_servers(cfgs: List[MCPServerCfg]) -> List[Connected]:
conns: List[Connected] = []
for cfg in cfgs:
if cfg.kind == "stdio":
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args or []))
elif cfg.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 모듈이 필요합니다.")
transport = SSEClientTransport(url=cfg.url, headers=cfg.headers or {})
else:
raise ValueError(f"unknown kind: {cfg.kind}")
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
conns.append(Connected(name=cdfg.name if (cdfg := cfg) else cfg.name, session=sess))
return conns
def make_function_tool_for_mcp(server_name: str, tool_meta: Any, session: ClientSession) -> FunctionTool:
"""
MCP tool -> Agents FunctionTool
tool_meta: has .name, .description, .input_schema (JSON Schema)
"""
namespaced = f"{server_name}:{tool_meta.name}"
params_schema = getattr(tool_meta, "input_schema", {"type": "object", "properties": {}})
description = getattr(tool_meta, "description", f"MCP {namespaced}")
async def on_invoke(ctx, args_json: str) -> str:
# Agents SDK가 args_json을 JSON 문자열로 넘겨준다 → dict로 파싱 후 MCP 호출
try:
args = json.loads(args_json) if args_json else {}
except Exception:
# 모델이 형식 어기면 raw로 전달(서버 쪽 검증에 맡김)
args = {"_raw": args_json}
result = await session.call_tool(tool_meta.name, args or {})
# MCP 표준 content를 텍스트/JSON 위주로 정규화
packed: Dict[str, Any] = {"content": []}
for c in result.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
else:
packed["content"].append({"type": c.type})
# 에이전트에게는 문자열을 반환하므로 JSON 문자열로 묶어준다
return orjson.dumps(packed).decode()
# FunctionTool은 name/description/params_json_schema/on_invoke_tool만 주면 된다.
# 공식 문서의 "Custom function tools" 섹션과 동일한 생성자. :contentReference[oaicite:2]{index=2}
return FunctionTool(
name=namespaced,
description=description,
params_json_schema=params_schema,
on_invoke_tool=on_invoke,
)
async def build_tools_from_mcp(cfgs: List[MCPServerCfg]) -> List[FunctionTool]:
conns = await connect_servers(cfgs)
tools: List[FunctionTool] = []
for c in conns:
listed = await c.session.list_tools()
for t in listed.tools:
tools.append(make_function_tool_for_mcp(c.name, t, c.session))
return tools
async def main():
# 1) 여러 MCP 서버를 선언
servers = [
MCPServerCfg(name="files", kind="stdio", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "./data"]),
# MCPServerCfg(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization": "Bearer X"}),
# MCPServerCfg(name="db", kind="stdio", command="python", args=["./servers/db/server.py"]),
]
# 2) MCP의 모든 툴을 FunctionTool로 등록
mcp_function_tools = await build_tools_from_mcp(servers)
# 3) 커스텀 LLM(너의 엔드포인트)을 LiteLLM 모델로 사용
model = LitellmModel(
model="my-custom-model",
base_url="http://localhost:8000/v1", # 네 엔드포인트를 OpenAI 호환으로 노출했다는 가정
api_key=None,
)
# 4) 에이전트 구성: “function_tool로 등록된 MCP 툴”만 전달
agent = Agent(
name="MCP-through-FunctionTools",
instructions="필요할 때만 툴을 호출한다. 파괴적 작업은 dry_run을 우선한다.",
model=model,
tools=mcp_function_tools, # 포인트: 오직 function_tool만
)
# 5) 실행
result = await Runner.run(
agent,
input="data 폴더에서 sales.csv를 읽고 합계 계산해줘. 필요하면 스스로 적절한 MCP 툴을 골라 써.",
)
print(result.final_output)
if __name__ == "__main__":
asyncio.run(main())
어떻게 돌아가나
1단계. 여러 MCP 서버에 연결하고 list_tools로 메타를 긁는다.
2단계. 각 MCP 툴을 FunctionTool로 감싼다. name은 server:tool로 네임스페이스, description과 JSON 스키마는 MCP의 description/input_schema를 그대로 쓴다. 이 매핑은 Agents 문서의 Function tools/Custom function tools 규약에 맞다. OpenAI GitHub
3단계. on_invoke_tool에서 session.call_tool만 호출해 결과를 문자열(JSON)로 반환한다.
4단계. Agent는 “function_tool로 등록된 툴”만 보므로, 네가 원한 단순한 툴 등록 모델 그대로다. 모델은 LiteLLM을 통해 임의 베이스 URL의 LLM을 쓸 수 있다. OpenAI GitHub
실전 팁
툴 과적재를 피하려면 tools=mcp_function_tools 대신, 프롬프트를 보고 필요한 서버만 필터링해서 전달해라(예: 파일/CSV가 있으면 files:*만). Function tools는 설명과 인자 스키마를 자동 소비하니, MCP 쪽 스키마를 빡빡하게 유지하면 arguments 오류가 줄어든다. FunctionTool로 직접 생성하면 데코레이터 없이도 이름·설명·스키마를 원하는 대로 넣을 수 있다. OpenAI GitHub
확장 아이디어
고위험 툴에 dry_run, confirm 같은 필드를 MCP input_schema에 추가하면, 그대로 FunctionTool 인자 스키마에 반영되어 실행 전에 모델이 스스로 안전 절차를 따른다.
툴 결과가 크면 on_invoke_tool에서 요약만 반환하고, 원본은 파일 핸들이나 URL로 제공하도록 MCP 서버를 설계해 토큰 사용량을 줄인다.
원하면 네 파일/DB MCP 서버의 실제 tool 이름과 input_schema를 알려줘. 위 스크립트가 해당 스키마를 그대로 FunctionTool로 옮겨주기 때문에 추가 코드는 사실상 없다.
응 내가 원하는거랑 가까워진듯. 질문이 더 있어. 질문 1. MCP 말고 일반 Function 툴과 섞인 버젼 추가해줘. 질문 2. 일반 chat completions (with tools) 예제도 바로 아래에 추가해줘. 질문 3. MCP Client Session은 저절로 종료되나? 질문 4. 여러 Agent가 있으면 MCP 종류별로 세션을 여러개 열게될수도 있을거 같은데 하나로 유지하는건 어때? 문제가 있으려나
좋아. 네 질문 4개를 순서대로, 바로 쓸 수 있는 코드와 함께 정리한다. 인용구문은 쓰지 않는다.
1) MCP 툴 + 일반 Function 툴을 섞어 등록하는 버전
아래는 앞서 만든 “MCP → FunctionTool 래핑”에 더해, 일반 파이썬 함수도 FunctionTool로 함께 등록하는 예시다. 에이전트는 두 종류를 한꺼번에 본다.
import asyncio
import json
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from agents import Agent, Runner, FunctionTool
from agents.extensions.models.litellm_model import LitellmModel
import orjson
# ========== MCP 연결 ==========
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
class MCPServerCfg(BaseModel):
name: str
command: str
args: List[str] = []
class Connected(BaseModel):
name: str
session: ClientSession
async def connect_servers(cfgs: List[MCPServerCfg]) -> List[Connected]:
conns = []
for cfg in cfgs:
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args))
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
conns.append(Connected(name=cfg.name, session=sess))
return conns
def mcp_tool_to_function(server: str, tool_meta: Any, session: ClientSession) -> FunctionTool:
namespaced = f"{server}:{tool_meta.name}"
params_schema = getattr(tool_meta, "input_schema", {"type": "object", "properties": {}})
desc = getattr(tool_meta, "description", f"MCP {namespaced}")
async def on_invoke(ctx, args_json: str) -> str:
args = {}
if args_json:
try:
args = json.loads(args_json)
except Exception:
args = {"_raw": args_json}
res = await session.call_tool(tool_meta.name, args)
packed = {"content": []}
for c in res.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
return orjson.dumps(packed).decode()
return FunctionTool(
name=namespaced,
description=desc,
params_json_schema=params_schema,
on_invoke_tool=on_invoke,
)
async def build_mcp_function_tools(cfgs: List[MCPServerCfg]) -> List[FunctionTool]:
tools: List[FunctionTool] = []
conns = await connect_servers(cfgs)
for c in conns:
listed = await c.session.list_tools()
for t in listed.tools:
tools.append(mcp_tool_to_function(c.name, t, c.session))
return tools
# ========== 일반 파이썬 함수 → FunctionTool ==========
def make_local_tools() -> List[FunctionTool]:
async def math_eval(ctx, args_json: str) -> str:
args = json.loads(args_json) if args_json else {}
expr = args.get("expr", "")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return json.dumps({"error": "expr must contain only digits and + - * / ( )"})
try:
val = eval(expr, {"__builtins__": {}}, {})
return json.dumps({"value": val})
except Exception as e:
return json.dumps({"error": str(e)})
return [
FunctionTool(
name="local.math_eval",
description="안전 사칙연산 계산기. 입력: {expr: string}",
params_json_schema={
"type": "object",
"properties": {"expr": {"type": "string"}},
"required": ["expr"],
"additionalProperties": False,
},
on_invoke_tool=math_eval,
)
]
# ========== 실행 예시 ==========
async def main():
# 1) MCP 툴
mcp_tools = await build_mcp_function_tools([
MCPServerCfg(name="files", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "./data"]),
# MCPServerCfg(name="db", command="python", args=["./servers/db/server.py"]),
])
# 2) 일반 Function 툴
local_tools = make_local_tools()
# 3) 모델 (네 커스텀 LLM OpenAI-호환 엔드포인트)
model = LitellmModel(model="my-model", base_url="http://localhost:8000/v1")
agent = Agent(
name="MixedToolsAgent",
instructions="필요할 때만 툴을 호출한다. 파괴적 작업은 dry_run을 우선한다.",
model=model,
tools=mcp_tools + local_tools, # 섞어서 등록
)
result = await Runner.run(
agent,
input="data 폴더의 sales.csv를 읽어 합계를 구하고, local.math_eval로 숫자 검증까지 해줘",
)
print(result.final_output)
if __name__ == "__main__":
asyncio.run(main())
2) 일반 Chat Completions (with tools) 예제
OpenAI 호환 Chat Completions API를 직접 쓰는 순수 예시다. tools에 함수 스키마를 주고, tool_calls를 받아서 너의 실행기로 라우팅한다. 여기서는 로컬 계산기 하나만 보여주지만, 위의 MCP tools 리스트를 그대로 합쳐 넣으면 된다.
import json, requests
BASE_URL = "http://localhost:8000/v1" # 네 OpenAI 호환 엔드포인트
MODEL = "my-model"
tools = [
{
"type": "function",
"function": {
"name": "local.math_eval",
"description": "안전 사칙연산 계산기",
"parameters": {
"type": "object",
"properties": {"expr": {"type": "string"}},
"required": ["expr"],
"additionalProperties": False
}
}
}
# 여기에 MCP에서 긁어온 tools도 그대로 append 가능
]
messages = [
{"role": "system", "content": "필요할 때만 툴을 호출한다."},
{"role": "user", "content": "3*(2+5) 값을 계산해줘"},
]
def run_local_math_eval(args: dict):
expr = args.get("expr","")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error": "expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__": {}}, {})
return {"value": val}
except Exception as e:
return {"error": str(e)}
# 1차 요청
resp = requests.post(
f"{BASE_URL}/chat/completions",
json={"model": MODEL, "messages": messages, "tools": tools, "tool_choice": "auto"},
timeout=60,
)
resp.raise_for_status()
data = resp.json()
msg = data["choices"][0]["message"]
messages.append(msg)
# tool_calls 처리
if "tool_calls" in msg:
for call in msg["tool_calls"]:
name = call["function"]["name"]
args = json.loads(call["function"].get("arguments", "{}") or "{}")
if name == "local.math_eval":
result = run_local_math_eval(args)
else:
result = {"error": f"unknown tool {name}"}
messages.append({
"role": "tool",
"tool_call_id": call["id"],
"name": name,
"content": json.dumps(result)
})
# 2차 요청(도구 결과 반영)
resp2 = requests.post(
f"{BASE_URL}/chat/completions",
json={"model": MODEL, "messages": messages},
timeout=60,
)
resp2.raise_for_status()
print(resp2.json()["choices"][0]["message"]["content"])
else:
print(msg.get("content", ""))
포인트
tools에 MCP에서 긁은 스키마를 그대로 추가하면, 일반 chat completions 루프에서도 MCP와 로컬 툴을 함께 쓸 수 있다. 단, MCP 호출부는 너의 라우터가 처리해 tool 역할 메시지를 만들어 넣으면 된다.
3) MCP ClientSession은 저절로 종료되나?
자동 종료를 기대하면 안 된다. 표준 패턴은 세 가지다.
-
세션을 감싼 트랜스포트(StdIO/SSE)에 대해 stop/close를 명시적으로 호출한다.
-
async with 컨텍스트를 제공하는 래퍼를 만들어 with 블록이 끝날 때 정리한다.
-
프로세스 종료 훅에서 전역 세션들을 순서대로 정리한다.
간단 정리 함수 예시:
# 세션과 트랜스포트를 함께 추적해야 안전
class MCPHandle:
def __init__(self, name, transport, session):
self.name = name
self.transport = transport
self.session = session
async def close(self):
try:
await self.session.close()
except Exception:
pass
try:
await self.transport.stop()
except Exception:
pass
세션을 열 때 MCPHandle을 만들어 리스트에 넣고, 애플리케이션 종료 시 모든 handle.close()를 await 하면 된다.
4) 여러 Agent가 세션을 공유해도 될까? 하나로 유지하는 전략과 주의점
결론부터 말하면, 공유 풀을 두고 재사용하는 게 일반적으로 좋다. 다만 몇 가지 주의점이 있다.
장점
메모리·프로세스 수 절감. stdio 서버는 프로세스를 하나만 띄우면 된다.
콜드 스타트 지연 감소. list_tools 캐시와 연결 수립 시간이 절약된다.
주의점
동시성. 하나의 MCP 세션이 다중 호출을 처리할 때, 서버/클라이언트가 동시 호출을 안전하게 처리하는지 확인해야 한다. 대부분의 MCP 서버 구현은 괜찮지만, 특정 서버가 순차 처리만 안전하게 지원한다면 큐잉이나 세마포어가 필요하다.
격리. 사용자·권한이 다른 Agent가 같은 세션을 쓰면 권한 경계가 흐려질 수 있다. 서버 인스턴스를 권한 단위로 분리하는 게 안전하다.
생명주기. 누가 언제 닫을지 합의가 필요하다. 레퍼런스 카운팅 또는 명시적 init/close API가 필요하다.
버전·옵션 차이. 같은 서버라도 플래그(예: dry_run 기본값)가 다른 Agent마다 다를 수 있다. 이땐 세션을 공유하지 말고 별도 인스턴스를 띄워라.
간단한 세션 풀 스캐폴딩(RefCount + AsyncLock):
import asyncio
from typing import Dict, Tuple
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
class MCPSessionPool:
def __init__(self):
self._lock = asyncio.Lock()
self._pool: Dict[Tuple[str, Tuple[str,...]], dict] = {}
# key 예시: ("stdio", ("npx","-y","@modelcontextprotocol/server-filesystem","./data"))
async def acquire_stdio(self, name: str, command: str, args: list) -> ClientSession:
key = ("stdio", tuple([command] + args))
async with self._lock:
slot = self._pool.get(key)
if slot:
slot["ref"] += 1
return slot["session"]
# 새로 생성
transport = StdioClientTransport(StdioServerParameters(command=command, args=args))
await transport.start()
session = ClientSession(transport)
await session.initialize()
self._pool[key] = {"ref": 1, "session": session, "transport": transport, "name": name}
return session
async def release_stdio(self, command: str, args: list):
key = ("stdio", tuple([command] + args))
async with self._lock:
slot = self._pool.get(key)
if not slot:
return
slot["ref"] -= 1
if slot["ref"] <= 0:
try:
await slot["session"].close()
finally:
try:
await slot["transport"].stop()
finally:
self._pool.pop(key, None)
사용 패턴
Agent가 시작할 때 acquire_stdio로 세션을 받아 쓰고, 끝날 때 release_stdio로 반납한다. 동일한 command/args 조합은 한 세션을 공유한다. 권한 격리가 필요한 경우 key에 추가 식별자(tenant, role 등)를 포함시키면 된다. 동시성 밀도가 높다면 각 MCP 서버 특성에 맞춰 per-server 세마포어를 더해주는 게 안전하다.
마무리 팁
섞인 환경(일반 함수 + MCP)에서는 툴 수가 늘어 혼란이 생기기 쉬우니, 에이전트에 등록할 때 도메인별 프리픽스(server:tool, local.*)와 짧고 명확한 설명, 엄격한 JSON 스키마(required/enum/format)를 유지하자. 필요하면 한 턴에 노출할 툴을 필터링해 과적재를 줄이는 것도 효과적이다.
1번과 2번을 합쳐서 써야지. 두 개 툴 구현이 너무 다르잖아. 툴 구현을 한 번만 하고 두 군데 모두에서 쓰고 싶은거야.
좋아, “툴 구현은 한 번만” 하고, 그걸 두 방식 모두에서 재사용하도록 합치자. 아래 스캐폴딩은 단일 ToolRegistry에 툴을 등록하고, 같은 구현을 1) OpenAI Agents SDK(FunctionTool) 경로, 2) 일반 Chat Completions(with tools) 경로 양쪽에서 그대로 씁니다. MCP 툴도 동일한 추상화로 래핑합니다.
설치
pip install "openai-agents[litellm]" mcp pydantic orjson requests
unified_tools.py
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List, Optional
import orjson
import requests
from pydantic import BaseModel
###############################################################################
# 공통 추상화: 한 번 구현한 툴을 어디서든 재사용
###############################################################################
# Tool 핸들러 시그니처: args(dict) -> dict/str (awaitable)
ToolHandler = Callable[[Dict[str, Any]], Awaitable[Any]]
class ToolSpec(BaseModel):
name: str
description: str
parameters: Dict[str, Any] # JSON Schema
handler: ToolHandler # 실제 실행 로직(동일 구현을 양쪽에서 재사용)
class ToolRegistry:
def __init__(self):
self._tools: Dict[str, ToolSpec] = {}
def register(self, spec: ToolSpec):
if spec.name in self._tools:
raise ValueError(f"duplicated tool name: {spec.name}")
self._tools[spec.name] = spec
def get(self, name: str) -> ToolSpec:
return self._tools[name]
def list_specs(self) -> List[ToolSpec]:
return list(self._tools.values())
def list_for_chat_completions(self) -> List[Dict[str, Any]]:
# OpenAI tools 포맷으로 직렬화
out = []
for t in self.list_specs():
out.append({
"type": "function",
"function": {
"name": t.name,
"description": t.description,
"parameters": t.parameters
}
})
return out
###############################################################################
# 로컬(일반) 툴 구현 예시: 한 번만 구현
###############################################################################
async def math_eval_handler(args: Dict[str, Any]) -> Any:
expr = args.get("expr", "")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error": "expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__": {}}, {})
return {"value": val}
except Exception as e:
return {"error": str(e)}
LOCAL_MATH_TOOL = ToolSpec(
name="local.math_eval",
description="안전 사칙연산 계산기. 입력 {expr: string}",
parameters={
"type": "object",
"properties": {"expr": {"type": "string"}},
"required": ["expr"],
"additionalProperties": False
},
handler=math_eval_handler
)
###############################################################################
# MCP 툴을 동일 추상화로 래핑: 세션별로 ToolSpec을 생성
###############################################################################
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
class MCPServerCfg(BaseModel):
name: str # 네임스페이스 prefix
kind: str # "stdio" | "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
headers: Optional[Dict[str, str]] = None
class MCPHandle:
def __init__(self, name: str, transport, session: ClientSession):
self.name = name
self.transport = transport
self.session = session
async def close(self):
try:
await self.session.close()
except Exception:
pass
try:
await self.transport.stop()
except Exception:
pass
async def connect_mcp(cfg: MCPServerCfg) -> MCPHandle:
if cfg.kind == "stdio":
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args or []))
elif cfg.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 모듈 필요")
transport = SSEClientTransport(url=cfg.url, headers=cfg.headers or {})
else:
raise ValueError(f"unknown kind: {cfg.kind}")
await transport.start()
session = ClientSession(transport)
await session.initialize()
return MCPHandle(cfg.name, transport, session)
def mcp_toolspecs_from_session(server_name: str, session: ClientSession) -> List[ToolSpec]:
# 동기 함수로 명세만 만들고, handler는 session을 캡처하여 비동기로 호출
# list_tools 는 async라 밖에서 호출해 metadata를 넘겨줘야 한다.
raise RuntimeError("call build_mcp_tools(...) instead")
async def build_mcp_tools(handle: MCPHandle) -> List[ToolSpec]:
listed = await handle.session.list_tools()
specs: List[ToolSpec] = []
for t in listed.tools:
namespaced = f"{handle.name}:{t.name}"
schema = getattr(t, "input_schema", {"type": "object", "properties": {}})
desc = getattr(t, "description", f"MCP {namespaced}")
async def _handler(args: Dict[str, Any], *, _tool=t):
# _tool를 default 인자로 바인딩해 late-binding 이슈 방지
res = await handle.session.call_tool(_tool.name, args or {})
packed = {"content": []}
for c in res.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
else:
packed["content"].append({"type": c.type})
return packed
specs.append(ToolSpec(
name=namespaced,
description=desc,
parameters=schema,
handler=_handler
))
return specs
###############################################################################
# 1) OpenAI Agents SDK 경로: FunctionTool로 래핑하되, handler는 동일 구현 사용
###############################################################################
from agents import Agent, Runner, FunctionTool
from agents.extensions.models.litellm_model import LitellmModel
def to_function_tool(spec: ToolSpec) -> FunctionTool:
async def on_invoke(ctx, args_json: str) -> str:
try:
args = json.loads(args_json) if args_json else {}
except Exception:
args = {"_raw": args_json}
result = await spec.handler(args)
# 에이전트는 문자열 반환이 표준이므로 JSON 문자열로 직렬화
return orjson.dumps(result).decode()
return FunctionTool(
name=spec.name,
description=spec.description,
params_json_schema=spec.parameters,
on_invoke_tool=on_invoke
)
async def run_with_agents(reg: ToolRegistry, user_text: str):
tools = [to_function_tool(t) for t in reg.list_specs()]
model = LitellmModel(model="my-model", base_url="http://localhost:8000/v1")
agent = Agent(
name="UnifiedToolsAgent",
instructions="필요할 때만 툴을 호출한다. 파괴적 작업은 dry_run을 우선한다.",
model=model,
tools=tools
)
result = await Runner.run(agent, user_text)
print(result.final_output)
###############################################################################
# 2) 일반 Chat Completions 경로: tools JSON + 동일 handler로 tool_calls 라우팅
###############################################################################
class ChatCompletionsRunner:
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, path: str = "/v1/chat/completions"):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.path = path
def _headers(self):
h = {"content-type": "application/json"}
if self.api_key:
h["authorization"] = f"Bearer {self.api_key}"
return h
def _post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
r = requests.post(f"{self.base_url}{self.path}", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
return r.json()
async def run(self, reg: ToolRegistry, messages: List[Dict[str, Any]]) -> str:
tools = reg.list_for_chat_completions()
# 1차 호출
data = self._post({"model": self.model, "messages": messages, "tools": tools, "tool_choice": "auto"})
msg = data["choices"][0]["message"]
messages.append(msg)
# 도구 호출 처리 루프(필요 시 반복)
while "tool_calls" in msg:
for call in msg["tool_calls"]:
name = call["function"]["name"]
raw_args = call["function"].get("arguments", "{}") or "{}"
try:
args = json.loads(raw_args)
except Exception:
args = {"_raw": raw_args}
spec = reg.get(name)
result = await spec.handler(args)
messages.append({
"role": "tool",
"tool_call_id": call["id"],
"name": name,
"content": orjson.dumps(result).decode()
})
data = self._post({"model": self.model, "messages": messages})
msg = data["choices"][0]["message"]
messages.append(msg)
return msg.get("content", "")
###############################################################################
# 실행 예시: 한 번 구현 → 두 경로에서 사용
###############################################################################
async def main():
# 0) 레지스트리 생성 및 로컬 툴 등록(한 번만)
reg = ToolRegistry()
reg.register(LOCAL_MATH_TOOL)
# 1) MCP 세션 연결 후, MCP 툴들을 동일 추상화로 등록
files_cfg = MCPServerCfg(name="files", kind="stdio", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "./data"])
files_handle = await connect_mcp(files_cfg)
try:
for spec in await build_mcp_tools(files_handle):
reg.register(spec)
# A) Agents SDK 경로
print("=== Agents SDK 경로 ===")
await run_with_agents(reg, "data 폴더의 sales.csv를 읽고 합계를 계산한 뒤 local.math_eval로 2배를 계산해줘")
# B) 일반 Chat Completions 경로
print("=== Chat Completions 경로 ===")
runner = ChatCompletionsRunner(base_url="http://localhost:8000", model="my-model")
out = await runner.run(
reg,
messages=[
{"role": "system", "content": "필요할 때만 툴을 호출한다. 파괴적 작업은 dry_run을 우선한다."},
{"role": "user", "content": "files 서버 툴로 sales.csv 합계를 구하고 local.math_eval로 100+23도 계산해줘"}
]
)
print(out)
finally:
await files_handle.close()
if __name__ == "__main__":
asyncio.run(main())
핵심 포인트 정리
툴은 ToolSpec(handler 포함)으로 한 번만 구현한다. MCP도 build_mcp_tools가 동일한 ToolSpec을 만들고, 로컬 툴도 같은 형태다.
OpenAI Agents SDK는 FunctionTool로 얇게 래핑만 한다. on_invoke는 같은 handler를 호출한다.
Chat Completions(with tools)도 registry의 tools JSON을 그대로 사용하고, tool_calls를 같은 handler로 라우팅한다.
MCP 세션은 MCPHandle.close로 명시 종료한다. 여러 Agent나 여러 러너가 있을 때 공유하고 싶다면 핸들 풀을 두고 레퍼런스 카운팅만 얹으면 된다.
이제 “툴 구현은 한 번만” 쓰고, Agents와 Chat Completions 어디서든 동일하게 돌아간다. 필요한 경우 expose(툴 소출) 정책이나 권한 게이트를 registry 레벨에서 필터링 훅으로 추가하면 된다.
흠 차라리 MCP에 대한 정보를 따로 받고 에이전트에게 알아서 처리하는게 나을까? 이러너 레이어를 내가 직접 만드는것보다?
짚고 가자. “MCP 정보를 에이전트에게 그대로 넘겨서 알아서 쓰게 할지” vs “네가 러너 레이어(오케스트레이터)를 직접 만들어 통제할지”는 트레이드오프다. 한쪽이 항상 더 낫지는 않다. 아래를 기준으로 선택하면 된다.
선택 기준 요약
에이전트가 MCP를 직접 다루게 한다
장점: 설정이 단순하고 빠르다. SDK가 세션 관리, 툴 호출, 반환 형식 등을 알아서 처리한다. 호스티드/원격 MCP는 코드 없이 붙이기 쉽다. openai.github.io+1
단점: 턴별 툴 소출(Selective exposure), 고급 승인 흐름, 로깅 표준화, 네임스페이스 규칙, 재시도·서킷브레이커 같은 “플랫폼 공통 정책”을 세밀하게 넣기 어렵다. 모델이 많은 툴을 동시에 보게 되면 호출 혼선이 늘어난다. OpenAI Platform+1
직접 러너(오케스트레이터)를 둔다
장점: 툴 노출·권한·에러 처리·관측성을 전부 네가 표준화할 수 있다. 여러 채팅 백엔드(Agents, 일반 Chat Completions, 사내 엔진)를 같은 규약으로 꿰매 쓸 수 있다.
단점: 네가 세션 풀, 라우팅, 재시도, 포맷 강제(JSON만 출력 등)를 구현·유지해야 한다.
권장 결론
운영 기준이 빡빡하다면(권한 분리, 승인, 로그, 복구 전략, 비용/지연 관리), 지금까지 만든 얇은 러너를 유지하되 백엔드는 갈아끼우는 방식이 적합하다. 반대로 빠른 초기 통합·PoC, 공개 원격 MCP만 쓰는 경우에는 에이전트가 MCP를 직접 다루게 하는 구성이 가볍다. 둘을 섞는 하이브리드도 가능하다. 즉, 저위험·읽기 전용 MCP는 에이전트가 직접, 고위험 MCP는 러너를 통해서만 노출.
둘 중 하나를 고른 뒤 바로 쓸 수 있게, 실제 사용 스니펫을 2개 붙인다.
에이전트가 MCP를 직접 다루는 최소 예시
여기서는 OpenAI Agents Python SDK에 여러 MCP 서버를 그대로 넘긴다. 모델은 OpenAI 호환 REST로 노출된 커스텀 엔드포인트를 쓴다(LiteLLM 래퍼).
주의: 에이전트에 툴을 모두 노출하면 혼선이 생길 수 있으니, 필요하면 툴 필터·승인 옵션을 병행하라.
import asyncio
from agents import Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel
from agents.mcp import MCPServerStdio, MCPServerSse # SDK 내장 MCP 커넥터
async def main():
async with MCPServerStdio(
name="files",
params={"command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem", "./data"]},
cache_tools_list=True,
) as files_server, MCPServerSse(
name="search",
params={"url": "https://your-mcp.example/sse", "headers": {"Authorization": "Bearer X"}},
cache_tools_list=True,
) as search_server:
model = LitellmModel(model="your-model", base_url="http://localhost:8000/v1")
agent = Agent(
name="AgentManagedMCP",
instructions="필요한 MCP 툴만 선택해서 호출한다. 파괴적 작업은 dry_run을 우선한다.",
model=model,
mcp_servers=[files_server, search_server], # 포인트: MCP 정보를 그대로 넘김
)
out = await Runner.run(agent, "data 폴더에서 sales.csv 총합을 구하고, 검색 서버로 관련 리포트를 찾아 요약해줘")
print(out.final_output)
asyncio.run(main())
일반 Chat Completions(with tools)에 바로 붙이는 최소 예시
여기서는 MCP 정보를 직접 넘기지 않고, MCP가 노출한 툴들을 OpenAI “tools” 포맷으로 변환해서 모델에게 보여준다. 모델의 tool_calls를 받으면 네가 call_tool을 실행한다. 이 경로는 러너가 얇게나마 필요하지만, 너가 이미 가진 통일 인터페이스(단일 ToolRegistry + handler 재사용)를 유지하기 좋다.
import asyncio, json, requests
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
BASE = "http://localhost:8000/v1"
MODEL = "your-model"
async def list_mcp_tools_as_openai_tools():
transport = StdioClientTransport(StdioServerParameters(command="npx", args=["-y","@modelcontextprotocol/server-filesystem","./data"]))
await transport.start()
session = ClientSession(transport); await session.initialize()
tools = []
listed = await session.list_tools()
for t in listed.tools:
tools.append({
"type": "function",
"function": {
"name": f"files:{t.name}",
"description": getattr(t,"description", ""),
"parameters": getattr(t,"input_schema", {"type":"object","properties":{}})
}
})
return session, tools # session은 뒤에서 call_tool에 사용
async def main():
session, tools = await list_mcp_tools_as_openai_tools()
messages = [
{"role":"system","content":"필요한 툴만 호출한다."},
{"role":"user","content":"files 서버 툴로 sales.csv 합계를 구해줘"}
]
# 1차 요청
data = requests.post(f"{BASE}/chat/completions", json={"model": MODEL, "messages": messages, "tools": tools, "tool_choice":"auto"}, timeout=60).json()
msg = data["choices"][0]["message"]; messages.append(msg)
if "tool_calls" in msg:
for call in msg["tool_calls"]:
name = call["function"]["name"] # e.g., files:read
server, tool = name.split(":",1)
args = json.loads(call["function"].get("arguments","{}") or "{}")
# MCP 실행
res = await session.call_tool(tool, args)
# 표준화하여 툴 메시지로 재주입
messages.append({
"role": "tool",
"tool_call_id": call["id"],
"name": name,
"content": json.dumps({"content":[getattr(c,"__dict__",{}) for c in res.content]})
})
# 2차 요청
data2 = requests.post(f"{BASE}/chat/completions", json={"model": MODEL, "messages": messages}, timeout=60).json()
print(data2["choices"][0]["message"]["content"])
await session.close()
await session._transport.stop()
asyncio.run(main())
언제 “에이전트에게 맡기기”가 유리한가
MCP 서버가 전부 원격·공개이고, 권한 경계가 단순하며, 툴 수가 많지 않고, 빠르게 결과를 내야 할 때. SDK가 세션·호출을 대신 관리하므로 개발 비용이 낮다. OpenAI Agents SDK의 MCP 가이드는 이 경로를 기본으로 상정한다. openai.github.io
언제 “직접 오케스트레이션”이 유리한가
툴이 많고 성격이 다양해서 턴별 소출이 필요할 때, 실패·재시도·백오프 같은 SRE 룰을 도입해야 할 때, 감사 로그와 승인 플로우가 필요한 조직 환경일 때. 이때는 네가 만든 ToolRegistry/handler 단일화 구조가 유지보수에 유리하다. 모델/플랫폼을 바꿀 때도 백엔드만 교체하면 된다. OpenAI 표준 함수콜 루프는 문서의 권장 패턴이며, 이 위에 정책을 얹기 쉽다. OpenAI Platform+1
하이브리드 팁
저위험·읽기 전용 MCP는 에이전트 직결로 빨리 붙이고, 고위험(쓰기/삭제/결제/배포) MCP는 러너 경유 툴로만 노출한다. 에이전트 쪽에는 해당 툴을 숨겨 두고, 러너가 승인된 요청만 대리 호출한다. 이 구성이 현업에서 가장 에러율·사고율이 낮고, 초기 속도도 빠르다.
마지막 체크리스트
툴 과적재를 피하려면 툴 노출 수를 턴당 8~12개 수준에서 시작하자.
namespacing은 server:tool 패턴을 고정해 충돌을 막자.
input_schema를 엄격하게(required, enum, format) 잡아 잘못된 arguments를 줄이자.
MCP 세션은 자동 종료가 아니므로 종료 훅을 두거나 컨텍스트 매니저를 사용하자.
세션 공유는 가능하지만 권한·동시성·생명주기 규칙을 먼저 정해둬야 한다.
원한다면 지금 네 실제 MCP 서버 목록과 위험도(읽기/쓰기), 그리고 사용 중인 모델 엔드포인트를 알려줘. 위 스니펫을 네 환경에 맞게 바로 돌아가도록 깔끔하게 커스터마이즈해주겠다.
MCP가 3개 이상들어왔을 때 처리에 대해 알려줘. 그리고 OpenAI Agent 말고 다른 Agent도 있어. 1. MCP에 관한 정보를 통일되게 다룰 수 있는 클래스가 필요해. 2. 1에서 만든 정보를 각각의 Agent에 맞게 Adapt를 해줄 수 있는 얇은 클래스나 함수가 필요할 것 같아.
좋아. “MCP 서버가 3개 이상”일 때 흔들리지 않게 돌리는 구조를 딱 두 층으로 깔끔하게 나눠줄게.
-
MCP 정보를 “완전히 통일된 포맷”으로 다루는 허브 클래스
-
그 통일 포맷을 각 에이전트별로 얇게 변환하는 어댑터들
한 번의 구현으로 OpenAI Agents, 일반 Chat Completions(with tools), 그리고 다른 프레임워크(원하면 LangChain/Autogen 등)까지 같은 툴 구현을 재사용할 수 있다. 아래 스캐폴딩은 그대로 복붙해서 시작해도 된다.
설치
pip install "openai-agents[litellm]" mcp pydantic orjson requests
mcp_unified_adapters.py
import asyncio, json, uuid
from typing import Any, Dict, List, Optional, Callable, Awaitable, Tuple
from pydantic import BaseModel
import orjson, requests
# =========================
# 0) 통일된 툴 스펙/레지스트리
# =========================
ToolHandler = Callable[[Dict[str, Any]], Awaitable[Any]]
class UnifiedToolSpec(BaseModel):
name: str # 예: "files:read_csv" or "local.math_eval"
description: str
parameters: Dict[str, Any] # JSON Schema
handler: ToolHandler # 모든 백엔드에서 재사용되는 실제 실행 로직
class ToolRegistry:
def __init__(self):
self._m: Dict[str, UnifiedToolSpec] = {}
def register(self, spec: UnifiedToolSpec):
if spec.name in self._m:
raise ValueError(f"duplicated tool: {spec.name}")
self._m[spec.name] = spec
def get(self, name: str) -> UnifiedToolSpec:
return self._m[name]
def list(self) -> List[UnifiedToolSpec]:
return list(self._m.values())
# =========================
# 1) MCP 허브(3개+ 서버 안정 운용)
# =========================
# a) 설정과 핸들
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
class MCPServerCfg(BaseModel):
name: str # 네임스페이스
kind: str # "stdio" | "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
headers: Optional[Dict[str, str]] = None
# 선택: 동시성 제한 등
max_concurrency: int = 8
request_timeout_sec: int = 60
class MCPHandle:
def __init__(self, name: str, transport, session: ClientSession, max_concurrency: int, timeout: int):
self.name = name
self.transport = transport
self.session = session
self.sema = asyncio.Semaphore(max_concurrency)
self.timeout = timeout
async def close(self):
try:
await self.session.close()
except Exception:
pass
try:
await self.transport.stop()
except Exception:
pass
# b) MCP 허브: 3개 이상 서버를 연결·통일 스펙으로 변환·실행 라우팅
class MCPHub:
def __init__(self, servers: List[MCPServerCfg]):
self.cfgs = servers
self.handles: Dict[str, MCPHandle] = {}
self._registry = ToolRegistry()
self._ready = False
async def start(self):
for cfg in self.cfgs:
if cfg.kind == "stdio":
transport = StdioClientTransport(StdioServerParameters(command=cfg.command, args=cfg.args or []))
elif cfg.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 모듈이 필요합니다.")
transport = SSEClientTransport(url=cfg.url, headers=cfg.headers or {})
else:
raise ValueError(f"unknown kind: {cfg.kind}")
await transport.start()
session = ClientSession(transport)
await session.initialize()
self.handles[cfg.name] = MCPHandle(cfg.name, transport, session, cfg.max_concurrency, cfg.request_timeout_sec)
# list_tools를 모두 긁어 통일 스펙으로 등록(네임스페이스: server:tool)
for server, h in self.handles.items():
listed = await h.session.list_tools()
for t in listed.tools:
namespaced = f"{server}:{t.name}"
params = getattr(t, "input_schema", {"type": "object", "properties": {}})
desc = getattr(t, "description", f"MCP {namespaced}")
async def _handler(args: Dict[str, Any], *, _server=server, _tool=t.name):
# 동시성 보호 + 타임아웃
async with self.handles[_server].sema:
coro = self.handles[_server].session.call_tool(_tool, args or {})
res = await asyncio.wait_for(coro, timeout=self.handles[_server].timeout)
packed = {"content": []}
for c in res.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
else:
packed["content"].append({"type": c.type})
return packed
self._registry.register(UnifiedToolSpec(
name=namespaced, description=desc, parameters=params, handler=_handler
))
self._ready = True
def registry(self) -> ToolRegistry:
if not self._ready:
raise RuntimeError("MCPHub.start() 호출 전입니다.")
return self._registry
async def close(self):
for h in list(self.handles.values()):
await h.close()
self.handles.clear()
self._ready = False
# =========================
# 2) 공통 로컬 툴(선택)
# =========================
async def math_eval(args: Dict[str, Any]) -> Any:
expr = args.get("expr","")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error":"expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__": {}}, {})
return {"value": val}
except Exception as e:
return {"error": str(e)}
def local_tools() -> List[UnifiedToolSpec]:
return [
UnifiedToolSpec(
name="local.math_eval",
description="안전 사칙연산 계산기",
parameters={
"type":"object",
"properties":{"expr":{"type":"string"}},
"required":["expr"],
"additionalProperties": False
},
handler=math_eval
)
]
# =========================
# 3) 어댑터 레이어(얇게)
# =========================
# 3-a) OpenAI Agents Python SDK(FunctionTool) 어댑터
from agents import Agent, Runner, FunctionTool
from agents.extensions.models.litellm_model import LitellmModel
def to_agents_function_tools(reg: ToolRegistry) -> List[FunctionTool]:
def wrap(spec: UnifiedToolSpec) -> FunctionTool:
async def on_invoke(ctx, args_json: str) -> str:
try:
args = json.loads(args_json) if args_json else {}
except Exception:
args = {"_raw": args_json}
out = await spec.handler(args)
return orjson.dumps(out).decode()
return FunctionTool(
name=spec.name,
description=spec.description,
params_json_schema=spec.parameters,
on_invoke_tool=on_invoke
)
return [wrap(t) for t in reg.list()]
async def run_with_agents(reg: ToolRegistry, base_url: str, model: str, prompt: str):
model_impl = LitellmModel(model=model, base_url=base_url.rstrip("/") + "/v1")
agent = Agent(
name="UnifiedAgent",
instructions="필요할 때만 툴을 호출하고, 파괴적 작업은 dry_run을 우선한다.",
model=model_impl,
tools=to_agents_function_tools(reg)
)
out = await Runner.run(agent, prompt)
print(out.final_output)
# 3-b) OpenAI 호환 Chat Completions(with tools) 어댑터
class ChatCompletionsRunner:
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, path="/v1/chat/completions"):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.path = path
def _headers(self):
h = {"content-type":"application/json"}
if self.api_key:
h["authorization"] = f"Bearer {self.api_key}"
return h
def _tools_json(self, reg: ToolRegistry) -> List[Dict[str, Any]]:
return [{"type":"function","function":{
"name": t.name, "description": t.description, "parameters": t.parameters
}} for t in reg.list()]
def _post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
r = requests.post(f"{self.base_url}{self.path}", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
return r.json()
async def run(self, reg: ToolRegistry, system: str, user: str) -> str:
messages = [
{"role":"system","content": system},
{"role":"user","content": user},
]
data = self._post({"model": self.model, "messages": messages, "tools": self._tools_json(reg), "tool_choice": "auto"})
msg = data["choices"][0]["message"]; messages.append(msg)
while "tool_calls" in msg:
for call in msg["tool_calls"]:
name = call["function"]["name"]
raw = call["function"].get("arguments","{}") or "{}"
try:
args = json.loads(raw)
except Exception:
args = {"_raw": raw}
spec = reg.get(name)
out = await spec.handler(args)
messages.append({
"role":"tool",
"tool_call_id": call["id"],
"name": name,
"content": orjson.dumps(out).decode()
})
data = self._post({"model": self.model, "messages": messages})
msg = data["choices"][0]["message"]; messages.append(msg)
return msg.get("content","")
# 3-c) 다른 에이전트(예: LangChain Tool)로의 얇은 어댑터(옵션)
# 필요하면 이런 형태로 추가:
def to_langchain_tools(reg: ToolRegistry):
try:
from langchain.tools import StructuredTool
except Exception:
return []
tools = []
for t in reg.list():
async def _run(**kwargs):
return await t.handler(kwargs)
tools.append(StructuredTool.from_function(
coroutine=_run,
name=t.name.replace(":", "__"), # 프레임워크 명명 규칙 맞추기
description=t.description
# schema는 프레임워크별로 별도 지정 가능
))
return tools
# =========================
# 4) 3개+ MCP 서버 포함 실행 예시
# =========================
async def main():
# a) MCP 서버 3개 이상 선언
servers = [
MCPServerCfg(name="files", kind="stdio", command="npx", args=["-y","@modelcontextprotocol/server-filesystem","./data"]),
MCPServerCfg(name="db", kind="stdio", command="python", args=["./servers/db/server.py"]),
# 공개/원격이면 SSE 가능(모듈 설치 필요)
# MCPServerCfg(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization": "Bearer X"}),
]
hub = MCPHub(servers)
await hub.start()
try:
reg = hub.registry()
# 로컬(일반) 툴도 같은 레지스트리에 추가
for t in local_tools():
reg.register(t)
# 노출 과적재 방지 팁(선택): 필요할 때만 서브셋을 뽑아 다른 레지스트리에 복사해도 됨
# 여기서는 전체를 그대로 사용
# A) OpenAI Agents 경로
await run_with_agents(
reg=reg,
base_url="http://localhost:8000", # OpenAI 호환 엔드포인트
model="my-model",
prompt="files 서버로 sales.csv 합계를 구하고, local.math_eval로 100+23 계산도 해줘"
)
# B) Chat Completions 경로
runner = ChatCompletionsRunner(base_url="http://localhost:8000", model="my-model")
out = await runner.run(
reg=reg,
system="필요할 때만 툴을 호출한다. 파괴적 작업은 dry_run을 우선한다.",
user="db 서버 요약 툴로 고객별 구매 합계를 요약하고, 수치 검증은 local.math_eval로 2*(50+5) 계산해줘"
)
print(out)
finally:
await hub.close()
if __name__ == "__main__":
asyncio.run(main())
핵심 설계 포인트 정리
MCP가 3개 이상이어도 MCPHub가 모든 세션을 들고 있고, list_tools를 네임스페이스(server:tool)로 묶어 하나의 ToolRegistry에 올린다. 실행도 같은 handler를 통해 라우팅되므로 구현이 한 번뿐이다.
어댑터 레이어는 얇다. Agents(FunctionTool 리스트 생성), Chat Completions(tools JSON 생성), LangChain 등 아무 프레임워크나 20~30줄 내로 붙는다.
동시성·타임아웃을 MCPHandle에 통합했다. 서버마다 max_concurrency와 timeout을 다르게 줄 수 있다.
세션 종료는 MCPHub.close에서 일괄 처리한다. 자동 종료에 기대지 않는다.
3개 이상 서버일 때 실전 운영 팁
툴 소출: 턴별로 필요한 서브셋만 노출하면 툴 선택 오류가 급감한다. 간단한 키워드 룰부터 시작해도 효과가 크다.
네임스페이스: server:tool을 고정하고, description에도 서버명을 반복 표기하면 혼동이 줄어든다.
스키마 엄격화: required, enum, format을 가능한 한 많이 넣자. arguments 파싱 오류가 줄어든다.
관측성: handler에서 입력/출력/지연을 JSONL로 로깅하면 후속 최적화가 쉬워진다.
고위험 툴: parameters에 dry_run(boolean, default true)와 confirm 문자열을 추가해 모델이 자체로 안전 절차를 밟게 하자.
이 스캐폴딩이면 MCP 정보를 한 번만 수집·정규화하고, 어떤 에이전트든 어댑터만 갈아끼워 재사용할 수 있다. 필요하면 expose(툴 소출) 전략 함수와 세션 풀(여러 프로세스/서비스 간 공유)을 덧대서 팀 표준 플랫폼으로 굳히면 된다.
아니 mcp 서버는 그대로 넘길거야 툴로 내가 래핑 안할거야
좋아. “MCP 서버는 그대로 넘긴다(툴로 래핑 안 함)”를 전제로, 여러 개(3개 이상) MCP를 한꺼번에 다루는 통일 정보 클래스와 각 에이전트별 얇은 어댑터를 만들어줄게. 핵심은 툴 레벨 래핑이 아니라, MCP 서버 메타(연결 방식·자격증명·네임라벨)를 그대로 전달하는 것이다.
아래 스캐폴딩은 세 파트다.
-
UnifiedMCPConfig: MCP 서버 정보를 통일된 포맷으로 보관·검증.
-
Adapters: 이 정보를 각 에이전트 SDK에 맞게 그대로 투사.
2-1) OpenAI Agents Python SDK용 어댑터
2-2) Anthropic Messages API(MCP connector)용 어댑터
2-3) MCP 미지원 프레임워크용 “프로토콜 유지” 대안: 합성 프록시 서버(선택). 툴로 감싸지 않고, 하나의 MCP 서버로 프록시 제공.
추가로 3개 이상 서버일 때의 수명주기, 동시성, 공유 전략까지 포함했다.
설치
pip install mcp pydantic orjson "openai-agents[litellm]" requests
unified_mcp_pass_through.py
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Literal, Any
from pydantic import BaseModel, Field, HttpUrl, field_validator
import orjson
# 1) 통일 포맷: MCP 서버 정보를 “그대로” 담아 에이전트로 전달하기 위한 모델
Kind = Literal["stdio", "sse", "http_streamable"] # 필요시 확장
class UnifiedMCPServer(BaseModel):
name: str = Field(..., description="에이전트에서 보일 서버 라벨(네임스페이스)")
kind: Kind
# stdio
command: Optional[str] = None
args: List[str] = Field(default_factory=list)
env: Dict[str, str] = Field(default_factory=dict)
# sse/http
url: Optional[str] = None
headers: Dict[str, str] = Field(default_factory=dict)
# 공통 옵션
require_approval: Literal["never", "always", "auto"] = "never"
cache_tools_list: bool = True
max_concurrency: int = 8
request_timeout_sec: int = 60
@field_validator("name")
@classmethod
def non_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("name must be non-empty")
return v
class UnifiedMCPConfig(BaseModel):
servers: List[UnifiedMCPServer]
def validate_topology(self) -> None:
names = [s.name for s in self.servers]
if len(names) != len(set(names)):
dup = [n for n in names if names.count(n) > 1]
raise ValueError(f"duplicate server names: {dup}")
if len(self.servers) == 0:
raise ValueError("no MCP servers provided")
# 최소 가드: stdio면 command 필수, sse면 url 필수
for s in self.servers:
if s.kind == "stdio" and not s.command:
raise ValueError(f"{s.name}: stdio requires command")
if s.kind in ("sse", "http_streamable") and not s.url:
raise ValueError(f"{s.name}: {s.kind} requires url")
# 2) 어댑터들
# 2-1) OpenAI Agents Python SDK 어댑터: 그대로 mcp_servers에 꽂는 팩토리
from agents.mcp import MCPServerStdio, MCPServerSse, MCPServerStreamableHttp
from agents import Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel
from agents.model_settings import ModelSettings
class OpenAIAgentsMCPAdapter:
def __init__(self, cfg: UnifiedMCPConfig):
self.cfg = cfg
def build_context_managers(self):
cms = []
for s in self.cfg.servers:
if s.kind == "stdio":
cms.append(MCPServerStdio(
name=s.name,
params={"command": s.command, "args": s.args, "env": s.env},
cache_tools_list=s.cache_tools_list,
max_concurrency=s.max_concurrency,
request_timeout_sec=s.request_timeout_sec,
))
elif s.kind == "sse":
cms.append(MCPServerSse(
name=s.name,
params={"url": s.url, "headers": s.headers},
cache_tools_list=s.cache_tools_list,
max_concurrency=s.max_concurrency,
request_timeout_sec=s.request_timeout_sec,
require_approval=s.require_approval,
))
elif s.kind == "http_streamable":
cms.append(MCPServerStreamableHttp(
name=s.name,
params={"url": s.url, "headers": s.headers},
cache_tools_list=s.cache_tools_list,
max_concurrency=s.max_concurrency,
request_timeout_sec=s.request_timeout_sec,
require_approval=s.require_approval,
))
else:
raise ValueError(f"unknown kind {s.kind}")
return cms
async def run_once(self, base_url: str, model: str, prompt: str):
self.cfg.validate_topology()
cms = self.build_context_managers()
# 여러 context manager를 한 번에 열기
async with AsyncExitStackGroup(cms) as opened:
agent = Agent(
name="PassThroughMCPAgent",
instructions="MCP 서버의 툴을 안전하게 사용한다. 파괴적 작업은 dry_run을 우선한다.",
model=LitellmModel(model=model, base_url=base_url.rstrip("/") + "/v1"),
mcp_servers=opened, # 포인트: 서버를 '그대로' 넘김
model_settings=ModelSettings(tool_choice="auto")
)
out = await Runner.run(agent, prompt)
return out.final_output
# 여러 async context manager를 묶어 여닫는 유틸
class AsyncExitStackGroup:
def __init__(self, cms):
self.cms = cms
self.opened = []
async def __aenter__(self):
for cm in self.cms:
self.opened.append(await cm.__aenter__())
return self.opened
async def __aexit__(self, exc_type, exc, tb):
# 역순으로 안전 종료
for cm in reversed(self.cms):
try:
await cm.__aexit__(exc_type, exc, tb)
except Exception:
pass
# 2-2) Anthropic Messages API(MCP connector) 어댑터: payload만 만들어 반환
# 주의: 여기서는 실제 호출까지 하지 않고, mcp_servers JSON을 만들어 돌려준다.
# 너는 이 JSON을 바로 API 바디에 넣으면 된다.
class AnthropicMCPAdapter:
def __init__(self, cfg: UnifiedMCPConfig):
self.cfg = cfg
def to_messages_payload(self, model: str, user_text: str, max_tokens: int = 1000) -> Dict[str, Any]:
self.cfg.validate_topology()
mcp_servers = []
for s in self.cfg.servers:
if s.kind == "stdio":
# Anthropic 커넥터는 원격만 직접 붙이는 패턴이 일반적이므로, stdio는 보통 불가.
# 필요하면 너의 서버를 HTTP로 노출해야 한다.
raise ValueError(f"{s.name}: Anthropic payload는 stdio를 직접 전달할 수 없다. HTTP/SSE로 전환 필요")
else:
mcp_servers.append({
"type": "url",
"url": s.url,
"name": s.name,
"authorization_token": s.headers.get("Authorization") or s.headers.get("authorization"),
"tool_configuration": {"enabled": True}
})
return {
"model": model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": user_text}],
"mcp_servers": mcp_servers,
}
# 2-3) MCP 미지원 프레임워크를 위한 “프로토콜 유지” 대안: 합성 프록시(선택)
# 툴 래핑 없이, 여러 상류 MCP를 하나의 '가짜 MCP 서버'로 노출.
# 에이전트는 MCP 한 대만 본다. 내부에서 list_tools/ call_tool를 서버별로 프록시.
# 간단 버전(개념 코드): 실제 MCP 서버 스펙에 맞춘 완성 서버는 별도 구현 필요.
class CompositeMCPProxy:
"""
아이디어: upstream 여러 MCP 세션에 붙어, 외부에는 단일 MCP 서버처럼 동작.
- list_tools: 모든 upstream에서 툴을 긁어 namespaced 이름(server:tool)으로 합쳐 반환
- call_tool: "server:tool"을 파싱해 해당 upstream에 위임
이 클래스 자체는 프로토콜 처리(transport)는 생략하고, 콜백 형태만 제공.
"""
def __init__(self, cfg: UnifiedMCPConfig):
self.cfg = cfg
self.handles: Dict[str, Any] = {}
async def start(self):
# upstream 연결만 담당(프록시 서버로 공개하려면 별도 transport/서버 구현 필요)
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
for s in self.cfg.servers:
if s.kind == "stdio":
tr = StdioClientTransport(StdioServerParameters(command=s.command, args=s.args, env=s.env or {}))
elif s.kind == "sse":
if not HAVE_SSE:
raise RuntimeError("mcp.client.sse 필요")
tr = SSEClientTransport(url=s.url, headers=s.headers)
else:
# 필요하면 streamable-http 등 추가
raise ValueError(f"unsupported kind for proxy: {s.kind}")
await tr.start()
sess = ClientSession(tr)
await sess.initialize()
self.handles[s.name] = {"transport": tr, "session": sess}
async def list_all_tools(self) -> List[Dict[str, Any]]:
merged = []
for name, h in self.handles.items():
listed = await h["session"].list_tools()
for t in listed.tools:
merged.append({
"name": f"{name}:{t.name}",
"description": getattr(t, "description", ""),
"input_schema": getattr(t, "input_schema", {"type": "object"})
})
return merged
async def call(self, namespaced: str, args: Dict[str, Any]) -> Dict[str, Any]:
server, tool = namespaced.split(":", 1)
sess = self.handles[server]["session"]
res = await sess.call_tool(tool, args or {})
packed = {"content": []}
for c in res.content:
if c.type == "text":
packed["content"].append({"type": "text", "text": c.text})
elif c.type == "json":
packed["content"].append({"type": "json", "json": c.json})
else:
packed["content"].append({"type": c.type})
return packed
async def close(self):
for name, h in list(self.handles.items()):
try:
await h["session"].close()
except Exception:
pass
try:
await h["transport"].stop()
except Exception:
pass
self.handles.clear()
# 3) 사용 예시
async def demo_openai_agents():
cfg = UnifiedMCPConfig(servers=[
UnifiedMCPServer(name="files", kind="stdio", command="npx", args=["-y","@modelcontextprotocol/server-filesystem","./data"]),
UnifiedMCPServer(name="db", kind="stdio", command="python", args=["./servers/db/server.py"]),
UnifiedMCPServer(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization":"Bearer X"}),
])
adapter = OpenAIAgentsMCPAdapter(cfg)
result = await adapter.run_once(
base_url="http://localhost:8000",
model="my-model",
prompt="files와 db, search MCP를 적절히 사용해 sales.csv의 통찰을 요약해줘"
)
print(result)
def demo_anthropic_payload():
cfg = UnifiedMCPConfig(servers=[
UnifiedMCPServer(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization":"Bearer X"}),
UnifiedMCPServer(name="git", kind="sse", url="https://mcp.git.example/sse", headers={"Authorization":"Bearer Y"}),
])
payload = AnthropicMCPAdapter(cfg).to_messages_payload(
model="claude-3-7-sonnet", user_text="두 MCP를 활용해서 리포트를 만들어줘"
)
print(orjson.dumps(payload, option=orjson.OPT_INDENT_2).decode())
# 메인
if __name__ == "__main__":
import asyncio
from contextlib import suppress
asyncio.run(demo_openai_agents())
with suppress(Exception):
demo_anthropic_payload()
어떻게 쓰면 되나 단계별 요약
-
UnifiedMCPConfig에 3개 이상 MCP 서버를 그대로 나열한다. stdio면 command/args, sse면 url/headers만 채우면 된다.
-
OpenAI Agents를 쓸 때는 OpenAIAgentsMCPAdapter로 context manager들을 만들어 Agent(mcp_servers=[…])에 꽂는다. 이 흐름은 MCP 툴을 “래핑하지 않고” 서버 정보를 그대로 전달한다.
-
Anthropic Messages API를 쓸 때는 AnthropicMCPAdapter가 messages.create 바디에 넣을 mcp_servers JSON을 생성해준다. stdio는 원격 공개가 아니면 바로 붙일 수 없으니 SSE/HTTP로 노출해야 한다.
-
MCP 미지원 프레임워크에는 CompositeMCPProxy 같은 방식으로 “MCP 프로토콜은 유지”한 채 합성 서버를 하나 띄우는 선택지가 있다. 이건 툴 래핑이 아니라 “MCP 서버를 하나 더 제공”하는 방식이라 네 조건과 어긋나지 않는다.
3개 이상 서버에서의 실무 체크
툴 과적재 방지. Agent에 넘길 때 서버를 전부 주되, 모델 프롬프트에 “이런 작업에는 files만, 요약은 db만” 같은 지침을 한 줄 넣는 것만으로도 혼선이 줄어든다.
세션/자원 관리. OpenAI Agents 어댑터는 context manager가 종료 시 자동 정리를 수행한다. Anthropic 경로는 원격 서버이므로 별도 세션 정리는 필요 없다.
권한 경계. 서로 다른 테넌트나 위험도를 가진 MCP는 서버 라벨을 명확히 구분하고, require_approval 옵션을 “always/auto”로 둔다(Hosted/원격에서 의미 있음).
동시성·타임아웃. 서버별 max_concurrency, request_timeout_sec를 다르게 주어 병목을 피한다.
이 구성은 “MCP 서버 정보를 있는 그대로 넘긴다”는 원칙을 지키면서, 서로 다른 에이전트들에 같은 정보를 재사용하도록 해준다. 필요한 프레임워크가 더 있으면 같은 패턴으로 어댑터를 20~30줄로 추가하면 된다.
근데 MCP를 보니 그냥 툴만 있는게 아니라 Sampling, 사용자에게 물어보기, 프롬프트 등등 있잖아. 그걸 Chat이랑 연동할 수 있을까
좋아, 핵심은 “MCP가 제공하는 건 툴만이 아니다”는 점을 전제로, Sampling(서버가 모델 호출을 위임), 사용자에게 물어보기(ask-user), 프롬프트/프롬프트 템플릿(list/get) 같은 채널을 Chat 루프에 붙이는 얇은 어댑터 레이어를 여기에 딱 만들어두는 거다. 래핑으로 툴로 바꾸지 않고, MCP 메타(프롬프트/샘플링/질의)를 그대로 전달·왕복시켜 결합한다.
아래 설계와 스캐폴딩은 이전에 만든 “백엔드 갈아끼우기(Chat Completions or OpenAI Agents)” 구조를 유지하면서, MCP의 네 가지 면을 모두 연동한다.
-
툴: 기존과 동일, call_tool 라우팅
-
프롬프트: list_prompts/get_prompt를 통해 시스템/도우미 메시지 주입
-
사용자에게 물어보기: MCP가 사용자 입력을 요청하면, Chat UI에 질문을 띄우고 답을 다시 MCP에 돌려줌
-
샘플링: MCP 서버가 “모델 호출을 대신 해달라”고 요청하면, 현재 백엔드로 대리 샘플링 후 결과를 MCP에게 반환
아래는 그대로 붙이는 최소 스캐폴딩이다. MCP SDK의 일부 API는 서버 구현/버전에 따라 차이가 있어서, 함수 존재 여부를 동적으로 체크하도록 방어코드를 넣었다.
설치
pip install mcp pydantic orjson requests "openai-agents[litellm]"
mcp_chat_bridge.py
import asyncio, json, orjson, requests
from typing import Any, Dict, List, Optional, Callable, Awaitable
from pydantic import BaseModel
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
try:
from mcp.client.sse import SSEClientTransport
HAVE_SSE = True
except Exception:
HAVE_SSE = False
################################################################################
# 공통: Chat 백엔드(두 가지 구현: OpenAI 호환 Chat Completions, OpenAI Agents)
################################################################################
class ChatMessage(BaseModel):
role: str
content: str
class ChatBackend:
async def send(self, messages: List[ChatMessage], tools: Optional[List[Dict[str,Any]]] = None) -> Dict[str,Any]:
raise NotImplementedError
class OpenAIChatBackend(ChatBackend):
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, path="/v1/chat/completions"):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.path = path
def _headers(self):
h = {"content-type":"application/json"}
if self.api_key: h["authorization"] = f"Bearer {self.api_key}"
return h
async def send(self, messages: List[ChatMessage], tools: Optional[List[Dict[str,Any]]] = None) -> Dict[str,Any]:
payload = {
"model": self.model,
"messages": [m.model_dump() for m in messages],
}
if tools: payload["tools"] = tools; payload["tool_choice"] = "auto"
r = requests.post(f"{self.base_url}{self.path}", data=orjson.dumps(payload), headers=self._headers(), timeout=60)
r.raise_for_status()
return r.json()
# 선택: OpenAI Agents를 백엔드로 쓰고 싶다면 이 어댑터를 사용
from agents import Agent, Runner
from agents.model_settings import ModelSettings
from agents.extensions.models.litellm_model import LitellmModel
from agents import FunctionTool
class AgentsBackend(ChatBackend):
def __init__(self, base_url: str, model: str):
self._model = LitellmModel(model=model, base_url=base_url.rstrip("/") + "/v1")
self._agent = Agent(name="LLMOnly", model=self._model, instructions="", model_settings=ModelSettings(tool_choice="none"))
async def send(self, messages: List[ChatMessage], tools: Optional[List[Dict[str,Any]]] = None) -> Dict[str,Any]:
# 도구는 Agent에 등록하지 않고, 프롬프트 강제 포맷으로 tool_calls를 JSON으로 받는 전략도 가능
prompt = "\n".join([f"{m.role.upper()}: {m.content}" for m in messages])
out = await Runner.run(self._agent, prompt, developer_instructions="가능하면 JSON만 출력")
txt = out.final_output.strip()
return {"choices":[{"message":{"role":"assistant","content":txt}}]}
################################################################################
# MCP 연결: 여러 서버를 그대로 붙이고, 프롬프트/샘플링/질문을 브릿지
################################################################################
class MCPServerCfg(BaseModel):
name: str
kind: str # "stdio" | "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
headers: Optional[Dict[str,str]] = None
class MCPHandle:
def __init__(self, name: str, transport, session: ClientSession):
self.name = name
self.transport = transport
self.session = session
async def close(self):
try: await self.session.close()
except Exception: pass
try: await self.transport.stop()
except Exception: pass
async def connect_mcp(s: MCPServerCfg) -> MCPHandle:
if s.kind == "stdio":
tr = StdioClientTransport(StdioServerParameters(command=s.command, args=s.args or []))
elif s.kind == "sse":
if not HAVE_SSE: raise RuntimeError("mcp.client.sse 모듈 필요")
from mcp.client.sse import SSEClientTransport
tr = SSEClientTransport(url=s.url, headers=s.headers or {})
else:
raise ValueError(f"unknown kind {s.kind}")
await tr.start()
sess = ClientSession(tr)
await sess.initialize()
return MCPHandle(s.name, tr, sess)
################################################################################
# 브릿지: MCP의 툴/프롬프트/ask-user/샘플링을 Chat 루프에 접속
################################################################################
class MCPChatBridge:
def __init__(
self,
servers: List[MCPServerCfg],
chat_backend: ChatBackend,
ask_user: Callable[[str], Awaitable[str]], # 사용자에게 질문 보여주고 응답 받는 콜백
prompt_selector: Optional[Callable[[Dict[str, List[Dict[str,Any]]]], List[ChatMessage]]] = None,
sampling_policy: Optional[Callable[[Dict[str,Any]], Dict[str,Any]]] = None, # 샘플링 옵션 정규화
):
self.cfgs = servers
self.backend = chat_backend
self.ask_user = ask_user
self.prompt_selector = prompt_selector
self.sampling_policy = sampling_policy or (lambda opts: {"temperature": opts.get("temperature", 0.2)})
self.handles: Dict[str, MCPHandle] = {}
async def __aenter__(self):
for c in self.cfgs:
h = await connect_mcp(c)
self.handles[c.name] = h
return self
async def __aexit__(self, et, ev, tb):
for h in list(self.handles.values()):
await h.close()
self.handles.clear()
async def _list_all_tools(self) -> List[Dict[str,Any]]:
tools: List[Dict[str,Any]] = []
for name, h in self.handles.items():
listed = await h.session.list_tools()
for t in listed.tools:
tools.append({
"type": "function",
"function": {
"name": f"{name}:{t.name}",
"description": getattr(t, "description", ""),
"parameters": getattr(t, "input_schema", {"type":"object","properties":{}})
}
})
return tools
async def _collect_prompts(self) -> Dict[str, List[Dict[str,Any]]]:
# 서버별 프롬프트 목록 취합. 세션에 list_prompts/get_prompt 가 없으면 건너뜀.
collected: Dict[str, List[Dict[str,Any]]] = {}
for name, h in self.handles.items():
if hasattr(h.session, "list_prompts"):
try:
resp = await getattr(h.session, "list_prompts")()
items = []
# 각 항목마다 get_prompt(name)로 템플릿/메시지 구성 요소를 가져올 수 있다는 가정(서버 구현마다 상이)
for p in getattr(resp, "prompts", []):
items.append({"name": getattr(p, "name", ""), "description": getattr(p, "description", "")})
collected[name] = items
except Exception:
continue
return collected
async def _get_prompt_messages(self, server: str, prompt_name: str, **kwargs) -> List[ChatMessage]:
h = self.handles[server]
msgs: List[ChatMessage] = []
if hasattr(h.session, "get_prompt"):
try:
resp = await getattr(h.session, "get_prompt")(prompt_name, **kwargs)
# resp가 messages 비슷한 구조라고 가정하고 표준화
for it in getattr(resp, "messages", []):
role = getattr(it, "role", "system")
content = getattr(it, "content", "")
msgs.append(ChatMessage(role=role, content=content))
except Exception:
pass
return msgs
async def _maybe_server_sampling(self, request: Dict[str,Any]) -> Dict[str,Any]:
"""
MCP 서버가 '샘플링 요구'를 보내는 상황을 가정.
request = {"messages":[...], "params":{...}} 같은 형태를 정책에 맞춰 백엔드에 대리 호출
"""
params = self.sampling_policy(request.get("params", {}))
# request["messages"]는 OpenAI 포맷을 가정하고 그대로 전달
data = await self.backend.send([ChatMessage(**m) for m in request.get("messages", [])], tools=None)
# 필요하면 온전한 응답을 그대로 돌려주거나, 텍스트만 추출
content = data.get("choices",[{}])[0].get("message",{}).get("content","")
return {"content": content, "raw": data}
async def run(self, user_text: str) -> str:
# 1) 서버별 프롬프트 목록을 모아서 선택
base_messages: List[ChatMessage] = [ChatMessage(role="user", content=user_text)]
prompt_catalog = await self._collect_prompts()
if self.prompt_selector:
# prompt_selector는 {server:[{name,desc},...]}를 보고 초기 메시지(예: system 프롬프트들) 리스트를 반환
injected = self.prompt_selector(prompt_catalog)
base_messages = injected + base_messages
# 2) MCP 툴들을 OpenAI tools 포맷으로 그대로 넘김
tools = await self._list_all_tools()
# 3) 1차 모델 호출
data = await self.backend.send(base_messages, tools=tools)
msg = data.get("choices",[{}])[0].get("message",{})
base_messages.append(ChatMessage(role="assistant", content=msg.get("content","")))
# 4) tool_calls 처리 루프 + ask-user + server-sampling 훅 예시
# 실제 ask/sampling 이벤트는 서버가 별도 툴로 노출하거나, resource/protocol 필드를 통해 트리거할 수 있다.
# 여기서는 관습적으로 이름을 잡아 예시를 보인다.
while "tool_calls" in msg:
tool_results: List[Dict[str,Any]] = []
for call in msg["tool_calls"]:
name = call["function"]["name"] # e.g., "files:read_csv" or "db:sql_summary" or "search:ask_user" 등
args_raw = call["function"].get("arguments","{}") or "{}"
try:
args = json.loads(args_raw)
except Exception:
args = {"_raw": args_raw}
# 4-a) ask-user 패턴: 특정 서버가 사용자 입력을 받아달라고 하는 툴을 제공한다고 가정
if name.endswith(":ask_user") or name.startswith("ask."):
question = args.get("question", "추가 정보가 필요합니다. 입력해 주세요.")
user_answer = await self.ask_user(question)
tool_results.append({
"role":"tool",
"tool_call_id": call["id"],
"name": name,
"content": json.dumps({"answer": user_answer})
})
continue
# 4-b) server-side sampling: 서버가 모델 호출을 위임하는 툴(예: server:sample)로 구현돼 있다고 가정
if name.endswith(":sample") or name.startswith("sample."):
req = args.get("request", {"messages":[m.model_dump() for m in base_messages], "params":{}})
sampling_out = await self._maybe_server_sampling(req)
tool_results.append({
"role":"tool",
"tool_call_id": call["id"],
"name": name,
"content": json.dumps(sampling_out)
})
continue
# 4-c) 일반 MCP 툴 호출
if ":" not in name:
tool_results.append({"role":"tool","tool_call_id":call["id"],"name":name,"content":json.dumps({"error":"namespaced tool required"})})
continue
server, tool = name.split(":",1)
h = self.handles.get(server)
if not h:
tool_results.append({"role":"tool","tool_call_id":call["id"],"name":name,"content":json.dumps({"error":"unknown server"})})
continue
try:
res = await h.session.call_tool(tool, args or {})
packed = {"content":[]}
for c in res.content:
if c.type == "text": packed["content"].append({"type":"text","text":c.text})
elif c.type == "json": packed["content"].append({"type":"json","json":c.json})
else: packed["content"].append({"type":c.type})
tool_results.append({"role":"tool","tool_call_id":call["id"],"name":name,"content":orjson.dumps(packed).decode()})
except Exception as e:
tool_results.append({"role":"tool","tool_call_id":call["id"],"name":name,"content":json.dumps({"error":str(e)})})
# 4-d) 툴 결과 재주입
for tr in tool_results:
base_messages.append(ChatMessage(role="tool", content=tr["content"]))
# 일부 백엔드/모델은 tool 메시지에 name/tool_call_id를 요구하므로,
# OpenAIChatBackend를 직접 사용할 때는 messages에 dict 그대로 넣는 경로를 따로 두어도 된다.
# 4-e) 다음 턴 호출
data = await self.backend.send(base_messages, tools=None)
msg = data.get("choices",[{}])[0].get("message",{})
base_messages.append(ChatMessage(role="assistant", content=msg.get("content","")))
# 5) 최종 답변
return base_messages[-1].content
어떻게 쓰는가 예시
async def ui_ask_user(question: str) -> str:
print(f"[QUESTION from MCP] {question}")
# 실제 앱/UI에서는 모달·입력창으로 구현
return input("> ")
async def pick_prompts(prompt_catalog):
# prompt_catalog: {server: [{name,description}, ...]}
# 간단 예시: files 서버에 "analysis_default" 프롬프트가 있으면 불러와 system에 삽입
# 실제로는 bridge._get_prompt_messages(server, prompt_name)로 메시지를 받아 prepend
return [ChatMessage(role="system", content="너는 조심스럽고 정확한 분석가다.")]
async def main():
servers = [
MCPServerCfg(name="files", kind="stdio", command="npx", args=["-y","@modelcontextprotocol/server-filesystem","./data"]),
MCPServerCfg(name="db", kind="stdio", command="python", args=["./servers/db/server.py"]),
# MCPServerCfg(name="search", kind="sse", url="https://mcp.example.com/sse", headers={"Authorization":"Bearer X"}),
]
backend = OpenAIChatBackend(base_url="http://localhost:8000", model="my-model")
async with MCPChatBridge(servers, backend, ask_user=ui_ask_user, prompt_selector=pick_prompts) as bridge:
out = await bridge.run("sales.csv를 분석하고, 필요한 추가 정보가 있으면 먼저 나에게 질문해줘")
print("=== ANSWER ===")
print(out)
if __name__ == "__main__":
asyncio.run(main())
핵심 포인트 요약
샘플링. 서버가 “모델 호출을 대리해달라” 요청하면, 현재 연결된 Chat 백엔드로 그대로 라우팅해 결과를 돌려준다. 이렇게 하면 서버-주도 플로우까지 연동된다.
사용자에게 물어보기. MCP가 사용자 입력이 필요할 때를 툴 혹은 별도 시그널로 노출하는 패턴을 잡고, ask_user 콜백으로 Chat UI와 왕복한다. 러너를 두더라도 로직은 콜백 한 줄이다.
프롬프트. 서버별 프롬프트 레지스트리를 조회(list/get)해서 초기 system 메시지로 주입한다. 프롬프트가 파라미터화되어 있다면 get_prompt(name, **kwargs) 식으로 받아 메시지로 변환한다.
호환성. 같은 브릿지를 OpenAI Chat Completions, OpenAI Agents 어느 쪽에도 꽂을 수 있다. Agents를 백엔드로 쓸 땐 JSON 강제 출력 같은 프롬프트 전략을 써도 되고, 필요하면 Agents의 FunctionTool을 병행해서 ask-user나 sampling을 명시 툴로 노출하는 하이브리드도 가능하다.
3개 이상 서버. 서버 수에 상관없이 namespacing(server:tool) 룰을 유지하고, 프롬프트·질문·샘플링은 서버 라벨과 함께 로깅해 추적 가능성을 높인다. 서버별 타임아웃/동시성은 handle 단에서 개별 튜닝하는 게 안전하다.
이렇게 하면 “툴로 래핑하지 않고”도 MCP의 넓은 표면적(툴·프롬프트·샘플링·유저질문)을 동일 Chat 경험에 자연스럽게 녹일 수 있다. 실제 네 서버에서 프롬프트/샘플링/ask가 어떤 엔드포인트/이벤트로 노출되는지 알려주면, 위 브릿지의 해당 함수만 너의 스펙에 딱 맞게 구체화해 줄 수 있다.
아니 Elicitation이라고 따로 있는데
맞아, MCP에는 “Elicitation(엘리시테이션)”이 따로 있다. 요지는 서버가 작업 도중 사용자에게 추가 정보를 표준 형식으로 요청할 수 있게 해주는 기능이고, 툴과는 별개의 채널이다. 결론부터 말하면 Chat 루프에 깔끔하게 연동할 수 있다. 아래처럼 “질문 라운드트립”을 브릿지에 끼우면 된다. 핵심 포인트와 최소 코드를 단계별로 정리한다.
-
개념 정리
Elicitation은 MCP 서버가 대화 도중 “사용자에게 이 정보 좀 받아줘”라고 요청하는 표준 메커니즘이다. 요청에는 프롬프트 문구와 함께 JSON 스키마가 따라오고, 클라이언트는 그 스키마에 맞게 사용자 입력을 받아 검증한 뒤 서버에 돌려준다. 2025-06-18 스펙에 정식 포함되었고, “툴 실행 중간에 끼어드는 질문”을 안전하고 구조화된 방식으로 다루려는 목적이다. Model Context Protocol+1 -
아키텍처 선택지
A. 프로토콜-네이티브 처리: MCP 클라이언트가 서버의 Elicitation 이벤트를 직접 수신해 UI로 띄우고 응답을 서버에 돌려준다. Chat 모델은 “최종 답변 생성”에 집중하고, 추가 정보 수집은 브릿지가 담당한다.
B. 모델-중개 처리: Elicitation을 “ask_user” 같은 툴로도 노출하는 서버가 있다면, 기존 tool_call 루프에 섞어 처리해도 된다. 하지만 권장되는 방식은 스펙에 맞춘 네이티브 처리다(질문·응답이 중첩 흐름에서도 안정적). Model Context Protocol -
브릿지 확장 패턴
너의 기존 Chat 오케스트레이터/브릿지에 아래 훅 두 개만 더해주면 된다.
a) elicitation 이벤트 수신 → UI로 질문 띄우기 → JSON 스키마 검증 → 서버에 제출
b) 샘플링·프롬프트·툴 루프와 자연스럽게 공존하도록 순서 제어
아래 코드는 “개념적으로 정확한 동작 흐름”을 보여주는 미니 스캐폴딩이다. MCP SDK의 세부 메서드명은 릴리스별로 다를 수 있으므로, 이벤트 구독/응답 부분은 네 SDK 버전에 맞춰 함수명만 치환하면 된다.
설치
pip install mcp jsonschema orjson requests
코드 스니펫: MCP Elicitation 브릿지 훅
import asyncio, json
from typing import Any, Dict, List, Optional, Callable, Awaitable
import orjson
from jsonschema import validate, ValidationError
# 가정: 이미 MCP 세션을 여러 개 열어놓았고, handles = {server: session} 형태로 보유
# 또한 ask_user 콜백(UI 입력)과 chat_backend(네 Chat 또는 Agent 백엔드)가 있다.
async def handle_elicitation_loop(session, ask_user: Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]):
"""
한 MCP 세션에 대해 서버가 보내는 Elicitation 요청을 처리하는 루프.
SDK마다 이벤트 수신 API 이름이 다를 수 있으므로, 아래 'await session.next_elicitation()'는
네가 쓰는 SDK의 실제 호출로 치환하면 된다.
"""
while True:
req = await session.next_elicitation() # 예시: {"id": "...", "message": "날짜를 입력하세요", "schema": {...}}
if req is None:
break
question = req.get("message", "추가 정보가 필요합니다.")
schema = req.get("schema", {"type": "object", "properties": {}})
# UI에 질문 띄우고, 스키마에 맞는 dict로 응답 받기
user_payload = await ask_user(question, schema)
# 로컬에서 1차 검증(선택이지만 권장)
try:
validate(user_payload, schema)
except ValidationError as e:
# 검증 실패 시 재질문
user_payload = await ask_user(f"입력 형식 오류: {e.message}\n다시 입력해 주세요.", schema)
validate(user_payload, schema)
# 서버에 제출(실제 API명/필드명은 SDK에 맞춰 치환)
await session.submit_elicitation(req["id"], user_payload)
async def run_chat_with_elicitation(backend, handles: Dict[str, Any], user_text: str, ask_user):
# 1) 병행으로 각 세션의 elicitation 루프를 가동
loops = [asyncio.create_task(handle_elicitation_loop(sess, ask_user)) for sess in handles.values()]
# 2) 모델 호출 루프(툴/프롬프트/샘플링 등 기존 처리와 동일)
messages = [{"role":"user","content": user_text}]
tools = [] # 필요하면 MCP list_tools를 OpenAI tools 포맷으로 넣기
# 첫 턴
data = await backend.send(messages, tools=tools)
msg = data.get("choices",[{}])[0].get("message",{})
messages.append(msg)
# tool_calls 처리(생략) + 중간에 서버가 elicitation을 보낼 수 있으므로, 다음 턴 전에 잠깐 yield
# ...
# 다음 턴 계속
# ...
# 3) 종료 시 elicitation 루프 정리
for t in loops:
t.cancel()
with contextlib.suppress(asyncio.CancelledError):
await t
UI 입력 콜백 예시
async def ask_user_via_console(message: str, schema: Dict[str, Any]) -> Dict[str, Any]:
print(f"[MCP 질문] {message}")
print(f"요구 스키마: {json.dumps(schema, ensure_ascii=False)}")
raw = input("> JSON으로 응답: ")
try:
data = json.loads(raw)
except Exception:
return await ask_user_via_console("JSON 파싱 실패. 다시 입력해 주세요.", schema)
return data
여기서 중요한 점
스키마 검증. 서버가 스키마를 제공하므로 클라이언트에서 jsonschema로 1차 검증을 돌리면 왕복 횟수와 오류가 줄어든다. 스펙이 권장하는 보안·프라이버시 정책도 함께 지켜라(민감정보 요구 금지 등). Model Context Protocol
중첩 가능성. Elicitation은 다른 MCP 기능(툴, 프롬프트, 샘플링) 안쪽에 중첩되어 등장할 수 있다. 그러므로 이벤트 루프를 Chat 루프와 병행 실행하는 패턴이 안전하다. Model Context Protocol
호환성. MCP를 직접 지원하는 클라이언트/에이전트(예: 일부 IDE, 일부 Agents SDK)는 Elicitation을 네이티브로 뜯어서 보여줄 예정이거나 이미 실험 중이다. 하지만 서버·SDK에 따라 이벤트/메서드명이 다르므로, 위 루프의 두 함수(next_elicitation, submit_elicitation) 위치만 실제 SDK 이름으로 바꾸면 된다. 개념과 흐름은 동일하다. GitHub+1
OpenAI Agents나 일반 Chat과의 결합 방식 요약
OpenAI Agents를 백엔드 모델로 쓸 때도 동일하다. “에이전트에게 MCP 서버를 그대로 넘겨서 에이전트가 툴 호출을 처리”하도록 둘 수도 있지만, Elicitation은 클라이언트(UI) 레벨의 사용자 상호작용이므로, 브릿지에서 이벤트를 수신해 UI와 왕복 후 결과를 서버에 전달하는 편이 제일 예측 가능하다.
일반 Chat Completions(with tools) 경로에서도 마찬가지다. 모델은 답변/계획에 집중시키고, 사용자 질문은 브릿지가 담당한다. 필요하면 Elicitation으로 수집된 값을 시스템/유저 메시지로도 주입해 다음 턴 답변 품질을 올릴 수 있다.
실전 팁
서버 수가 3개 이상이면 서버 라벨을 질문 메시지에 명시해준다. 예: “[files] 어떤 디렉토리를 생성할까요?”.
민감정보 가드. 스펙이 금지하는 내용(비밀번호/토큰 등)은 브릿지에서 차단하고, 허용 목록만 통과시키는 것이 안전하다. Model Context Protocol
UX. 스키마에 enum/format을 적극 사용하게 서버 쪽에 권장하라. 클라이언트는 enum이면 드롭다운, format=email이면 이메일 입력 UI처럼 친화적으로 렌더링하기 쉽다. Medium
한 줄 결론
Elicitation은 MCP의 “인간-개입 표준 채널”이고, 툴과 별개로 Chat 루프에 깨끗하게 붙일 수 있다. 위 브릿지 훅 두 개만 넣으면, 모델-툴-사용자 삼자 협업이 안정적으로 돈다. 네가 쓰는 MCP Python SDK의 정확한 elicitation API 이름을 알려주면, next_elicitation/submit_elicitation 부분을 해당 버전에 딱 맞춰 즉시 치환해 준다.
근데 MCP Server 라고 클래스명을 하니깐 진짜 서버 같이 느껴져 다른 이름 없어? 이건 설정 값에 가까운거 같은데
맞아, “MCPServer”라고 하면 네트워크 서버 프로세스를 직접 돌린다는 느낌이 강하다. 실제 네가 다루는 건 세션을 열 때 필요한 설정 정보(Descriptor, Config, Profile, Endpoint)에 가까워.
보통 이런 경우 네이밍을 이렇게 바꿔 쓰면 더 자연스럽다.
-
MCPServerConfig : 가장 직설적. 서버에 접속하는 데 필요한 설정값 묶음.
-
MCPServerSpec : “사양/스펙”이라는 의미로, 서버를 기술하는 선언적 정보라는 뉘앙스.
-
MCPServerProfile : 여러 환경(개발/운영/테스트)에서 쓰일 수 있는 “프로필”이라는 톤.
-
MCPServerEndpoint : 특히 SSE/HTTP 주소처럼 원격 엔드포인트를 지칭할 때 적합.
-
MCPConnectionConfig : 실제로는 연결 구성에 해당한다는 걸 분명히 함.
네가 만든 UnifiedMCPConfig 같은 컨테이너 안에 들어갈 원소라면,
class MCPServerConfig(BaseModel): ...
혹은 조금 더 가볍게
class MCPServerSpec(BaseModel): ...
정도가 가장 무난하다.
정리하자면: 서버(server)라는 단어 대신 Config / Spec / Endpoint / Profile 계열을 붙여주면 “실제 서버 객체”가 아니라 “접속 선언”이라는 의미가 명확해진다.
너의 코드베이스에서 MCP 서버를 실제로 구동하는 책임이 아니라 연결 정보를 담는 데이터 모델 이라면, MCPServerConfig가 가장 안전한 선택이다.
원한다면, 네가 지금 가지고 있는 클래스 정의를 내가 MCPServerConfig 식으로 리팩터링해서 전체 코드 스니펫을 다시 정리해줄 수도 있어.
MCPSpec은 어때
좋다. MCPSpec 괜찮다. 다만 “Spec”은 선언적·불변 설정이라는 뉘앙스를 주니, 런타임 상태(세션, 캐시, 동시성 제어)는 다른 이름에 두는 걸 추천한다. 정리하면 이렇게 쓰면 깔끔하다.
권장 의미 구분
-
MCPSpec: 연결·인증·전송방식 같은 선언형 스펙. 직렬화 가능, 불변 취급.
-
MCPHandle 또는 MCPSession: MCPSpec을 바탕으로 실제 연결을 연 뒤 갖게 되는 런타임 객체.
-
MCPConfig: 여러 MCPSpec을 담는 컨테이너.
작은 리팩터 예시
from typing import Literal, Optional, List, Dict
from pydantic import BaseModel, Field
Kind = Literal["stdio", "sse", "http_streamable"]
class MCPSpec(BaseModel):
name: str = Field(..., description="사람과 모델이 보는 라벨")
kind: Kind
# stdio
command: Optional[str] = None
args: List[str] = Field(default_factory=list)
env: Dict[str, str] = Field(default_factory=dict)
# sse/http
url: Optional[str] = None
headers: Dict[str, str] = Field(default_factory=dict)
# 정책/힌트(스펙이므로 기본값만 보관)
require_approval: Literal["never", "always", "auto"] = "never"
cache_tools_list: bool = True
max_concurrency: int = 8
request_timeout_sec: int = 60
class MCPConfig(BaseModel):
servers: List[MCPSpec]
def validate_topology(self):
names = [s.name for s in self.servers]
if len(names) != len(set(names)):
raise ValueError(f"duplicate names: {names}")
for s in self.servers:
if s.kind == "stdio" and not s.command:
raise ValueError(f"{s.name}: stdio requires command")
if s.kind in ("sse","http_streamable") and not s.url:
raise ValueError(f"{s.name}: {s.kind} requires url")
런타임 분리 패턴
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioClientTransport, StdioServerParameters
from typing import Dict
class MCPHandle:
def __init__(self, spec: MCPSpec, transport, session: ClientSession):
self.spec = spec
self.transport = transport
self.session = session
async def close(self):
try: await self.session.close()
finally:
try: await self.transport.stop()
except Exception: pass
async def open_handle(spec: MCPSpec) -> MCPHandle:
if spec.kind == "stdio":
tr = StdioClientTransport(StdioServerParameters(command=spec.command, args=spec.args, env=spec.env))
elif spec.kind == "sse":
from mcp.client.sse import SSEClientTransport
tr = SSEClientTransport(url=spec.url, headers=spec.headers)
else:
# 필요 시 http_streamable 등 추가
raise ValueError(f"unsupported kind: {spec.kind}")
await tr.start()
s = ClientSession(tr); await s.initialize()
return MCPHandle(spec, tr, s)
어댑터에서의 사용 느낌
-
OpenAI Agents 어댑터: MCPSpec → MCPServerStdio/MCPServerSse 생성 시 그대로 넘김.
-
Anthropic 어댑터: MCPSpec(kind="sse"/"http_streamable")를 mcp_servers 페이로드로 직렬화.
장단점 한줄 요약
-
MCPSpec은 “진짜 서버”가 아니라 “접속 선언”이라는 의도가 분명해져 오해가 줄고, 테스트·직렬화·버전관리도 쉬워진다.
-
세션·동시성 같은 런타임은 Handle/Session 쪽으로 분리되니 책임이 선명해진다.
결론
MCPSpec 네이밍 채택 추천. 이미 만들어둔 코드에서 클래스명만 바꾸고, 핸들(세션) 계층을 분리하면 지금까지의 어댑터/브릿지 구조와도 잘 맞물린다.
응 런타임 처리는 안할거야. MCPSpec // ToolSpec 이렇게 할까 하는데 MCPConfig // ToolConfig 이게 나으려나 어떻게 생각해
좋아, 런타임은 안 다룬다고 했으니 이름은 “무슨 값을 선언·기술해 두는가”에만 맞추면 된다. 핵심은 Spec과 Config가 주는 뉘앙스 차이를 의도대로 쓰는 것.
정의 요약
Spec: 인터페이스·스키마·불변에 가까운 선언. 엔드포인트, 툴 시그니처, JSON 스키마 같은 “계약서”.
Config: 실행 맥락에 맞춘 선택·오버레이. 어떤 Spec을 노출할지, 별칭, 기본 옵션, 켜기/끄기 같은 “정책”.
권장 네이밍 매트릭스
단일 선언 단위
MCPSpec: MCP 접속 선언. kind, url/command, headers 같은 연결 스펙과 라벨, 태그 등.
ToolSpec: 툴 선언. name, description, parameters(JSON Schema), 위험도 플래그 등.
컬렉션과 정책
MCPConfig: 여러 MCPSpec을 담고, 환경별로 enable/disable·라벨·승인정책 등 오버레이.
ToolConfig: 여러 ToolSpec에 대한 정책 오버레이와 노출 규칙(필터링, 별칭, 기본 파라미터 등).
즉, MCPSpec // ToolSpec은 “순수 선언”, MCPConfig // ToolConfig는 “선택과 정책”으로 역할을 분리하면 깔끔하다.
바로 쓸 수 있는 최소 모델 예시
from typing import Literal, Optional, List, Dict
from pydantic import BaseModel, Field
Kind = Literal["stdio", "sse", "http_streamable"]
class MCPSpec(BaseModel):
name: str = Field(..., description="사람/모델이 볼 라벨, 네임스페이스 키")
kind: Kind
# stdio
command: Optional[str] = None
args: List[str] = Field(default_factory=list)
env: Dict[str, str] = Field(default_factory=dict)
# sse/http
url: Optional[str] = None
headers: Dict[str, str] = Field(default_factory=dict)
# 메타
version: Optional[str] = None
tags: List[str] = Field(default_factory=list)
class ToolSpec(BaseModel):
name: str # 예: "files:read_csv" 또는 "local.math_eval"
description: str
parameters: Dict[str, any] # JSON Schema
server_ref: Optional[str] = None # MCP 네임스페이스(예: "files"). 로컬 툴이면 None.
risk_level: Literal["low","med","high"] = "low"
default_dry_run: bool = False
tags: List[str] = Field(default_factory=list)
class MCPConfig(BaseModel):
specs: List[MCPSpec]
enabled: Dict[str, bool] = Field(default_factory=dict) # name -> on/off
require_approval: Dict[str, Literal["never","auto","always"]] = Field(default_factory=dict)
aliases: Dict[str, str] = Field(default_factory=dict) # name -> alias for UI/LLM
# 간단 검증
def validate(self):
names = [s.name for s in self.specs]
if len(names) != len(set(names)):
raise ValueError("duplicate MCP names")
class ToolConfig(BaseModel):
specs: List[ToolSpec]
enabled: Dict[str, bool] = Field(default_factory=dict) # tool name -> on/off
aliases: Dict[str, str] = Field(default_factory=dict) # tool name -> alias
defaults: Dict[str, Dict[str, any]] = Field(default_factory=dict) # tool name -> default args
def validate(self):
names = [t.name for t in self.specs]
if len(names) != len(set(names)):
raise ValueError("duplicate tool names")
어댑터는 “런타임 없이” 정적 변환만 하면 된다
OpenAI Chat Completions tools 변환
ToolSpec → tools JSON으로 직렬화
def tools_for_openai(tool_cfg: ToolConfig) -> List[Dict[str, any]]:
tools = []
for t in tool_cfg.specs:
if tool_cfg.enabled and not tool_cfg.enabled.get(t.name, True):
continue
tools.append({
"type": "function",
"function": {
"name": tool_cfg.aliases.get(t.name, t.name),
"description": t.description,
"parameters": t.parameters
}
})
return tools
Anthropic Messages API용 MCP 서버 페이로드
MCPSpec → mcp_servers 항목으로 직렬화(SSE/HTTP 계열만)
def mcp_servers_for_anthropic(mcp_cfg: MCPConfig) -> List[Dict[str, any]]:
out = []
for s in mcp_cfg.specs:
if mcp_cfg.enabled and not mcp_cfg.enabled.get(s.name, True):
continue
if s.kind == "stdio":
# Anthropic는 원격 URL 타입만 직접 붙일 수 있음
continue
out.append({
"type": "url",
"url": s.url,
"name": mcp_cfg.aliases.get(s.name, s.name),
"authorization_token": s.headers.get("Authorization") or s.headers.get("authorization"),
"tool_configuration": {"enabled": True}
})
return out
OpenAI Agents Python SDK용 MCP 컨텍스트 매니저 생성
MCPSpec → SDK의 MCPServerSse/MCPServerStdio로 변환에 필요한 파라미터만 준비
런타임은 안 만들지만, 어댑터가 어느 클래스를 써야 하는지 결정할 수 있도록 정적 매핑만 책임지게 하면 된다.
def mcp_params_for_openai_agents(mcp_cfg: MCPConfig) -> List[Dict[str, any]]:
params = []
for s in mcp_cfg.specs:
if mcp_cfg.enabled and not mcp_cfg.enabled.get(s.name, True):
continue
if s.kind == "stdio":
params.append({"type":"stdio","name": s.name, "params":{"command": s.command, "args": s.args, "env": s.env}})
elif s.kind == "sse":
params.append({"type":"sse","name": s.name, "params":{"url": s.url, "headers": s.headers}})
elif s.kind == "http_streamable":
params.append({"type":"http_streamable","name": s.name, "params":{"url": s.url, "headers": s.headers}})
return params
왜 이렇게 나누길 권하냐
MCPSpec / ToolSpec은 불변 계약서다. 코드 리뷰·버전 관리·테스트 더블 작성이 쉽다.
MCPConfig / ToolConfig는 환경별 설정과 정책을 담아 배포 파이프라인에서 바꿔치우기 좋다. 스테이징과 프로덕션을 오버레이로 구분하기 편하다.
추후 런타임을 붙이고 싶어도 모델이 흔들리지 않는다. Handle/Session은 별 파일에서 추가하면 된다.
실무 팁
Spec에 version, tags, capabilities 같은 필드를 두면 나중에 필터링이 편하다(예: tag가 readonly인 MCP만 노출).
ToolSpec.parameters는 반드시 엄격하게(required, enum, format, min/max) 기입해 모델의 잘못된 호출을 줄인다.
Config의 aliases는 LLM 혼동을 줄이는 데 효과적이다. 네임스페이스가 긴 툴은 별칭을 짧고 명확하게.
결론
네 목표가 선언값만 관리하는 거라면 MCPSpec // ToolSpec로 가는 게 맞다. 여기에 환경·정책을 얹는 용도로 MCPConfig // ToolConfig를 추가하면 운영 친화적이다. 한 줄 요약은 이거다.
Spec = 계약, Config = 선택과 정책.
둘 다 쓰되, 각자 역할을 분명히 나누자.
근데 클래스명을 McpSpec MCPSpec 이거 계속 헷갈림
헷갈리는 게 정상이다. 파이썬에서 약어를 클래스명에 넣을 때 선택지는 두 가지뿐이다. 하나만 고르고 끝까지 일관하면 된다.
권장 결론
McpSpec로 통일하자. 이유는 가독성(대문자 덩어리 줄임), 타이핑 편의, 그리고 파이썬 생태계에서 HttpServer, XmlParser처럼 약어를 단어 취급하는 관례가 널리 쓰이기 때문이다. 그러면 패밀리도 자연스럽게 McpConfig, McpEndpoint, McpHandle 같은 리듬이 된다.
빠른 규칙 세트
클래스: McpSpec, McpConfig, McpToolSpec, McpToolConfig
모듈 파일: mcp_spec.py, mcp_config.py
변수/매개변수: mcp_spec, mcp_config
JSON 키: snake_case 면 "mcp_spec", camelCase면 "mcpSpec" (팀 컨벤션 하나만 선택)
상수: MCP_SPEC_DEFAULTS처럼 전부 대문자도 가능하지만 클래스명과 혼동되니 피하는 편이 좋다
작게 맛보기 리팩터 스니펫
# mcp_spec.py
from typing import Literal, Optional, List, Dict
from pydantic import BaseModel, Field
Kind = Literal["stdio", "sse", "http_streamable"]
class McpSpec(BaseModel):
name: str = Field(..., description="라벨/네임스페이스")
kind: Kind
command: Optional[str] = None
args: List[str] = Field(default_factory=list)
env: Dict[str, str] = Field(default_factory=dict)
url: Optional[str] = None
headers: Dict[str, str] = Field(default_factory=dict)
version: Optional[str] = None
tags: List[str] = Field(default_factory=list)
class McpConfig(BaseModel):
specs: List[McpSpec]
enabled: Dict[str, bool] = Field(default_factory=dict)
aliases: Dict[str, str] = Field(default_factory=dict)
def validate_topology(self) -> None:
names = [s.name for s in self.specs]
if len(names) != len(set(names)):
raise ValueError("duplicate MCP names")
for s in self.specs:
if s.kind == "stdio" and not s.command:
raise ValueError(f"{s.name}: stdio requires command")
if s.kind in ("sse","http_streamable") and not s.url:
raise ValueError(f"{s.name}: {s.kind} requires url")
레거시 호환이 필요하면 한 줄로 다리 걸어두면 된다
# 임시 호환(향후 제거 예정)
class MCPSpec(McpSpec):
pass
만약 팀이 전통적인 “약어는 전부 대문자”를 고집한다면 MCPSpec로 가도 된다. 그 경우엔 문서·코드 전반에 동일 규칙을 강제하자. 예를 들어 클래스는 MCPSpec/MCPEndpoint, 변수는 mcp_spec, 파일은 mcp_spec.py처럼 “클래스만 ALLCAPS+CapWords, 나머진 snake_case”로 선을 긋는 식이다. 핵심은 혼용 금지다.
한 줄 요약
파이썬 프로젝트라면 McpSpec 계열이 덜 헷갈리고 읽기 좋다. 팀 합의로 한 스타일만 박고, 클래스·모듈·변수·JSON 키에 같은 원칙을 반복 적용하면 혼란이 사라진다.
StdioClientTransport랑 SSEClientTransport가 없다는ㄷ
좋아, 이건 보통 다음 세 가지 경우 중 하나다. 차근차근 정리하고, 바로 쓸 수 있는 호환 셰임(shim) 코드까지 붙일게.
-
패키지/버전 문제
mcp Python SDK의 트랜스포트 경로가 버전별로 조금 달라서, 네 환경에선mcp.client.stdio/mcp.client.sse모듈이 없을 수 있다. 설치가 다르거나 구버전일 가능성이 높다. -
선택적 의존성 미설치
SSE 트랜스포트는 선택(extra)로 분리된 경우가 있다. 즉, 기본pip install mcp로는 SSE 클라이언트가 없고, 추가 의존성을 깔아야 한다. -
다른 SDK를 보고 있다
이름이 비슷한 포크/래퍼(예: 예전 model-context-protocol 패키지)나 프록시 클라이언트를 깔아 두면 경로가 어긋난다.
해결 순서 한 번에 끝내기
아래 명령으로 네 환경을 표준화해보자. 실패하면 바로 호환 셰임 코드가 뒤에 있다.
1단계. 설치/업그레이드
pip install -U mcp
SSE까지 쓸 거면 아래 둘 중 환경에 맞게 하나를 시도
pip install -U "mcp[sse]"
또는
pip install -U httpx sse-starlette
2단계. 버전·모듈 확인
python - «'PY'
import pkgutil, mcp, sys
print("mcp version:", getattr(mcp, "version ", "unknown"))
print("has mcp.client.stdio:", pkgutil.find_loader("mcp.client.stdio") is not None)
print("has mcp.client.sse:", pkgutil.find_loader("mcp.client.sse") is not None)
print("has mcp.transport.stdio:", pkgutil.find_loader("mcp.transport.stdio") is not None)
print("has mcp.transport.sse:", pkgutil.find_loader("mcp.transport.sse") is not None)
PY
이제 어떤 경로가 있는지 알았으니, 호환 임포트 셰임으로 깔끔하게 처리하면 된다.
호환 임포트 셰임(둘 중 있는 걸 자동으로 씀)
이 스니펫을 너의 공통 유틸에 넣고, 나머지 코드에선 항상 여기서 임포트한 클래스를 쓰면 경로 차이에 안 흔들린다.
# mcp_compat.py
from typing import Any, Optional
# stdio
try:
from mcp.client.stdio import StdioClientTransport, StdioServerParameters # 신형 경로
except Exception:
try:
from mcp.transport.stdio import StdioClientTransport, StdioServerParameters # 구형/다른 빌드
except Exception as e:
StdioClientTransport = None # type: ignore
StdioServerParameters = None # type: ignore
# sse
try:
from mcp.client.sse import SSEClientTransport # 신형 경로
except Exception:
try:
from mcp.transport.sse import SSEClientTransport # 구형/다른 빌드
except Exception:
SSEClientTransport = None # type: ignore
def have_stdio() -> bool:
return StdioClientTransport is not None and StdioServerParameters is not None
def have_sse() -> bool:
return SSEClientTransport is not None
사용 예시(네 기존 코드 그대로, 단 임포트만 바꿈)
# 기존: from mcp.client.stdio import StdioClientTransport, StdioServerParameters
# 기존: from mcp.client.sse import SSEClientTransport
from mcp_compat import StdioClientTransport, StdioServerParameters, SSEClientTransport, have_stdio, have_sse
from mcp.client.session import ClientSession
async def connect_stdio(command: str, args=None, env=None):
if not have_stdio():
raise RuntimeError("STDIO transport가 이 환경에 없습니다. mcp 패키지 또는 버전을 확인하세요.")
transport = StdioClientTransport(StdioServerParameters(command=command, args=args or [], env=env or {}))
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
return transport, sess
async def connect_sse(url: str, headers=None):
if not have_sse():
raise RuntimeError("SSE transport가 이 환경에 없습니다. 'pip install \"mcp[sse]\"' 또는 httpx 계열을 설치하세요.")
transport = SSEClientTransport(url=url, headers=headers or {})
await transport.start()
sess = ClientSession(transport)
await sess.initialize()
return transport, sess
대체 경로가 모두 없다면
OpenAI Agents Python SDK의 내장 커넥터를 써서 트랜스포트 문제를 아예 우회할 수도 있다. 네가 “MCP 서버를 그대로 넘긴다” 전략을 쓰는 경우 특히 간단하다. 이 경로에선 파이썬에서 transport를 직접 임포트하지 않는다.
from agents import Agent, Runner
from agents.mcp import MCPServerStdio, MCPServerSse
from agents.extensions.models.litellm_model import LitellmModel
async def run_with_agents():
async with MCPServerStdio(
name="files",
params={"command": "npx", "args": ["-y","@modelcontextprotocol/server-filesystem","./data"]},
cache_tools_list=True,
) as files, MCPServerSse(
name="search",
params={"url":"https://mcp.example.com/sse", "headers":{"Authorization":"Bearer X"}},
cache_tools_list=True,
) as search:
agent = Agent(
name="pass-through",
model=LitellmModel(model="my-model", base_url="http://localhost:8000/v1"),
mcp_servers=[files, search],
)
print((await Runner.run(agent, "sales.csv 합계를 구해줘")).final_output)
정리
StdioClientTransport / SSEClientTransport가 “없다”는 에러는 거의 항상 모듈 경로/버전 차이에서 온다. 위 셰임을 쓰면 코드 한 번으로 양쪽 경로를 자동 지원할 수 있고, 그래도 안 되면 Agents SDK의 내장 MCP 커넥터로 트랜스포트 계층을 통째로 맡겨 버리는 것도 깔끔한 해결책이다.
계속 쓰는 스타일을 정리하자면
-
직접 SDK를 쓸 거면 mcp_compat.py 셰임으로 경로 차이를 흡수
-
빠른 통합이면 Agents SDK의 MCPServerStdio/MCPServerSse를 사용
여기까지로 안 잡히면 네 pip show mcp 출력과 python -c "import pkgutil; import mcp, sys; ..." 결과를 알려줘. 그 정보로 정확한 임포트 경로를 딱 맞춰줄 수 있다.
음 그냥 FastMcp 쓰는게 편할것 같네. FastMCP에서 config = { 'mcpServers': { "weather": {"url": "https://weather-api.example.com/mcp"}, "assistant": {"command": "python", "args": ["./assistant_servers.py"]} } } client = Client(config) 를 하면 바로 만들어 주는 것 같거든? 각각의 McpSpec에서 각각 한 줄을 만드는 함수 작성 + McpConfig에서 Config 자체를 만드는 함수 작성하면 좋을 것 같아.
좋아, FastMCP를 기준으로 “스펙 → 한 줄”과 “스펙 모음 → 최종 config”를 자동 생성하는 얇은 레이어를 딱 만들어줄게. 네가 원하는 건 런타임이 아니라 선언값이니, McpSpec은 선언, McpConfig는 컬렉션+검증+직렬화만 맡는다.
아래 코드는 그대로 붙여 쓰면 된다.
# fastmcp_specs.py
from typing import Literal, Optional, List, Dict, Any
from pydantic import BaseModel, Field, field_validator
Kind = Literal["url", "stdio"] # FastMCP 관점: 원격(url) 또는 로컬(stdio)
class McpSpec(BaseModel):
name: str = Field(..., description="FastMCP에서 사용할 서버 라벨")
kind: Kind
# url 서버
url: Optional[str] = None
headers: Dict[str, str] = Field(default_factory=dict)
# stdio 서버
command: Optional[str] = None
args: List[str] = Field(default_factory=list)
env: Dict[str, str] = Field(default_factory=dict)
tags: List[str] = Field(default_factory=list) # 선택: 필터링/문서화에 유용
description: Optional[str] = None # 선택: 사람 읽는 설명
@field_validator("name")
@classmethod
def _name_non_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("name must be non-empty")
return v
def to_fastmcp_entry(self) -> Dict[str, Any]:
"""
FastMCP 설정의 '한 줄'을 만든다.
반환 예시:
{"url": "https://...", "headers": {...}}
{"command": "python", "args": ["./server.py"], "env": {...}}
"""
if self.kind == "url":
if not self.url:
raise ValueError(f"{self.name}: url kind requires url")
entry = {"url": self.url}
if self.headers:
entry["headers"] = self.headers
return entry
if self.kind == "stdio":
if not self.command:
raise ValueError(f"{self.name}: stdio kind requires command")
entry = {"command": self.command}
if self.args:
entry["args"] = self.args
if self.env:
entry["env"] = self.env
return entry
raise ValueError(f"{self.name}: unknown kind {self.kind!r}")
class McpConfig(BaseModel):
specs: List[McpSpec] = Field(default_factory=list)
def validate_topology(self) -> None:
names = [s.name for s in self.specs]
if len(names) != len(set(names)):
dupes = {n for n in names if names.count(n) > 1}
raise ValueError(f"duplicate spec names: {sorted(dupes)}")
for s in self.specs:
if s.kind == "url" and not s.url:
raise ValueError(f"{s.name}: url is required for kind='url'")
if s.kind == "stdio" and not s.command:
raise ValueError(f"{s.name}: command is required for kind='stdio'")
def to_fastmcp_config(
self,
*,
include_tags: Optional[List[str]] = None,
exclude_tags: Optional[List[str]] = None,
enable: Optional[Dict[str, bool]] = None,
name_alias: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""
FastMCP가 바로 먹는 dict를 만든다.
반환 예시:
{"mcpServers": {"weather": {...}, "assistant": {...}}}
옵션:
include_tags: 지정된 태그가 하나라도 있는 스펙만 포함
exclude_tags: 지정된 태그가 하나라도 있으면 제외
enable: {"name": True/False} 형태의 on/off 오버라이드
name_alias: {"original": "alias"} 키 변경(LLM/사람에게 보일 라벨 바꾸기)
"""
self.validate_topology()
servers: Dict[str, Any] = {}
for spec in self.specs:
if include_tags and not (set(include_tags) & set(spec.tags)):
continue
if exclude_tags and (set(exclude_tags) & set(spec.tags)):
continue
if enable is not None and enable.get(spec.name, True) is False:
continue
key = name_alias.get(spec.name, spec.name) if name_alias else spec.name
servers[key] = spec.to_fastmcp_entry()
return {"mcpServers": servers}
# 편의: 체이닝 등록
def add(self, spec: McpSpec) -> "McpConfig":
self.specs.append(spec)
return self
간단 사용 예시
# 예시 1) 질문에서 준 config를 코드로 생성
from fastmcp_specs import McpSpec, McpConfig
cfg = McpConfig(specs=[
McpSpec(name="weather", kind="url", url="https://weather-api.example.com/mcp"),
McpSpec(name="assistant", kind="stdio", command="python", args=["./assistant_servers.py"]),
])
fastmcp_config = cfg.to_fastmcp_config()
# fastmcp_config == {
# "mcpServers": {
# "weather": {"url": "https://weather-api.example.com/mcp"},
# "assistant": {"command": "python", "args": ["./assistant_servers.py"]}
# }
# }
# FastMCP 클라이언트에 그대로 투입
# from fastmcp import Client
# client = Client(fastmcp_config)
태그/별칭/부분 사용 예시
cfg = McpConfig().add(
McpSpec(name="weather", kind="url", url="https://weather-api.example.com/mcp", tags=["readonly", "public"])
).add(
McpSpec(name="db", kind="stdio", command="python", args=["./db_server.py"], env={"MODE":"prod"}, tags=["write", "internal"])
).add(
McpSpec(name="assistant", kind="stdio", command="python", args=["./assistant_servers.py"], tags=["helper"])
)
# 읽기 전용만 포함하고, weather를 public-weather로 라벨링
fastmcp_config = cfg.to_fastmcp_config(
include_tags=["readonly"],
name_alias={"weather": "public-weather"}
)
# 결과:
# {"mcpServers": {"public-weather": {"url": "https://weather-api.example.com/mcp"}}}
검증 포인트 요약
-
McpSpec.to_fastmcp_entry가 “url”과 “stdio” 케이스를 각각 한 줄로 생성한다.
-
McpConfig.to_fastmcp_config가 필터링과 on/off, 별칭을 지원한다.
-
FastMCP의 Client(config) 호출에 바로 넣을 수 있는 구조를 만든다.
확장 아이디어
headers, env에 비밀값이 들어가면 McpConfig에서 별도 시크릿 소스로 병합하는 훅을 추가해도 좋다.
테넌트별로 enable dict를 바꾸는 것만으로 동일 스펙에서 다른 배포 구성을 만들 수 있다.
나중에 Elicitation/Prompt 같은 메타가 필요하면 McpSpec에 meta: Dict[str, Any] 정도를 추가해도 직렬화 흐름엔 영향이 없다.
이렇게 두면 “각각의 McpSpec에서 각각 한 줄”과 “McpConfig에서 최종 config”가 둘 다 한 번에 해결된다.
아 그런데 config가 없으면 어떻게 되지?
좋은 포인트다. “config가 없을 때”는 두 가지를 분리해서 생각하면 깔끔하다.
- FastMCP Client 레벨에서의 기대 동작
일반적으로 Client는 최소한 {"mcpServers": {}} 형태의 딕셔너리를 기대한다. 완전히 None이거나 키가 없으면 내부에서 KeyError/TypeError 류가 날 가능성이 높다. 안전하게 가려면 다음을 지키자.
-
비어 있어도 된다: 빈 설정은 곧 “연결된 MCP 서버 없음”이니, Client는 뜨지만 툴은 없다.
-
키는 반드시 있어야 한다: {"mcpServers": {}} 형태를 보장하자.
-
잘못된 형태는 막자: 문자열이나 리스트 같은 타입이 들어가면 바로 예외를 던지게 하자.
- 네 선언 계층(McpSpec / McpConfig)에서의 방어 전략
런타임을 다루지 않기로 했으니, 선언-직렬화 단계에서 다음의 fallbacks를 두면 실전에서 편하다.
-
config가 None이면 빈 구조로 만들어 준다.
-
mcpServers 키가 없으면 생성해 준다.
-
specs가 비어 있으면 빈 딕셔너리를 넣되, 로거 경고를 남긴다.
-
환경변수나 기본 파일 경로에서 자동으로 끌어오는 선택지를 둔다(있으면 사용, 없으면 빈 dict).
아래 헬퍼를 넣으면 실수해도 안전하게 동작한다.
# fastmcp_guard.py
from typing import Optional, Dict, Any, List
import json
import os
# 이전에 만든 선언 모델이라고 가정
# from fastmcp_specs import McpSpec, McpConfig
def ensure_fastmcp_config(
config: Optional[Dict[str, Any]] = None,
*,
specs: Optional[List["McpSpec"]] = None, # 문자열 인용은 순환 import 회피용
env_var: str = "FASTMCP_SERVERS_JSON", # 옵션: {"weather":{"url":"..."}, "assistant":{...}} 형태
file_env_var: str = "FASTMCP_CONFIG_PATH" # 옵션: JSON 파일 경로
) -> Dict[str, Any]:
"""
1순위: 명시 config
2순위: specs → {"mcpServers": {...}} 직렬화
3순위: 환경변수에서 로드
4순위: 빈 설정으로 폴백
항상 {"mcpServers": dict} 형태를 보장한다.
"""
# 1) 명시 config
if config is not None:
if not isinstance(config, dict):
raise TypeError("config must be a dict or None")
if "mcpServers" not in config:
# 부분 설정만 온 경우 보완
config = {"mcpServers": dict(config)} if config else {"mcpServers": {}}
elif not isinstance(config["mcpServers"], dict):
raise TypeError("config['mcpServers'] must be a dict")
return config
# 2) specs → 직렬화
if specs is not None:
from fastmcp_specs import McpConfig # 지역 임포트로 순환 참조 회피
cfg = McpConfig(specs=list(specs))
return cfg.to_fastmcp_config()
# 3) 환경변수: 파일 경로 우선
path = os.getenv(file_env_var)
if path and os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
# 파일이 {"mcpServers":{...}} 형태이거나 {"weather":{...}} 같은 맵일 수도 있음
if "mcpServers" in data:
if not isinstance(data["mcpServers"], dict):
raise TypeError("file config['mcpServers'] must be a dict")
return data
if isinstance(data, dict):
return {"mcpServers": data}
raise TypeError("file config must be a dict")
raw = os.getenv(env_var)
if raw:
data = json.loads(raw)
if "mcpServers" in data:
if not isinstance(data["mcpServers"], dict):
raise TypeError("env config['mcpServers'] must be a dict")
return data
if isinstance(data, dict):
return {"mcpServers": data}
raise TypeError("env config must be a dict")
# 4) 최종 폴백: 빈 설정
return {"mcpServers": {}}
즉시 사용 예시
# 1) config가 None인 상황
from fastmcp_guard import ensure_fastmcp_config
# from fastmcp import Client
cfg = ensure_fastmcp_config(None) # {"mcpServers": {}}
# client = Client(cfg) # 툴 없이 실행
# 2) McpSpec만 있고 config가 없는 상황
from fastmcp_specs import McpSpec
specs = [
McpSpec(name="weather", kind="url", url="https://weather-api.example.com/mcp"),
McpSpec(name="assistant", kind="stdio", command="python", args=["./assistant_servers.py"]),
]
cfg = ensure_fastmcp_config(specs=specs)
# client = Client(cfg)
# 3) 환경변수로만 공급하는 상황
# export FASTMCP_SERVERS_JSON='{"weather":{"url":"https://weather-api.example.com/mcp"}}'
cfg = ensure_fastmcp_config()
# client = Client(cfg)
운영 팁 한 줄 요약
빈 설정은 허용하되 형태는 항상 보장하자. 즉, 최소 {"mcpServers": {}}를 만들어서 Client가 안정적으로 뜨게 하고, 실제 서버 없으면 “툴 없음” 상태로만 동작하게 하는 게 제일 안전하다. 필요하면 ensure_fastmcp_config 내부에 로깅을 넣어 “현재 MCP 서버 0개” 같은 경고를 남기면 실수도 빨리 잡힌다.
이렇게 만든 FastMcp Client의 툴과 내가 만든 일반 ToolSpec을 둘 다 고려해서 chat completions 호출 -> 그 후 요청하는 함수 실행을 만들고 싶어 async with Client(cfg) as client: client.list_tools() # mcp tool 반환 client.call_tool(함수명, 인자) 이렇게 쓸 수 있어. fastmcp는 이 두 가지를 어떻게 하면 잘 쓰게 할 수 있을까
좋아. 목표는 FastMCP의 툴들과 네가 직접 정의한 일반 ToolSpec들을 “하나의 툴 세트”처럼 묶어서 Chat Completions with tools를 돌리고, 모델이 요청한 tool_calls를 각각 알맞은 실행기로 라우팅하는 것. 아래 스캐폴딩은 그대로 붙여서 돌아가도록 설계했다.
핵심 아이디어
-
FastMCP의 client.list_tools() 결과를 OpenAI tools 포맷으로 정규화하고 이름에 접두사 mcp:를 붙인다.
-
네 로컬 ToolSpec들도 같은 포맷으로 직렬화한다.
-
Chat Completions 첫 호출에 두 종류 툴을 모두 넣는다.
-
tool_calls 루프에서 이름이 mcp:면 client.call_tool로, 아니면 LocalToolRegistry 핸들러로 실행한다.
코드
# unified_fastmcp_chat.py
import asyncio, json, orjson, requests
from typing import Any, Dict, List, Optional, Callable, Awaitable
from dataclasses import dataclass, field
###############################################################################
# 0) 로컬 ToolSpec 정의(한 번만 구현해서 공통 사용)
###############################################################################
@dataclass
class ToolSpec:
name: str
description: str
parameters: Dict[str, Any]
handler: Callable[[Dict[str, Any]], Awaitable[Any]]
class LocalToolRegistry:
def __init__(self):
self._m: Dict[str, ToolSpec] = {}
def register(self, spec: ToolSpec):
if spec.name in self._m:
raise ValueError(f"duplicate tool: {spec.name}")
self._m[spec.name] = spec
def get(self, name: str) -> ToolSpec:
return self._m[name]
def list_openai_tools(self) -> List[Dict[str, Any]]:
out = []
for t in self._m.values():
out.append({
"type": "function",
"function": {
"name": t.name,
"description": t.description,
"parameters": t.parameters
}
})
return out
###############################################################################
# 1) FastMCP → OpenAI tools 정규화
###############################################################################
async def mcp_tools_for_openai(client, *, prefix: str = "mcp:") -> List[Dict[str, Any]]:
"""
FastMCP client.list_tools()를 호출해 OpenAI tools 포맷으로 변환.
각 이름에는 'mcp:' 접두사를 붙여 로컬 툴과 충돌을 방지.
FastMCP 툴 메타는 다음 필드를 가진다고 가정: name, description, parameters(JSON Schema)
"""
mcp_tools = await client.list_tools()
tools = []
for t in mcp_tools:
tools.append({
"type": "function",
"function": {
"name": f"{prefix}{t['name']}",
"description": t.get("description", ""),
"parameters": t.get("parameters", {"type":"object","properties":{}})
}
})
return tools
def is_mcp_tool(tool_name: str, prefix: str = "mcp:") -> bool:
return tool_name.startswith(prefix)
def strip_mcp_prefix(tool_name: str, prefix: str = "mcp:") -> str:
return tool_name[len(prefix):] if tool_name.startswith(prefix) else tool_name
###############################################################################
# 2) Chat Completions 호출 러너(두 툴 세트를 한꺼번에 노출)
###############################################################################
class ChatCompletionsRunner:
def __init__(self, base_url: str, model: str, api_key: Optional[str] = None, path="/v1/chat/completions"):
self.base_url = base_url.rstrip("/")
self.model = model
self.api_key = api_key
self.path = path
def _headers(self):
h = {"content-type": "application/json"}
if self.api_key:
h["authorization"] = f"Bearer {self.api_key}"
return h
def _post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
r = requests.post(f"{self.base_url}{self.path}", data=orjson.dumps(payload), headers=self._headers(), timeout=120)
r.raise_for_status()
return r.json()
async def run(
self,
*,
messages: List[Dict[str, Any]],
local_registry: LocalToolRegistry,
fastmcp_client,
expose: Optional[Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]]] = None,
prefix: str = "mcp:",
max_rounds: int = 8,
) -> str:
# 1) 툴 카탈로그 합치기
mcp_tools = await mcp_tools_for_openai(fastmcp_client, prefix=prefix)
local_tools = local_registry.list_openai_tools()
all_tools = local_tools + mcp_tools
if expose:
all_tools = expose(all_tools) # 턴별 소출 정책(선택)
# 2) 1차 모델 호출
data = self._post({"model": self.model, "messages": messages, "tools": all_tools, "tool_choice": "auto"})
msg = data["choices"][0]["message"]
messages.append(msg)
# 3) tool_calls 루프
rounds = 0
while "tool_calls" in msg and rounds < max_rounds:
rounds += 1
for call in msg["tool_calls"]:
name = call["function"]["name"]
raw = call["function"].get("arguments", "{}") or "{}"
try:
args = json.loads(raw)
except Exception:
args = {"_raw": raw}
try:
if is_mcp_tool(name, prefix=prefix):
# FastMCP 경로
mcp_name = strip_mcp_prefix(name, prefix=prefix)
result = await fastmcp_client.call_tool(mcp_name, args)
content = orjson.dumps(result).decode()
else:
# 로컬 툴 경로
spec = local_registry.get(name)
result = await spec.handler(args)
content = orjson.dumps(result).decode()
except Exception as e:
content = json.dumps({"error": str(e)})
messages.append({
"role": "tool",
"tool_call_id": call["id"],
"name": name,
"content": content
})
# 다음 턴
data = self._post({"model": self.model, "messages": messages})
msg = data["choices"][0]["message"]
messages.append(msg)
return msg.get("content", "")
###############################################################################
# 3) 사용 예시
###############################################################################
# 예시 로컬 툴 구현
async def math_eval(args: Dict[str, Any]) -> Any:
expr = args.get("expr", "")
if not expr or any(ch for ch in expr if ch not in "0123456789+-*/(). "):
return {"error": "expr must contain only digits and + - * / ( )"}
try:
val = eval(expr, {"__builtins__": {}}, {})
return {"value": val}
except Exception as e:
return {"error": str(e)}
async def main():
# 0) FastMCP 클라이언트 준비
# from fastmcp import Client
cfg = {
"mcpServers": {
"weather": {"url": "https://weather-api.example.com/mcp"},
"assistant": {"command": "python", "args": ["./assistant_servers.py"]}
}
}
# async with Client(cfg) as client:
# ...
# 데모를 위해 간이 더미 클라이언트를 흉내내자. 실제로는 fastmcp.Client를 사용.
class DummyFastMCP:
async def __aenter__(self): return self
async def __aexit__(self, et, ev, tb): pass
async def list_tools(self):
# 이름은 서버 네임스페이스를 포함해주면 LLM이 더 잘 고른다
return [
{"name": "weather:get_forecast", "description": "도시/날짜별 일기예보", "parameters": {
"type":"object","properties":{"city":{"type":"string"},"date":{"type":"string","format":"date"}}, "required":["city"]
}},
{"name": "assistant:read_csv_sum", "description": "CSV 합계 계산", "parameters": {
"type":"object","properties":{"path":{"type":"string"}}, "required":["path"]
}}
]
async def call_tool(self, name, args):
# 여기서는 결과 형태만 맞춰준다
return {"tool": name, "args": args, "result": "demo-result"}
# 1) 로컬 툴 레지스트리
local = LocalToolRegistry()
local.register(ToolSpec(
name="local.math_eval",
description="안전 사칙연산",
parameters={"type":"object","properties":{"expr":{"type":"string"}},"required":["expr"],"additionalProperties": False},
handler=math_eval
))
# 2) 러너
runner = ChatCompletionsRunner(base_url="http://localhost:8000", model="my-model")
# 3) 통합 실행
messages = [
{"role":"system","content":"필요한 도구만 호출한다. 파괴적 작업은 dry_run 우선."},
{"role":"user","content":"assistant:read_csv_sum으로 data/sales.csv 합계를 구하고, local.math_eval로 (100+23)도 계산해줘"}
]
async with DummyFastMCP() as client:
out = await runner.run(messages=messages, local_registry=local, fastmcp_client=client)
print("=== FINAL ===")
print(out)
if __name__ == "__main__":
asyncio.run(main())
운영 포인트
이름 규칙을 고정하자. mcp: 접두사로 FastMCP에서 온 툴을 표시하면 충돌을 방지하고 라우팅이 단순해진다. FastMCP 쪽 툴 이름에 이미 server:tool 형태가 들어가 있어도 상관없다. 최종 노출명은 mcp:server:tool처럼 되지만 LLM이 의외로 잘 고른다.
툴 소출을 넣자. expose 콜백으로 턴별로 필요한 서브셋만 tools에 담으면 선택 오류와 지연이 줄어든다.
결과 포맷을 통일하자. tool role 메시지 content는 항상 JSON 문자열로 넣는 게 후속 라운드에서 안정적이다.
에러는 JSON으로. 실행기 에러도 {"error": "…"}로 통일해 모델이 복구 플랜을 세우기 좋게 만든다.
이 구조면 FastMCP가 제공하는 툴과 네 로컬 ToolSpec을 동시에 들고, 표준 Chat Completions 루프에서 자연스럽게 섞어 쓸 수 있다. 필요한 경우 expose 정책, 이름 별칭, 기본 인자 주입 같은 정책 훅을 runner.run의 인자로 얹어 확장하면 된다.
Tool이 not subscriptable mcp Tool의 속성을 저런식으로 접근하는게 잘못된거 아닐까 싶네
맞다. “Tool이 not subscriptable”은 FastMCP가 list_tools()에서 dict가 아니라 객체를 돌려줘서 생긴 증상이다. 내가 예시에서 t['name']처럼 딕셔너리로 다뤘기 때문. 해결은 간단하다. MCP 툴 메타를 dict/객체 어떤 형식이 와도 안전하게 읽도록 정규화 레이어를 한 줄 끼우면 된다.
아래 패치만 적용하면 바로 돌아간다.
- MCP 툴 메타 정규화 유틸 추가
딕셔너리, pydantic 모델, dataclass, 일반 객체를 모두 지원하고, 필드명도parameters | input_schema | schema중 있는 걸 자동 선택한다.
# utils_normalize.py
from typing import Any, Dict
def _get_attr_or_key(obj: Any, key: str, default=None):
if isinstance(obj, dict):
return obj.get(key, default)
# pydantic 모델, dataclass, 일반 객체까지 커버
if hasattr(obj, key):
return getattr(obj, key)
# 일부 객체는 to_dict()/dict() 제공
if hasattr(obj, "to_dict"):
try:
return obj.to_dict().get(key, default)
except Exception:
pass
if hasattr(obj, "dict"):
try:
return obj.dict().get(key, default)
except Exception:
pass
return default
def normalize_mcp_tool_meta(tool_obj: Any) -> Dict[str, Any]:
"""
FastMCP client.list_tools() 결과의 한 항목을 OpenAI tools 포맷용 메타로 정규화.
반환: {"name": str, "description": str, "parameters": dict}
"""
name = _get_attr_or_key(tool_obj, "name")
if not name:
# 어떤 서버는 server/name 식별자를 별도 필드에 둘 수 있음
name = _get_attr_or_key(tool_obj, "id") or _get_attr_or_key(tool_obj, "tool") or "unknown"
desc = _get_attr_or_key(tool_obj, "description", "") or _get_attr_or_key(tool_obj, "desc", "")
# JSON Schema 필드명 변주 대응
params = (
_get_attr_or_key(tool_obj, "parameters")
or _get_attr_or_key(tool_obj, "input_schema")
or _get_attr_or_key(tool_obj, "schema")
or {"type": "object", "properties": {}}
)
# 최소 형식 보정
if not isinstance(params, dict):
params = {"type": "object", "properties": {}}
return {"name": str(name), "description": str(desc), "parameters": params}
- mcp_tools_for_openai를 정규화 유틸로 교체
이제 리스트가 dict든 객체든 문제 없다. 접두사만 붙여 충돌을 피한다.
# unified_fastmcp_chat.py의 기존 함수 대체
from utils_normalize import normalize_mcp_tool_meta
async def mcp_tools_for_openai(client, *, prefix: str = "mcp:") -> list[dict]:
tools_raw = await client.list_tools()
openai_tools = []
for t in tools_raw:
meta = normalize_mcp_tool_meta(t)
openai_tools.append({
"type": "function",
"function": {
"name": f"{prefix}{meta['name']}",
"description": meta["description"],
"parameters": meta["parameters"],
}
})
return openai_tools
-
실행 시 라우팅 부분은 그대로 유지
is_mcp_tool(name)로 접두사 판단 →strip_mcp_prefix(name)로 원래 이름 복구 →await client.call_tool(mcp_name, args)호출. FastMCP가 기대하는 함수명이list_tools()에서 받은 그대로라면 이 흐름이 정확하다. -
빠른 자가진단 스니펫
혹시 그래도 타입이 애매하면, 아래를 한 번 찍어보면 어떤 필드를 쓰는지 바로 파악된다.
# 디버그용
async def debug_list_tools(client):
tools = await client.list_tools()
for i, t in enumerate(tools):
try:
print(i, type(t), getattr(t, "name", None), getattr(t, "description", None))
except Exception:
print(i, type(t), "no direct attrs")
# dict로 변환 가능하면 미리 확인
if hasattr(t, "dict"):
print(" keys:", list(t.dict().keys()))
elif hasattr(t, "to_dict"):
print(" keys:", list(t.to_dict().keys()))
elif isinstance(t, dict):
print(" keys:", list(t.keys()))
요약 정리
문제 원인: FastMCP가 객체를 반환하는데 dict처럼 t['name']로 접근함.
해결: normalize_mcp_tool_meta()로 dict/객체 양쪽을 지원하고, 필드명 변주까지 흡수. 그 결과를 OpenAI tools 포맷으로 직렬화.
장점: FastMCP 구현이 바뀌거나 서버별로 메타 필드명이 달라도 안전. 네 로컬 ToolSpec과도 충돌 없이 합쳐서 쓸 수 있다.
이 패치만 적용하면 “Tool is not subscriptable” 에러는 사라지고, mcp 툴과 로컬 툴을 한 번의 Chat Completions 루프에서 안정적으로 섞어 실행할 수 있다.
그런데 AsyncExitStackGroup에서 오픈 하는 중에 예외가 발생하면 어떻게 돼? 정리가 어떻게 돼?
핵심만 말하면, 지금 만든 AsyncExitStackGroup은 “여럿을 차례로 열다 중간에 실패”했을 때 이미 열린 것들을 자동으로 닫아주지 않는다. aenter 도중 예외가 나면 __aexit__가 호출되지 않기 때문. 따라서 부분적으로 열린 리소스가 누수될 수 있다.
해결은 두 가지다.
- 표준 라이브러리 쓰기: contextlib.AsyncExitStack
이게 바로 그런 상황을 안전하게 처리하라고 존재한다. 하나라도 열기에 실패하면, 그 이전에 성공한 컨텍스트들의 정리 함수가 역순으로 호출된다.
예시
import contextlib
async def open_many(managers):
async with contextlib.AsyncExitStack() as stack:
opened = []
for cm in managers:
entered = await stack.enter_async_context(cm)
opened.append(entered)
# 여기까지 오면 모두 정상 오픈
# opened를 사용
return opened
# 위 블록을 빠져나오면 모두 정리됨
- 네 커스텀 클래스를 안전하게 고치기
AsyncExitStack과 같은 의미론을 직접 구현하려면, 열릴 때마다 “해당 컨텍스트의 __aexit__를 스택에 push”하고, 중간에 예외가 나면 이미 쌓아둔 __aexit__들을 역순으로 호출해야 한다.
안전 버전
class AsyncExitStackGroup:
def __init__(self, cms):
self.cms = list(cms)
self._exits = []
self._entered = []
async def __aenter__(self):
try:
for cm in self.cms:
entered = await cm.__aenter__()
# 정상 진입했으면, 대응 __aexit__를 스택에 저장
self._exits.append(cm.__aexit__)
self._entered.append(entered)
return self._entered
except Exception as e:
# 여기서 정리: 이미 진입했던 컨텍스트들을 역순으로 닫음
await self._close_partial(exc_type=type(e), exc=e, tb=e.__traceback__)
raise # 원래 예외를 그대로 전파
async def __aexit__(self, exc_type, exc, tb):
await self._close_partial(exc_type, exc, tb)
# 예외 전파 여부: 여기서는 억제하지 않음(False)
return False
async def _close_partial(self, exc_type, exc, tb):
while self._exits:
exit_cb = self._exits.pop()
try:
await exit_cb(exc_type, exc, tb)
except Exception:
# 정리 중 예외는 로그만 남기고 계속 진행하는 게 보통 안전
pass
self._entered.clear()
비교 요약
-
기존 구현: __aenter__에서 예외가 나면 이미 연 리소스가 남을 수 있음.
-
개선 구현: __aenter__에서 예외가 나도 _close_partial이 호출되어 역순 정리됨.
-
표준 AsyncExitStack: 같은 보증을 이미 제공. 가능하면 이걸 쓰는 게 가장 안전하고 간단.
추천 전략
-
가능한 한 contextlib.AsyncExitStack을 사용하자. 기능 검증, 예외 전파, 정리 중 예외 처리까지 잘 되어 있다.
-
직접 구현해야 한다면 위 개선 버전을 쓰고, 정리 중 예외는 삼키되 로깅하자. 운영 환경에서는 누수보다 정리 지속이 중요하다.
-
정리 함수가 멱등적이어야 한다. 일부 MCP 커넥터는 중복 close/stop 호출이 들어와도 안전하게 무시하도록 만들어두는 편이 좋다.
-
오픈과 정리 사이에 상태를 외부로 넘길 때는, 반환값 리스트(self._entered) 대신 이름→핸들 맵을 쓰면 추적이 쉽다.
뭐야 그럼 첨에는 안전하지 않게 만들어줬던거야?
맞다. 처음에 준 AsyncExitStackGroup은 예외 시 자동 정리를 보장하지 못했으니 안전하지 않았다. 내 실수다. 바로잡자.
단계별로 정리
-
문제 핵심
여러 컨텍스트를 순차로 여는 중 하나가 실패하면, 이미 열린 것들을 닫아줘야 한다. 직접 만든 단순 구현은 aenter 중 예외가 나면 __aexit__가 호출되지 않아 누수가 난다. -
가장 안전한 해결책
파이썬 표준 contextlib.AsyncExitStack을 쓰면, 열 때마다 정리 콜백을 스택에 등록하므로 중간 실패 시 역순 정리가 자동으로 보장된다. -
표준 스택 사용 예시
import contextlib
async def open_many_safely(managers):
async with contextlib.AsyncExitStack() as stack:
opened = []
for cm in managers:
entered = await stack.enter_async_context(cm)
opened.append(entered)
return opened
# 블록을 빠져나가면 자동으로 모두 정리됨
- 커스텀 클래스를 꼭 써야 한다면 안전 버전
class AsyncExitStackGroup:
def __init__(self, cms):
self.cms = list(cms)
self._exits = []
self._entered = []
async def __aenter__(self):
try:
for cm in self.cms:
entered = await cm.__aenter__()
self._exits.append(cm.__aexit__) # 정리 콜백 스택에 push
self._entered.append(entered)
return self._entered
except Exception as e:
await self._close_partial(type(e), e, e.__traceback__)
raise
async def __aexit__(self, exc_type, exc, tb):
await self._close_partial(exc_type, exc, tb)
return False # 예외 억제 안 함
async def _close_partial(self, exc_type, exc, tb):
while self._exits:
exit_cb = self._exits.pop()
try:
await exit_cb(exc_type, exc, tb)
except Exception:
pass
self._entered.clear()
- 실전 권장
가능하면 표준 AsyncExitStack으로 교체하라. 이미 검증된 보증을 제공하고, 정리 중 예외 처리도 일관적이다. 커스텀이 필요하면 위 안전 버전을 그대로 쓰면 된다.
다음 액션
기존 AsyncExitStackGroup을 전역 검색해서 두 가지 중 하나로 바꿔라.
표준 스택으로 단순화하거나, 안전 버전으로 대체.
이후 MCP 컨텍스트 매니저를 여럿 여는 모든 경로에서 누수 가능성이 사라진다.
import contextlib async def open_many_safely(managers): async with contextlib.AsyncExitStack() as stack: opened = [] for cm in managers: entered = await stack.enter_async_context(cm) opened.append(entered) return opened # 블록을 빠져나가면 자동으로 모두 정리됨 이건 어떻게 쓰는건데? MCP 서버들을 열고 그걸 Agent에 argument로 넣고 실행한 뒤 그 후에 열린 서버들을 닫아야해
맞는 흐름은 “열고 → 사용하는 동안 열어두고 → 끝나면 자동으로 닫기”다. 그래서 반환으로 꺼내는 함수가 아니라, “여러 컨텍스트를 열어둔 상태를 yield”하는 async 컨텍스트 매니저가 필요하다. 아래처럼 바꿔 쓰면 된다.
- 유틸: 여러 컨텍스트를 안전하게 열고 닫는 async 컨텍스트 매니저
import contextlib
@contextlib.asynccontextmanager
async def open_many(managers):
async with contextlib.AsyncExitStack() as stack:
opened = []
for cm in managers:
entered = await stack.enter_async_context(cm)
opened.append(entered)
# 여기서 opened를 내보낸 뒤 with 블록이 끝날 때 자동으로 모두 닫힘
yield opened
- 사용 예시: MCP 서버들 열기 → Agent에 주입 → 실행 → 자동 정리
import asyncio
from agents import Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel
from agents.mcp import MCPServerStdio, MCPServerSse
from agents.model_settings import ModelSettings
async def main():
# 1) MCP 서버 컨텍스트 매니저들 준비
managers = [
MCPServerStdio(
name="files",
params={"command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem", "./data"]},
cache_tools_list=True,
),
MCPServerSse(
name="search",
params={"url": "https://mcp.example.com/sse", "headers": {"Authorization": "Bearer X"}},
cache_tools_list=True,
),
]
# 2) 여러 서버를 안전하게 열고, 열린 핸들들을 에이전트에 넘김
async with open_many(managers) as opened_servers:
agent = Agent(
name="PassThroughMCP",
instructions="필요할 때만 MCP 툴을 호출한다. 파괴적 작업은 dry_run을 우선한다.",
model=LitellmModel(model="my-model", base_url="http://localhost:8000/v1"),
mcp_servers=opened_servers,
model_settings=ModelSettings(tool_choice="auto"),
)
# 3) 에이전트 실행
result = await Runner.run(agent, "files와 search MCP를 활용해 sales.csv 통찰을 요약해줘")
print(result.final_output)
# 4) 여기 도달하면 opened_servers는 모두 자동으로 정리됨
asyncio.run(main())
핵심 포인트
-
open_many는 with 블록이 유지되는 동안만 리소스를 열어둔다. 블록을 벗어나면 역순으로 안전하게 닫힌다.
-
중간에 어떤 서버 열기에서 예외가 나도 이미 열린 것들은 AsyncExitStack이 알아서 닫아준다.
-
Runner.run에서 예외가 나도 with 블록을 빠져나가며 정리가 보장된다.
이 패턴만 기억하면 “MCP 여러 대 열고 → Agent에 주고 → 끝나면 자동으로 닫기”가 항상 안전하게 동작한다.