Skip to content

Commit fcaccda

Browse files
authored
Merge pull request #261 from pathsim/fix/worker-robustness
worker robustness
2 parents 77a65f0 + 924a94b commit fcaccda

File tree

3 files changed

+50
-18
lines changed

3 files changed

+50
-18
lines changed

pathview/app.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@
2121
from flask import Flask, request, jsonify, send_from_directory
2222
from flask_cors import CORS
2323

24-
# ---------------------------------------------------------------------------
25-
# Configuration
26-
# ---------------------------------------------------------------------------
27-
28-
SESSION_TTL = 3600 # 1 hour of inactivity before cleanup
29-
CLEANUP_INTERVAL = 60 # Check for stale sessions every 60 seconds
30-
EXEC_TIMEOUT = 35 # Server-side timeout for exec/eval (slightly > worker's 30s)
31-
WORKER_SCRIPT = str(Path(__file__).parent / "worker.py")
24+
from pathview.config import (
25+
WORKER_SCRIPT,
26+
SERVER_TIMEOUT,
27+
INIT_TIMEOUT,
28+
SESSION_TTL,
29+
CLEANUP_INTERVAL,
30+
)
3231

3332
# ---------------------------------------------------------------------------
3433
# Session management
@@ -80,7 +79,7 @@ def read_line(self) -> dict | None:
8079
except json.JSONDecodeError:
8180
continue
8281

83-
def read_line_timeout(self, timeout: float = EXEC_TIMEOUT) -> dict | None:
82+
def read_line_timeout(self, timeout: float = SERVER_TIMEOUT) -> dict | None:
8483
"""Read one JSON line with a timeout. Returns None on EOF or timeout.
8584
8685
Raises TimeoutError if no response within the timeout period.
@@ -116,7 +115,7 @@ def ensure_initialized(self, packages: list[dict] | None = None) -> list[dict]:
116115
init_msg["packages"] = packages
117116
self.send_message(init_msg)
118117
while True:
119-
resp = self.read_line()
118+
resp = self.read_line_timeout(timeout=INIT_TIMEOUT)
120119
if resp is None:
121120
raise RuntimeError("Worker process died during initialization")
122121
messages.append(resp)
@@ -480,7 +479,8 @@ def api_stream_exec():
480479
if not session:
481480
return jsonify({"error": "No active session"}), 404
482481
try:
483-
session.send_message({"type": "stream-exec", "code": code})
482+
with session.lock:
483+
session.send_message({"type": "stream-exec", "code": code})
484484
return jsonify({"status": "queued"})
485485
except Exception as e:
486486
return jsonify({"error": str(e)}), 500
@@ -497,7 +497,8 @@ def api_stream_stop():
497497
if not session:
498498
return jsonify({"status": "stopped"})
499499
try:
500-
session.send_message({"type": "stream-stop"})
500+
with session.lock:
501+
session.send_message({"type": "stream-stop"})
501502
return jsonify({"status": "stopped"})
502503
except Exception as e:
503504
return jsonify({"error": str(e)}), 500

pathview/config.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Shared configuration constants for the PathView backend."""
2+
3+
from pathlib import Path
4+
5+
# ---------------------------------------------------------------------------
6+
# Paths
7+
# ---------------------------------------------------------------------------
8+
9+
WORKER_SCRIPT = str(Path(__file__).parent / "worker.py")
10+
11+
# ---------------------------------------------------------------------------
12+
# Timeouts (seconds)
13+
# ---------------------------------------------------------------------------
14+
15+
EXEC_TIMEOUT = 30 # Per exec/eval call in the worker
16+
SERVER_TIMEOUT = 35 # Server-side read timeout (slightly > worker's EXEC_TIMEOUT)
17+
INIT_TIMEOUT = 120 # Initialization / pip install (matches frontend TIMEOUTS.INIT)
18+
19+
# ---------------------------------------------------------------------------
20+
# Session management
21+
# ---------------------------------------------------------------------------
22+
23+
SESSION_TTL = 3600 # Inactive session cleanup after 1 hour
24+
CLEANUP_INTERVAL = 60 # Check for stale sessions every 60 seconds

pathview/worker.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import queue
2424
import ctypes
2525

26+
from pathview.config import EXEC_TIMEOUT
27+
2628
# Lock for thread-safe stdout writing (protocol messages only)
2729
_stdout_lock = threading.Lock()
2830

@@ -41,9 +43,6 @@
4143
_namespace = {}
4244
_initialized = False
4345

44-
# Default timeout for exec/eval (seconds)
45-
EXEC_TIMEOUT = 30
46-
4746
# Streaming state
4847
_streaming_active = False
4948
_streaming_code_queue = queue.Queue()
@@ -335,9 +334,17 @@ def run_streaming_loop(msg_id: str, expr: str) -> None:
335334
except Exception as e:
336335
send({"type": "stderr", "value": f"Stream exec error: {e}"})
337336

338-
# Step the generator
339-
exec_code_str = f"_eval_result = {expr}"
340-
exec(exec_code_str, _namespace)
337+
# Step the generator (with timeout so a stuck C extension
338+
# doesn't hang the worker forever)
339+
try:
340+
_run_with_timeout(
341+
lambda: exec(f"_eval_result = {expr}", _namespace),
342+
timeout=EXEC_TIMEOUT,
343+
)
344+
except TimeoutError:
345+
send({"type": "error", "id": msg_id,
346+
"error": f"Simulation step timed out after {EXEC_TIMEOUT}s"})
347+
break
341348
raw_result = _namespace["_eval_result"]
342349
done = raw_result.get("done", False) if isinstance(raw_result, dict) else False
343350

0 commit comments

Comments
 (0)