-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmcp_server.py
More file actions
176 lines (151 loc) · 5.25 KB
/
mcp_server.py
File metadata and controls
176 lines (151 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
"""AMReXAgent MCP server entrypoint.
Thin stdio server wrapper; tool implementations live in src.mcp_tools.
"""
from __future__ import annotations
import asyncio
import sys
import uuid
from typing import Any
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
HAS_MCP = True
except ImportError:
HAS_MCP = False
print("[WARN] MCP library not available. Install with: pip install mcp", file=sys.stderr)
try:
from src.mcp_tools import (
AMReXAgentConfig,
AnalysisService,
ArchitectService,
InputWriterService,
LocalRunner,
PeleKnowledgeService,
SimulationPlan,
SuperfacilityRunner,
ValidationService,
VisualizationService,
config,
get_tool_specs,
mcp_analyze_results,
mcp_apply_plan,
mcp_create_proposed_modifications_with_plan,
mcp_create_simulation_plan,
mcp_execute_workflow,
mcp_generate_visualizations,
mcp_query_knowledge,
mcp_run_simulation,
mcp_select_baseline_case,
mcp_setup_job,
mcp_stage_out_globus,
mcp_validate_config,
mcp_validate_inputs,
_get_session_context,
_persist_session_context,
)
from src.interactive_service import invoke_tool
except ModuleNotFoundError:
import pathlib
AMREX_AGENT_ROOT = pathlib.Path(__file__).parent
sys.path.insert(0, str(AMREX_AGENT_ROOT))
from src.mcp_tools import (
AMReXAgentConfig,
AnalysisService,
ArchitectService,
InputWriterService,
LocalRunner,
PeleKnowledgeService,
SimulationPlan,
SuperfacilityRunner,
ValidationService,
VisualizationService,
config,
get_tool_specs,
mcp_analyze_results,
mcp_apply_plan,
mcp_create_proposed_modifications_with_plan,
mcp_create_simulation_plan,
mcp_execute_workflow,
mcp_generate_visualizations,
mcp_query_knowledge,
mcp_run_simulation,
mcp_select_baseline_case,
mcp_setup_job,
mcp_stage_out_globus,
mcp_validate_config,
mcp_validate_inputs,
_get_session_context,
_persist_session_context,
)
from src.interactive_service import invoke_tool
if not HAS_MCP:
print("[ERROR] MCP library required for server mode", file=sys.stderr)
print(" Install with: pip install mcp", file=sys.stderr)
sys.exit(1)
app = Server("pele-agent")
MAX_CONCURRENT_TOOL_CALLS = 5
_TOOL_CALL_SEMAPHORE = asyncio.Semaphore(MAX_CONCURRENT_TOOL_CALLS)
print("[MCP] AMReXAgent starting...", file=sys.stderr)
print(f"[MCP] Environment: {config.environment}", file=sys.stderr)
print(f"[MCP] FAISS DB path: {config.faiss_db_path}", file=sys.stderr)
print(f"[MCP] Knowledge base path: {config.knowledge_base_path}", file=sys.stderr)
def _invoke_tool_inline(fn: Any) -> bool:
"""
Test harnesses monkeypatch invoke_tool with locally-defined callables.
Running those inline avoids occasional asyncio.to_thread deadlocks.
"""
module_name = getattr(fn, "__module__", "")
return module_name == "__main__" or module_name.startswith("tests.")
@app.list_tools()
async def list_tools():
"""List available AMReXAgent tools."""
from mcp import Tool
tool_specs = get_tool_specs()
return [Tool(**spec) for spec in tool_specs]
@app.call_tool()
async def call_tool(name: str, arguments: dict | None) -> Any:
"""Handle tool calls from MCP client."""
try:
session_id = None
context = dict(arguments or {})
if "session_id" in context:
session_id = str(context.get("session_id") or uuid.uuid4())
context.pop("session_id", None)
# Do not accept caller_action from untrusted MCP payloads.
context.pop("caller_action", None)
async with _TOOL_CALL_SEMAPHORE:
kwargs = {
"session_id": session_id,
"surface": "mcp",
"caller_action": None,
}
if _invoke_tool_inline(invoke_tool):
return invoke_tool(name, context, **kwargs)
return await asyncio.to_thread(invoke_tool, name, context, **kwargs)
except Exception as exc:
import traceback
return {
"error": str(exc),
"traceback": traceback.format_exc(),
}
async def main() -> None:
"""Start the MCP stdio server."""
print("[MCP] Starting stdio server...", file=sys.stderr)
async with stdio_server() as (read_stream, write_stream):
init_options = app.create_initialization_options()
await app.run(read_stream, write_stream, init_options)
if __name__ == "__main__":
print("[MCP] AMReXAgent MCP Server", file=sys.stderr)
print(f"[MCP] Environment: {config.environment}", file=sys.stderr)
print(f"[MCP] FAISS indices: {config.faiss_db_path}", file=sys.stderr)
print("[MCP] Starting...", file=sys.stderr)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n[MCP] Server stopped by user", file=sys.stderr)
except Exception as exc:
print(f"[MCP] Server error: {exc}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)