Skip to content

Commit da98753

Browse files
committed
fix: Adapted Fysom and gunicorn hook to multi-threaded/multi-worker instrumentation
Signed-off-by: Cagri Yonca <[email protected]>
1 parent c81f2d4 commit da98753

File tree

9 files changed

+210
-70
lines changed

9 files changed

+210
-70
lines changed

src/instana/__init__.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ def key_to_bool(k: str) -> bool:
8383
import inspect
8484

8585
all_accepted_patch_all_args = inspect.getfullargspec(monkey.patch_all)[0]
86-
provided_options = provided_options.replace(" ", "").replace("--", "").split(",")
86+
provided_options = (
87+
provided_options.replace(" ", "").replace("--", "").split(",")
88+
)
8789

8890
provided_options = [
8991
k for k in provided_options if short_key(k) in all_accepted_patch_all_args
@@ -210,11 +212,6 @@ def boot_agent() -> None:
210212
server as tornado_server, # noqa: F401
211213
)
212214

213-
# Hooks
214-
from instana.hooks import (
215-
hook_gunicorn, # noqa: F401
216-
)
217-
218215

219216
def _start_profiler() -> None:
220217
"""Start the Instana Auto Profile."""

src/instana/agent/host.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,13 @@ def diagnostics(self) -> None:
457457
logger.warning(
458458
f"is_collector_thread_running?: {self.collector.is_reporting_thread_running()}"
459459
)
460-
logger.warning(
461-
f"background_report_lock.locked?: {self.collector.background_report_lock.locked()}"
460+
# RLock doesn't have a locked() method, so we check by trying to acquire
461+
lock_acquired = self.collector.background_report_lock.acquire(
462+
blocking=False
462463
)
464+
if lock_acquired:
465+
self.collector.background_report_lock.release()
466+
logger.warning(f"background_report_lock.locked?: {not lock_acquired}")
463467
logger.warning(f"ready_to_start: {self.collector.ready_to_start}")
464468
logger.warning(f"reporting_thread: {self.collector.reporting_thread}")
465469
logger.warning(f"report_interval: {self.collector.report_interval}")

src/instana/collector/base.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __init__(self, agent: Type["BaseAgent"]) -> None:
5555
# Lock used synchronize reporting - no updates when sending
5656
# Used by the background reporting thread. Used to synchronize report attempts and so
5757
# that we never have two in progress at once.
58-
self.background_report_lock = threading.Lock()
58+
self.background_report_lock = threading.RLock()
5959

6060
# Reporting interval for the background thread(s)
6161
self.report_interval = 1
@@ -68,12 +68,9 @@ def __init__(self, agent: Type["BaseAgent"]) -> None:
6868

6969
def is_reporting_thread_running(self) -> bool:
7070
"""
71-
Indicates if there is a thread running with the name self.THREAD_NAME
71+
Checks if the collector is started and the reporting thread is alive.
7272
"""
73-
for thread in threading.enumerate():
74-
if thread.name == self.THREAD_NAME:
75-
return True
76-
return False
73+
return bool(self.reporting_thread and self.reporting_thread.is_alive())
7774

7875
def start(self) -> None:
7976
"""
@@ -91,8 +88,9 @@ def start(self) -> None:
9188
timer.start()
9289
return
9390
logger.debug(
94-
f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})"
91+
f"BaseCollector.start: Skipping start call - reporting thread already running (started: {self.started})"
9592
)
93+
return
9694

9795
if self.agent.can_send():
9896
logger.debug("BaseCollector.start: launching collection thread")
@@ -120,6 +118,8 @@ def shutdown(self, report_final: bool = True) -> None:
120118
logger.debug("Collector.shutdown: Reporting final data.")
121119
self.prepare_and_report_data()
122120
self.started = False
121+
# Clear the thread reference to ensure clean restart after fork
122+
self.reporting_thread = None
123123

124124
def background_report(self) -> None:
125125
"""

src/instana/collector/host.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"""
77

88
from time import time
9-
from typing import DefaultDict, Any
9+
from typing import Any, DefaultDict
1010

1111
from instana.collector.base import BaseCollector
1212
from instana.collector.helpers.runtime import RuntimeHelper
@@ -43,19 +43,20 @@ def prepare_and_report_data(self) -> None:
4343
state machine case.
4444
"""
4545
try:
46-
if self.agent.machine.fsm.current == "wait4init":
46+
with self.agent.machine.lock:
47+
current_state = self.agent.machine.fsm.current
48+
49+
if current_state == "wait4init":
4750
# Test the host agent if we're ready to send data
4851
if self.agent.is_agent_ready():
49-
if self.agent.machine.fsm.current != "good2go":
50-
logger.debug("Agent is ready. Getting to work.")
51-
self.agent.machine.fsm.ready()
52+
with self.agent.machine.lock:
53+
if self.agent.machine.fsm.current != "good2go":
54+
logger.debug("Agent is ready. Getting to work.")
55+
self.agent.machine.fsm.ready()
5256
else:
5357
return
5458

55-
if (
56-
self.agent.machine.fsm.current == "good2go"
57-
and self.agent.is_timed_out()
58-
):
59+
if current_state == "good2go" and self.agent.is_timed_out():
5960
logger.info(
6061
"The Instana host agent has gone offline or is no longer reachable for > 1 min. Will retry periodically."
6162
)

src/instana/fsm.py

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ class TheMachine:
2626
RETRY_PERIOD = 30
2727
THREAD_NAME = "Instana Machine"
2828

29-
warnedPeriodic = False
30-
3129
def __init__(self, agent: "HostAgent") -> None:
3230
logger.debug("Initializing host agent state machine")
3331

32+
self._lock = threading.RLock()
33+
self._warned_periodic = False
34+
3435
self.agent = agent
3536
self.fsm = Fysom(
3637
{
38+
"initial": "*",
3739
"events": [
3840
("lookup", "*", "found"),
3941
("announce", "found", "announced"),
@@ -42,7 +44,7 @@ def __init__(self, agent: "HostAgent") -> None:
4244
],
4345
"callbacks": {
4446
# Can add the following to debug
45-
# "onchangestate": self.print_state_change,
47+
# "onchangestate": self.print_state_change,
4648
"onlookup": self.lookup_agent_host,
4749
"onannounce": self.announce_sensor,
4850
"onpending": self.on_ready,
@@ -51,17 +53,33 @@ def __init__(self, agent: "HostAgent") -> None:
5153
}
5254
)
5355

54-
self.timer = threading.Timer(1, self.fsm.lookup)
55-
self.timer.daemon = True
56-
self.timer.name = self.THREAD_NAME
57-
self.timer.start()
56+
with self._lock:
57+
self.timer = threading.Timer(1, self._safe_fsm_lookup)
58+
self.timer.daemon = True
59+
self.timer.name = self.THREAD_NAME
60+
self.timer.start()
5861

5962
@staticmethod
6063
def print_state_change(e: Any) -> None:
6164
logger.debug(
6265
f"========= ({os.getpid()}#{threading.current_thread().name}) FSM event: {e.event}, src: {e.src}, dst: {e.dst} =========="
6366
)
6467

68+
def _safe_fsm_lookup(self) -> None:
69+
"""Thread-safe wrapper for FSM lookup."""
70+
with self._lock:
71+
self.fsm.lookup()
72+
73+
def _safe_fsm_announce(self) -> None:
74+
"""Thread-safe wrapper for FSM announce."""
75+
with self._lock:
76+
self.fsm.announce()
77+
78+
def _safe_fsm_pending(self) -> None:
79+
"""Thread-safe wrapper for FSM pending."""
80+
with self._lock:
81+
self.fsm.pending()
82+
6583
def reset(self) -> None:
6684
"""
6785
reset is called to start from scratch in a process. It may be called on first boot or
@@ -73,14 +91,14 @@ def reset(self) -> None:
7391
:return: void
7492
"""
7593
logger.debug("State machine being reset. Will start a new announce cycle.")
76-
self.fsm.lookup()
94+
self._safe_fsm_lookup()
7795

7896
def lookup_agent_host(self, e: Any) -> bool:
7997
host = self.agent.options.agent_host
8098
port = self.agent.options.agent_port
8199

82100
if self.agent.is_agent_listening(host, port):
83-
self.fsm.announce()
101+
self._safe_fsm_announce()
84102
return True
85103

86104
if os.path.exists("/proc/"):
@@ -89,14 +107,15 @@ def lookup_agent_host(self, e: Any) -> bool:
89107
if self.agent.is_agent_listening(host, port):
90108
self.agent.options.agent_host = host
91109
self.agent.options.agent_port = port
92-
self.fsm.announce()
110+
self._safe_fsm_announce()
93111
return True
94112

95-
if self.warnedPeriodic is False:
96-
logger.info(
97-
"Instana Host Agent couldn't be found. Will retry periodically..."
98-
)
99-
self.warnedPeriodic = True
113+
with self._lock:
114+
if self._warned_periodic is False:
115+
logger.info(
116+
"Instana Host Agent couldn't be found. Will retry periodically..."
117+
)
118+
self._warned_periodic = True
100119

101120
self.schedule_retry(
102121
self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup"
@@ -143,17 +162,18 @@ def announce_sensor(self, e: Any) -> bool:
143162
return False
144163

145164
self.agent.set_from(payload)
146-
self.fsm.pending()
165+
self._safe_fsm_pending()
147166
logger.debug(
148167
f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..."
149168
)
150169
return True
151170

152171
def schedule_retry(self, fun: Callable, e: Any, name: str) -> None:
153-
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
154-
self.timer.daemon = True
155-
self.timer.name = name
156-
self.timer.start()
172+
with self._lock:
173+
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
174+
self.timer.daemon = True
175+
self.timer.name = name
176+
self.timer.start()
157177

158178
def on_ready(self, _: Any) -> None:
159179
self.agent.start()
@@ -258,3 +278,12 @@ def _get_cmdline(self, pid: int) -> List[str]:
258278
except Exception:
259279
logger.debug("Error getting command line: ", exc_info=True)
260280
return sys.argv
281+
282+
@property
283+
def lock(self) -> threading.RLock:
284+
"""
285+
Returns the thread lock used for synchronizing FSM state transitions.
286+
287+
:return: The RLock instance used for thread synchronization
288+
"""
289+
return self._lock

src/instana/hooks/__init__.py

Whitespace-only changes.

src/instana/hooks/hook_gunicorn.py

Lines changed: 0 additions & 25 deletions
This file was deleted.

tests/collector/test_base_collector.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ def reporting_function():
5454
name=self.collector.THREAD_NAME, target=reporting_function
5555
)
5656
sample_thread.start()
57+
# Set the required state for is_reporting_thread_running to return True
58+
self.collector.started = True
59+
self.collector.reporting_thread = sample_thread
5760
try:
5861
assert self.collector.is_reporting_thread_running()
5962
finally:
@@ -86,7 +89,7 @@ def test_start_collector_while_running_thread(
8689
):
8790
self.collector.start()
8891
assert (
89-
"BaseCollector.start non-fatal: call but thread already running (started: False)"
92+
"BaseCollector.start: Skipping start call - reporting thread already running (started: False)"
9093
in caplog.messages
9194
)
9295

@@ -207,3 +210,41 @@ def test_queued_profiles(
207210
time.sleep(0.1)
208211
profiles = self.collector.queued_profiles()
209212
assert len(profiles) == 3
213+
214+
def test_is_reporting_thread_running_when_thread_is_none(self) -> None:
215+
"""Test is_reporting_thread_running when reporting_thread is None."""
216+
self.collector.reporting_thread = None
217+
assert not self.collector.is_reporting_thread_running()
218+
219+
def test_is_reporting_thread_running_when_thread_is_dead(self) -> None:
220+
"""Test is_reporting_thread_running when thread has finished."""
221+
222+
def quick_function():
223+
pass
224+
225+
sample_thread = threading.Thread(target=quick_function)
226+
sample_thread.start()
227+
sample_thread.join() # Wait for thread to finish
228+
229+
self.collector.reporting_thread = sample_thread
230+
assert not self.collector.is_reporting_thread_running()
231+
232+
def test_is_reporting_thread_running_when_started_false(self) -> None:
233+
"""Test is_reporting_thread_running when started is False but thread exists."""
234+
stop_event = threading.Event()
235+
236+
def reporting_function():
237+
stop_event.wait()
238+
239+
sample_thread = threading.Thread(target=reporting_function)
240+
sample_thread.start()
241+
242+
self.collector.started = False
243+
self.collector.reporting_thread = sample_thread
244+
245+
try:
246+
# Should still return True if thread is alive, regardless of started flag
247+
assert self.collector.is_reporting_thread_running()
248+
finally:
249+
stop_event.set()
250+
sample_thread.join()

0 commit comments

Comments
 (0)