-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanaged_mode.py
More file actions
241 lines (182 loc) · 7.64 KB
/
managed_mode.py
File metadata and controls
241 lines (182 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
"""Managed-mode contract utilities for JARVIS subsystems.
When JARVIS_ROOT_MANAGED=true, subsystems (Prime, Reactor) run under the
root authority's supervision. This module provides the shared helpers that
every managed subsystem needs:
* Environment-variable readers for the control-plane handshake.
* Deterministic fingerprinting / hashing for capability contracts.
* HMAC-based authentication for control-plane messages.
* Health-envelope builder that enriches responses in managed mode
while passing them through unchanged in standalone mode.
**Portability note:** This module is designed to be copied verbatim into
the Prime and Reactor repos. It uses *only* stdlib imports.
Schema version follows semver and is compared by the ContractGate at boot
to reject incompatible subsystem builds.
"""
from __future__ import annotations
import hashlib
import hmac as _hmac
import json
import os
import sys
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union
# ---------------------------------------------------------------------------
# Schema version (semver) — bumped on breaking envelope changes
# ---------------------------------------------------------------------------
SCHEMA_VERSION: str = "1.0.0"
# ---------------------------------------------------------------------------
# Well-known exit codes
# ---------------------------------------------------------------------------
EXIT_CLEAN: int = 0
EXIT_CONFIG_ERROR: int = 100
EXIT_CONTRACT_MISMATCH: int = 101
EXIT_DEPENDENCY_FAILURE: int = 200
EXIT_RUNTIME_FATAL: int = 300
# ---------------------------------------------------------------------------
# Boot-time captures (never reset on hot reload)
# ---------------------------------------------------------------------------
_BOOT_TIME_NS: int = time.monotonic_ns()
_PID: int = os.getpid()
_EXEC_FINGERPRINT: Optional[str] = None # computed lazily via get_exec_fingerprint()
# ===================================================================
# Environment-variable readers
# ===================================================================
def is_root_managed() -> bool:
"""Return True when JARVIS_ROOT_MANAGED is set to a truthy value.
Reads the environment variable *fresh* on every call so that
monkeypatching in tests (and dynamic reconfiguration) works.
"""
return os.environ.get("JARVIS_ROOT_MANAGED", "").lower() == "true"
def get_session_id() -> str:
"""Read JARVIS_ROOT_SESSION_ID from the environment.
Returns an empty string if unset (standalone mode).
"""
return os.environ.get("JARVIS_ROOT_SESSION_ID", "")
def get_subsystem_role() -> str:
"""Read JARVIS_SUBSYSTEM_ROLE from the environment.
Returns an empty string if unset (standalone mode).
"""
return os.environ.get("JARVIS_SUBSYSTEM_ROLE", "")
def get_control_plane_secret() -> str:
"""Read JARVIS_CONTROL_PLANE_SECRET from the environment.
Returns an empty string if unset (standalone / unauth mode).
"""
return os.environ.get("JARVIS_CONTROL_PLANE_SECRET", "")
# ===================================================================
# Exec fingerprinting
# ===================================================================
def compute_exec_fingerprint(
binary_path: str,
cmdline: Union[List[str], tuple],
) -> str:
"""Compute a deterministic fingerprint for a process identity.
Returns ``"sha256:<16-hex-chars>"`` derived from the binary path
and its command-line arguments.
"""
payload = json.dumps(
{"binary": binary_path, "cmdline": list(cmdline)},
sort_keys=True,
separators=(",", ":"),
)
digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
return f"sha256:{digest}"
def get_exec_fingerprint() -> str:
"""Lazy singleton: fingerprint for the current process.
Uses ``sys.executable`` and ``sys.argv``. Computed once, then
cached in ``_EXEC_FINGERPRINT`` for the lifetime of the process.
"""
global _EXEC_FINGERPRINT # noqa: PLW0603
if _EXEC_FINGERPRINT is None:
_EXEC_FINGERPRINT = compute_exec_fingerprint(sys.executable, sys.argv)
return _EXEC_FINGERPRINT
# ===================================================================
# Capability hashing
# ===================================================================
def compute_capability_hash(capabilities: Dict[str, Any]) -> str:
"""Deterministic SHA-256 hash of a capabilities dict.
Keys are sorted so that insertion order is irrelevant.
Returns the full 64-char hex digest.
"""
canonical = json.dumps(capabilities, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
# ===================================================================
# HMAC authentication
# ===================================================================
def build_hmac_auth(session_id: str, secret: str) -> str:
"""Build a control-plane auth header.
Format: ``"<unix_timestamp>:<nonce>:<hmac_hex>"``
The HMAC is ``HMAC-SHA256(secret, "<timestamp>:<nonce>:<session_id>")``.
"""
ts = str(time.time())
nonce = uuid.uuid4().hex[:16]
msg = f"{ts}:{nonce}:{session_id}".encode("utf-8")
sig = _hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest()
return f"{ts}:{nonce}:{sig}"
def verify_hmac_auth(
header: str,
session_id: str,
secret: str,
tolerance_s: float = 30.0,
) -> bool:
"""Verify a control-plane auth header.
Returns ``False`` on any parse error, HMAC mismatch, or timestamp
outside the tolerance window.
"""
try:
parts = header.split(":", 2)
if len(parts) != 3:
return False
ts_str, nonce, received_sig = parts
ts = float(ts_str)
except (ValueError, TypeError):
return False
# Timestamp tolerance check
if abs(time.time() - ts) > tolerance_s:
return False
# Recompute expected HMAC
msg = f"{ts_str}:{nonce}:{session_id}".encode("utf-8")
expected_sig = _hmac.new(
secret.encode("utf-8"), msg, hashlib.sha256
).hexdigest()
return _hmac.compare_digest(received_sig, expected_sig)
# ===================================================================
# Health envelope builder
# ===================================================================
def build_health_envelope(
base_response: Dict[str, Any],
readiness: str,
drain_id: Optional[str] = None,
capability_hash: Optional[str] = None,
) -> Dict[str, Any]:
"""Enrich a health response with managed-mode metadata.
In **standalone mode** (``JARVIS_ROOT_SESSION_ID`` not set), the
``base_response`` dict is returned unchanged — no enrichment keys
are added.
In **managed mode**, the returned dict contains all original fields
from ``base_response`` plus the contract-mandated enrichment fields.
"""
session_id = get_session_id()
if not session_id:
# Standalone mode — pass through unchanged
return dict(base_response)
envelope: Dict[str, Any] = dict(base_response)
envelope.update(
{
"liveness": "up",
"readiness": readiness,
"session_id": session_id,
"pid": _PID,
"start_time_ns": _BOOT_TIME_NS,
"exec_fingerprint": get_exec_fingerprint(),
"subsystem_role": get_subsystem_role(),
"schema_version": SCHEMA_VERSION,
"capability_hash": capability_hash,
"observed_at_ns": time.monotonic_ns(),
"wall_time_utc": datetime.now(timezone.utc).isoformat(),
}
)
if drain_id is not None:
envelope["drain_id"] = drain_id
return envelope