Skip to content

Commit 1485830

Browse files
committed
refactor(indexer): replace globalIndexSemaphore+workers with sequential for loop
- Remove globalIndexSemaphore (RAM-based channel semaphore, 1-4 workers)
- Remove file worker pool goroutines (fileWg, filePaths channel)
- Add WorkspaceName field to Options (for [IDX] log prefix)
- Replace with simple sequential for loop: for _, path := range changedFiles
- Watchdog goroutine retained (1 goroutine vs N+1 before)
- Update log format: [IDX] ws=<name> lang=<lang> [n/total] file (pct%)
  - per-file lines at Debug level (hidden by default, MCP_LOG_LEVEL=debug)
  - errors at Warn level (always visible)
  - DONE at Info level (always visible)
- Replace TestGlobalSemaphoreOrder + TestIndexWorkspaceParallelFiles with:
  TestIndexWorkspaceSequentialFiles + TestIndexWorkspaceProgressIsAscending
  (sequential guarantee: progress calls strictly ascending by 1)
1 parent 9a8963a commit 1485830

File tree

2 files changed

+75
-175
lines changed

2 files changed

+75
-175
lines changed

pkg/indexer/service.go

Lines changed: 40 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -22,76 +22,6 @@ import (
2222
"github.com/doITmagic/rag-code-mcp/pkg/storage"
2323
)
2424

25-
// getSystemMemoryGB attempts to read total system memory from /proc/meminfo (Linux).
26-
// Returns 0 if unable to read.
27-
func getSystemMemoryGB() int {
28-
if runtime.GOOS != "linux" {
29-
return 0
30-
}
31-
data, err := os.ReadFile("/proc/meminfo")
32-
if err != nil {
33-
return 0
34-
}
35-
lines := strings.Split(string(data), "\n")
36-
for _, line := range lines {
37-
if strings.HasPrefix(line, "MemTotal:") {
38-
var kb int
39-
if _, err := fmt.Sscanf(line, "MemTotal: %d kB", &kb); err == nil {
40-
return kb / (1024 * 1024)
41-
}
42-
}
43-
}
44-
return 0
45-
}
46-
47-
// globalIndexSemaphore limits the total number of concurrent file-indexing workers
48-
// across ALL active workspace indexing jobs.
49-
// To prevent Ollama OOM while maximizing speed, we scale concurrency based on system RAM:
50-
// - <= 8GB RAM: 1 worker (Survival mode)
51-
// - <= 16GB RAM: 2 workers
52-
// - <= 32GB RAM: 3 workers
53-
// - > 32GB RAM: 4 workers (Max Cap for high-end systems to leave RAM for OS/IDE)
54-
var globalIndexSemaphore = func() chan struct{} {
55-
n := runtime.NumCPU() / 4
56-
if n < 2 {
57-
n = 2
58-
}
59-
if n > 4 {
60-
n = 4
61-
}
62-
63-
memGB := getSystemMemoryGB()
64-
if memGB > 0 {
65-
var ramWorkers int
66-
switch {
67-
case memGB <= 8:
68-
ramWorkers = 1
69-
case memGB <= 16:
70-
ramWorkers = 2
71-
case memGB <= 32:
72-
ramWorkers = 3
73-
default:
74-
ramWorkers = 4
75-
}
76-
77-
// Take the minimum between CPU-recommended workers and RAM-allowed workers
78-
if ramWorkers < n {
79-
n = ramWorkers
80-
}
81-
82-
logger.Instance.Info("🧠 Detected %dGB RAM. Dynamic indexing concurrency set to %d workers.", memGB, n)
83-
} else {
84-
// Fallback for non-Linux or failures, strictly safe
85-
logger.Instance.Warn("🧠 Could not detect system RAM. Defaulting to safe concurrency limit of 1 worker.")
86-
n = 1
87-
}
88-
89-
ch := make(chan struct{}, n)
90-
for i := 0; i < n; i++ {
91-
ch <- struct{}{}
92-
}
93-
return ch
94-
}()
9525

9626
const (
9727
deleteCollectionTimeout = 10 * time.Second
@@ -101,6 +31,7 @@ const (
10131
// Options configures the indexer.
10232
type Options struct {
10333
Language string
34+
WorkspaceName string // basename of workspace root, used for logging
10435
ExcludePatterns []string
10536
Recreate bool
10637
Progress func(doneFiles, totalFiles int)
@@ -220,34 +151,25 @@ func (s *Service) IndexWorkspace(ctx context.Context, root string, collection st
220151
}
221152
}
222153

154+
wsName := opts.WorkspaceName
155+
if wsName == "" {
156+
wsName = filepath.Base(root)
157+
}
158+
223159
totalFiles := len(changedFiles)
224-
logger.Instance.Info("Indexing %d file(s) in %s (Language: %s)", totalFiles, root, opts.Language)
160+
logger.Instance.Info("[IDX] ws=%s lang=%s ▶ %d file(s) to index", wsName, opts.Language, totalFiles)
225161

226162
// Ensure the embedding model is loaded in Ollama's memory before starting.
227163
// If another program evicted it, this will reload it (with up to 2min timeout).
228164
if ollamaProvider, ok := unwrapOllamaProvider(s.embedder); ok {
229165
if err := ollamaProvider.EnsureLoaded(ctx); err != nil {
230-
logger.Instance.Error("Cannot ensure embedding model is loaded: %v", err)
166+
logger.Instance.Error("[IDX] ws=%s lang=%s ❌ embedding model not available: %v", wsName, opts.Language, err)
231167
return fmt.Errorf("embedding model not available: %w", err)
232168
}
233169
}
234170

235-
// 4. Process changed files using the global semaphore to cap total concurrency
236-
// across all active workspace indexing jobs (prevents CPU/RAM overload).
237-
numFileWorkers := cap(globalIndexSemaphore)
238-
239-
filePaths := make(chan string, totalFiles)
240-
for _, p := range changedFiles {
241-
filePaths <- p
242-
}
243-
close(filePaths)
244-
245-
var (
246-
fileWg sync.WaitGroup
247-
errMu sync.Mutex
248-
fileErrs []string
249-
doneFiles atomic.Int64
250-
)
171+
// 4. File-level counters for watchdog (accessed from two goroutines via atomic).
172+
var doneFiles atomic.Int64
251173

252174
// Dedicated periodic-save goroutine + stall watchdog: detects silent deadlocks.
253175
saveStop := make(chan struct{})
@@ -261,35 +183,29 @@ func (s *Service) IndexWorkspace(ctx context.Context, root string, collection st
261183
select {
262184
case <-saveTicker.C:
263185
if err := state.Save(statePath); err != nil {
264-
logger.Instance.Warn("Periodic state save failed for %s: %v", root, err)
186+
logger.Instance.Warn("[IDX] ws=%s lang=%s periodic state save failed: %v", wsName, opts.Language, err)
265187
}
266188
case <-stallTicker.C:
267189
current := doneFiles.Load()
268-
// We only trigger stall logic if we haven't finished all files AND
269-
// we haven't successfully embedded ANY new symbol in the last 60 seconds.
270190
lastActivitySec := s.lastActivity.Load()
271191
elapsedSinceActivity := time.Now().Unix() - lastActivitySec
272192

273193
if current < int64(totalFiles) && elapsedSinceActivity >= 60 {
274194
stallCount++
275-
semLen := len(globalIndexSemaphore)
276-
semCap := cap(globalIndexSemaphore)
277-
logger.Instance.Warn("⚠️ Indexing stall detected for %s/%s: %d/%d files. No embed activity for %ds. Semaphore: %d/%d. Stall count: %d",
278-
opts.Language, root, current, totalFiles, elapsedSinceActivity, semLen, semCap, stallCount)
195+
logger.Instance.Warn("[IDX] ws=%s lang=%s ⚠️ STALL: no embed activity for %ds [%d/%d] (stall #%d)",
196+
wsName, opts.Language, elapsedSinceActivity, current, totalFiles, stallCount)
279197
if stallCount >= 2 {
280-
// Check if Ollama is still alive
281198
if err := healthcheck.PingOllama(""); err != nil {
282-
logger.Instance.Error("🔴 Ollama HTTP is unresponsive (%v). Forcing restart...", err)
199+
logger.Instance.Error("[IDX] ws=%s lang=%s ❌ Ollama unresponsive: %v — forcing restart", wsName, opts.Language, err)
283200
} else {
284-
logger.Instance.Error("🔴 Ollama HTTP ping is OK but embedding goroutines are DEADLOCKED. Forcing restart!")
201+
logger.Instance.Error("[IDX] ws=%s lang=%s ❌ Ollama ping OK but embed goroutine STALLED — forcing restart", wsName, opts.Language)
285202
}
286-
// Always attempt strict restart to kill stuck runners
287203
attemptOllamaRestart()
288204
}
289205
if stallCount >= 3 {
290206
buf := make([]byte, 8192)
291207
n := runtime.Stack(buf, true)
292-
logger.Instance.Error("🔴 Deadlock confirmed in indexing goroutines for %s. Goroutine dump:\n%s", root, string(buf[:n]))
208+
logger.Instance.Error("[IDX] ws=%s lang=%s goroutine dump:\n%s", wsName, opts.Language, string(buf[:n]))
293209
}
294210
} else {
295211
stallCount = 0
@@ -300,53 +216,42 @@ func (s *Service) IndexWorkspace(ctx context.Context, root string, collection st
300216
}
301217
}()
302218

303-
for i := 0; i < numFileWorkers; i++ {
304-
fileWg.Add(1)
305-
go func() {
306-
defer fileWg.Done()
307-
for path := range filePaths {
308-
// Acquire global slot — blocks if too many concurrent indexers active
309-
<-globalIndexSemaphore
310-
n := int(doneFiles.Add(1))
311-
pct := 0
312-
if totalFiles > 0 {
313-
pct = n * 100 / totalFiles
314-
}
219+
// 5. Sequential file processing — no worker pool, no semaphore.
220+
// Embed is serial in Ollama anyway (numWorkers=1 in IndexItems), so parallelism
221+
// here only added complexity without meaningful throughput gain.
222+
var fileErrs []string
223+
for _, path := range changedFiles {
224+
n := int(doneFiles.Add(1))
225+
pct := n * 100 / totalFiles
315226

316-
logger.Instance.Info("📄 [%d/%d] %s (%s, %d%%)", n, totalFiles, filepath.Base(path), opts.Language, pct)
317-
if opts.Progress != nil {
318-
opts.Progress(n, totalFiles)
319-
}
227+
logger.Instance.Debug("[IDX] ws=%s lang=%s [%d/%d] %s (%d%%)",
228+
wsName, opts.Language, n, totalFiles, filepath.Base(path), pct)
320229

321-
symCount, indexErr := s.IndexFile(ctx, collection, path, state)
322-
// Release slot immediately after processing
323-
globalIndexSemaphore <- struct{}{}
230+
if opts.Progress != nil {
231+
opts.Progress(n, totalFiles)
232+
}
324233

325-
if indexErr != nil {
326-
logger.Instance.Error("Failed to index %s: %v", path, indexErr)
327-
errMu.Lock()
328-
fileErrs = append(fileErrs, fmt.Sprintf("%s: %v", path, indexErr))
329-
errMu.Unlock()
330-
} else {
331-
logger.Instance.Info(" → %d symbol(s) indexed from %s", symCount, filepath.Base(path))
332-
}
333-
}
334-
}()
234+
symCount, indexErr := s.IndexFile(ctx, collection, path, state)
235+
if indexErr != nil {
236+
logger.Instance.Warn("[IDX] ws=%s lang=%s ⚠️ %s: %v", wsName, opts.Language, filepath.Base(path), indexErr)
237+
fileErrs = append(fileErrs, fmt.Sprintf("%s: %v", path, indexErr))
238+
} else {
239+
logger.Instance.Debug("[IDX] ws=%s lang=%s %s → %d symbol(s)", wsName, opts.Language, filepath.Base(path), symCount)
240+
}
335241
}
336242

337-
fileWg.Wait()
338-
close(saveStop) // Stop the periodic save goroutine
339-
logger.Instance.Info("[INDEX] %s done: %d file(s) indexed", opts.Language, totalFiles)
243+
close(saveStop)
340244

341245
if len(fileErrs) > 0 {
342-
logger.Instance.Warn("%d file(s) failed to index in %s", len(fileErrs), root)
246+
logger.Instance.Warn("[IDX] ws=%s lang=%s %d file(s) failed to index", wsName, opts.Language, len(fileErrs))
343247
}
344248

345-
// 5. Save state
249+
// 6. Save state
346250
if err := state.Save(statePath); err != nil {
347-
logger.Instance.Warn("Failed to save index state for %s: %v", root, err)
251+
logger.Instance.Warn("[IDX] ws=%s lang=%s failed to save state: %v", wsName, opts.Language, err)
348252
}
349253

254+
logger.Instance.Info("[IDX] ws=%s lang=%s ✅ DONE %d file(s)", wsName, opts.Language, totalFiles)
350255
return nil
351256
}
352257

pkg/indexer/service_test.go

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,9 @@ func TestIndexItemsParallelism(t *testing.T) {
156156
Expect(len(mockS.upsertPoints)).To(Equal(100))
157157
}
158158

159-
// TestIndexWorkspaceParallelFiles verifies that multiple files are indexed concurrently
160-
// with no data races on shared state. Run with -race to detect races.
161-
func TestIndexWorkspaceParallelFiles(t *testing.T) {
159+
// TestIndexWorkspaceSequentialFiles verifies that multiple files are indexed correctly
160+
// in a sequential for loop. Run with -race to detect races on shared state.
161+
func TestIndexWorkspaceSequentialFiles(t *testing.T) {
162162
RegisterTestingT(t)
163163

164164
// One file per sub-package directory so each IndexFile call yields exactly 1 symbol.
@@ -179,7 +179,7 @@ func TestIndexWorkspaceParallelFiles(t *testing.T) {
179179
mockS := &mockStore{}
180180
svc := NewService(mockEmbed, mockS)
181181

182-
err := svc.IndexWorkspace(context.Background(), dir, "test-parallel", Options{Language: "go"})
182+
err := svc.IndexWorkspace(context.Background(), dir, "test-sequential", Options{Language: "go"})
183183
Expect(err).NotTo(HaveOccurred())
184184

185185
// Each sub-package has 1 function → numFiles symbols total
@@ -191,56 +191,51 @@ func TestIndexWorkspaceParallelFiles(t *testing.T) {
191191
Expect(int(atomic.LoadInt32(&mockEmbed.embedCount))).To(BeNumerically(">=", numFiles))
192192
}
193193

194-
// TestGlobalSemaphoreOrder verifies that the semaphore protocol is correct:
195-
// acquire = receive a token (<-ch), release = send a token back (ch <- struct{}{}).
196-
// A pre-filled buffered channel of capacity N means N concurrent workers can acquire.
197-
// If acquire/release were swapped, this test would deadlock and be caught by the timeout.
198-
func TestGlobalSemaphoreOrder(t *testing.T) {
194+
// TestIndexWorkspaceProgressIsAscending verifies that Progress callback is called
195+
// in strictly ascending order — guaranteed by the sequential for loop.
196+
func TestIndexWorkspaceProgressIsAscending(t *testing.T) {
199197
RegisterTestingT(t)
200198

201-
const cap = 3
202-
sem := make(chan struct{}, cap)
203-
// Pre-fill: each token represents a free slot.
204-
for i := 0; i < cap; i++ {
205-
sem <- struct{}{}
199+
dir := t.TempDir()
200+
const numFiles = 5
201+
for i := 0; i < numFiles; i++ {
202+
pkgDir := filepath.Join(dir, fmt.Sprintf("ppkg%d", i))
203+
Expect(os.MkdirAll(pkgDir, 0755)).To(Succeed())
204+
src := fmt.Sprintf("package ppkg%d\n\nfunc PF%d() {}\n", i, i)
205+
Expect(os.WriteFile(filepath.Join(pkgDir, "f.go"), []byte(src), 0644)).To(Succeed())
206206
}
207207

208-
// Acquire all N tokens — should not block.
209-
for i := 0; i < cap; i++ {
210-
select {
211-
case <-sem: // acquire = receive
212-
default:
213-
t.Fatalf("acquire %d should not block on a fresh semaphore", i)
214-
}
215-
}
208+
var calls []int
209+
var mu sync.Mutex
216210

217-
// Semaphore empty — next acquire MUST block.
218-
select {
219-
case <-sem:
220-
t.Fatal("acquire should block when semaphore is exhausted")
221-
default:
222-
// expected — would block
223-
}
211+
mockEmbed := &mockEmbedder{}
212+
mockS := &mockStore{}
213+
svc := NewService(mockEmbed, mockS)
224214

225-
// Release one slot.
226-
sem <- struct{}{} // release = send
215+
err := svc.IndexWorkspace(context.Background(), dir, "test-ascending", Options{
216+
Language: "go",
217+
Progress: func(done, total int) {
218+
mu.Lock()
219+
calls = append(calls, done)
220+
mu.Unlock()
221+
},
222+
})
223+
Expect(err).NotTo(HaveOccurred())
224+
Expect(len(calls)).To(Equal(numFiles))
227225

228-
// Now acquire should succeed again.
229-
select {
230-
case <-sem:
231-
// OK
232-
default:
233-
t.Fatal("acquire should succeed after release")
226+
// Sequential: each call must increment by exactly 1
227+
for i := 1; i < len(calls); i++ {
228+
Expect(calls[i]).To(Equal(calls[i-1]+1), "progress calls must be strictly ascending")
234229
}
235230
}
236231

237232
// TestIndexWorkspaceNoDeadlock runs IndexWorkspace with many files and a tight timeout.
238-
// If the semaphore is inverted (the original bug), this will deadlock and fail via timeout.
233+
// With the sequential model there is no semaphore to deadlock, but this verifies
234+
// that IndexWorkspace completes within a reasonable time.
239235
func TestIndexWorkspaceNoDeadlock(t *testing.T) {
240236
RegisterTestingT(t)
241237

242238
dir := t.TempDir()
243-
// Create more files than the semaphore capacity to trigger contention.
244239
const numFiles = 20
245240
for i := 0; i < numFiles; i++ {
246241
pkgDir := filepath.Join(dir, fmt.Sprintf("dpkg%d", i))
@@ -264,7 +259,7 @@ func TestIndexWorkspaceNoDeadlock(t *testing.T) {
264259
case err := <-done:
265260
Expect(err).NotTo(HaveOccurred())
266261
case <-time.After(30 * time.Second):
267-
t.Fatal("IndexWorkspace deadlocked — semaphore acquire/release likely inverted")
262+
t.Fatal("IndexWorkspace took too long — possible infinite loop or deadlock")
268263
}
269264

270265
mockS.mu.Lock()

0 commit comments

Comments (0)