Skip to content

Commit d852ed1

Browse files
author
razvan
committed
perf: reduce indexer embed concurrency and add progress display
- pkg/indexer/service.go: numWorkers=1 (Ollama is serial, parallelism only queues requests and blocks search queries); add \r progress line showing % files remaining during indexing - pkg/llm/ollama.go: remove unused mu sync.Mutex field - internal/service/search/search.go: CollectionExists cache (60s TTL) - internal/service/tools/search_local_index.go: remove semaphore from graph expansion (ExactSearch is metadata-only, no Ollama calls) - internal/service/engine/engine.go: TIMER instrumentation for SearchCode
1 parent 77e399b commit d852ed1

File tree

5 files changed

+76
-23
lines changed

5 files changed

+76
-23
lines changed

internal/service/engine/engine.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,13 @@ func (e *ErrIndexingStarted) Error() string {
257257
// embeds the query ONCE, then fans out in parallel to all language collections.
258258
// includeDocs=false searches code only. Triggers background indexing if needed.
259259
func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, limit int, includeDocs bool) (*SearchCodeResult, error) {
260+
t0 := time.Now()
261+
260262
wctx, err := e.DetectContext(ctx, filePath)
261263
if err != nil {
262264
return nil, err
263265
}
266+
log.Printf("[TIMER] SearchCode detect_context=%v", time.Since(t0))
264267

265268
// Primary language from file extension
266269
primaryLang := "go"
@@ -270,7 +273,9 @@ func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, lim
270273

271274
// Ensure at least the primary collection exists before embedding (fast fail + indexing trigger)
272275
primaryColl := wctx.CollectionName(primaryLang)
276+
t1 := time.Now()
273277
exists, err := e.search.CollectionExists(ctx, primaryColl)
278+
log.Printf("[TIMER] SearchCode collection_exists_primary=%v (cached=%v)", time.Since(t1), time.Since(t1) < time.Millisecond)
274279
if err != nil {
275280
return nil, fmt.Errorf("failed to check collection: %w", err)
276281
}
@@ -293,7 +298,9 @@ func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, lim
293298
langs = []string{"go", "python", "php", "html"}
294299
}
295300

301+
t2 := time.Now()
296302
vector, err := e.search.EmbedQuery(ctx, queryText)
303+
log.Printf("[TIMER] SearchCode embed=%v", time.Since(t2))
297304
if err != nil {
298305
return nil, fmt.Errorf("embedding failed: %w", err)
299306
}
@@ -303,16 +310,19 @@ func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, lim
303310
coll string
304311
results []storage.SearchResult
305312
err error
313+
elapsed time.Duration
306314
}
307315

308316
resultsChan := make(chan langResult, len(langs))
309317
var wg sync.WaitGroup
318+
t3 := time.Now()
310319

311320
for _, lang := range langs {
312321
coll := wctx.CollectionName(lang)
313322
wg.Add(1)
314323
go func(l, c string) {
315324
defer wg.Done()
325+
gt := time.Now()
316326
ok, chkErr := e.search.CollectionExists(ctx, c)
317327
if chkErr != nil || !ok {
318328
return
@@ -324,19 +334,21 @@ func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, lim
324334
} else {
325335
res, sErr = e.search.SearchCodeWithVector(ctx, c, vector, limit)
326336
}
337+
elapsed := time.Since(gt)
327338
if sErr != nil {
328339
log.Printf("[WARN] SearchCode: fan-out failed for %s: %v", c, sErr)
329-
resultsChan <- langResult{lang: l, coll: c, err: sErr}
340+
resultsChan <- langResult{lang: l, coll: c, err: sErr, elapsed: elapsed}
330341
return
331342
}
332343
if len(res) > 0 {
333-
resultsChan <- langResult{lang: l, coll: c, results: res}
344+
resultsChan <- langResult{lang: l, coll: c, results: res, elapsed: elapsed}
334345
}
335346
}(lang, coll)
336347
}
337348

338349
wg.Wait()
339350
close(resultsChan)
351+
log.Printf("[TIMER] SearchCode fanout_total=%v (langs=%d)", time.Since(t3), len(langs))
340352

341353
// Merge: primary lang results first, others appended; surface first error if no results
342354
var primaryResults []storage.SearchResult
@@ -347,11 +359,13 @@ func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, lim
347359

348360
for lr := range resultsChan {
349361
if lr.err != nil {
362+
log.Printf("[TIMER] SearchCode lang=%s err elapsed=%v", lr.lang, lr.elapsed)
350363
if firstErr == nil {
351364
firstErr = lr.err
352365
}
353366
continue
354367
}
368+
log.Printf("[TIMER] SearchCode lang=%s hits=%d elapsed=%v", lr.lang, len(lr.results), lr.elapsed)
355369
if lr.coll == primaryColl {
356370
primaryResults = lr.results
357371
} else {
@@ -373,6 +387,9 @@ func (e *Engine) SearchCode(ctx context.Context, filePath, queryText string, lim
373387
return nil, fmt.Errorf("search failed: %w", firstErr)
374388
}
375389

390+
log.Printf("[TIMER] SearchCode TOTAL=%v (detect=%v embed=%v fanout=%v)",
391+
time.Since(t0), t1.Sub(t0), t2.Sub(t1), time.Since(t3))
392+
376393
return &SearchCodeResult{
377394
Results: all,
378395
WorkspaceRoot: wctx.Root,

internal/service/search/search.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,25 @@ import (
77
"math"
88
"sort"
99
"strings"
10+
"sync"
11+
"time"
1012

1113
"github.com/doITmagic/rag-code-mcp/internal/service/internalutil"
1214
"github.com/doITmagic/rag-code-mcp/pkg/llm"
1315
"github.com/doITmagic/rag-code-mcp/pkg/storage"
1416
)
1517

18+
const collectionExistsTTL = 60 * time.Second
19+
20+
type collectionCacheEntry struct {
21+
exists bool
22+
expiry time.Time
23+
}
24+
1625
type Service struct {
17-
embedder llm.Provider
18-
store storage.VectorStore
26+
embedder llm.Provider
27+
store storage.VectorStore
28+
collectionCache sync.Map // map[string]*collectionCacheEntry
1929
}
2030

2131
// NewService creates a new search service.
@@ -101,8 +111,33 @@ func (s *Service) ExactSearch(ctx context.Context, collection string, filter map
101111
}
102112

103113
// CollectionExists checks whether a collection exists in the vector store.
114+
// Results are cached for 60 s to avoid hammering Qdrant on every fan-out query.
104115
func (s *Service) CollectionExists(ctx context.Context, collection string) (bool, error) {
105-
return s.store.CollectionExists(ctx, collection)
116+
now := time.Now()
117+
if v, ok := s.collectionCache.Load(collection); ok {
118+
entry := v.(*collectionCacheEntry)
119+
if now.Before(entry.expiry) {
120+
return entry.exists, nil
121+
}
122+
}
123+
exists, err := s.store.CollectionExists(ctx, collection)
124+
if err != nil {
125+
return false, err
126+
}
127+
// Cache only positive results — "doesn't exist" can flip quickly after indexing.
128+
if exists {
129+
s.collectionCache.Store(collection, &collectionCacheEntry{
130+
exists: true,
131+
expiry: now.Add(collectionExistsTTL),
132+
})
133+
}
134+
return exists, nil
135+
}
136+
137+
// InvalidateCollectionCache removes the cached existence entry for a collection.
138+
// Call this after creating or dropping a collection.
139+
func (s *Service) InvalidateCollectionCache(collection string) {
140+
s.collectionCache.Delete(collection)
106141
}
107142

108143
// HybridSearch combines semantic search with basic lexical re-ranking.

internal/service/tools/search_local_index.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ func (t *SearchLocalIndexTool) Execute(ctx context.Context, params map[string]in
231231
var wg sync.WaitGroup
232232
seenTargets := make(map[string]bool)
233233
expanded := 0
234-
sem := make(chan struct{}, 3) // limit concurrent DB lookups
235234

236235
for _, relRaw := range relList {
237236
if expanded >= maxExpansions {
@@ -251,8 +250,6 @@ func (t *SearchLocalIndexTool) Execute(ctx context.Context, params map[string]in
251250
wg.Add(1)
252251
go func(name string) {
253252
defer wg.Done()
254-
sem <- struct{}{}
255-
defer func() { <-sem }()
256253
// ExactSearch only — zero embedding, deterministic.
257254
// No fallback to embedding search: relation names are often stdlib/external
258255
// symbols not in the local index, and each embedding call costs ~N seconds.

pkg/indexer/service.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"runtime"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415

1516
"github.com/doITmagic/rag-code-mcp/pkg/llm"
1617
"github.com/doITmagic/rag-code-mcp/pkg/parser"
@@ -129,7 +130,8 @@ func (s *Service) IndexWorkspace(ctx context.Context, root string, collection st
129130
}
130131
}
131132

132-
log.Printf("[INFO] Indexing %d changed files in %s (Language: %s)", len(changedFiles), root, opts.Language)
133+
totalFiles := len(changedFiles)
134+
log.Printf("[INFO] Indexing %d changed files in %s (Language: %s)", totalFiles, root, opts.Language)
133135

134136
// 4. Process changed files in parallel (file-level worker pool).
135137
// State.UpdateFile is mutex-protected; errors are collected safely.
@@ -141,23 +143,28 @@ func (s *Service) IndexWorkspace(ctx context.Context, root string, collection st
141143
numFileWorkers = 8
142144
}
143145

144-
filePaths := make(chan string, len(changedFiles))
146+
filePaths := make(chan string, totalFiles)
145147
for _, p := range changedFiles {
146148
filePaths <- p
147149
}
148150
close(filePaths)
149151

150152
var (
151-
fileWg sync.WaitGroup
152-
errMu sync.Mutex
153-
fileErrs []string
153+
fileWg sync.WaitGroup
154+
errMu sync.Mutex
155+
fileErrs []string
156+
doneFiles atomic.Int64
154157
)
155158

156159
for i := 0; i < numFileWorkers; i++ {
157160
fileWg.Add(1)
158161
go func() {
159162
defer fileWg.Done()
160163
for path := range filePaths {
164+
n := int(doneFiles.Add(1))
165+
remaining := totalFiles - n
166+
pct := remaining * 100 / totalFiles
167+
fmt.Fprintf(os.Stderr, "\r[INDEX] %s: %d%% (%d/%d files left) ", opts.Language, pct, remaining, totalFiles)
161168
if err := s.IndexFile(ctx, collection, path, state); err != nil {
162169
log.Printf("[ERROR] Failed to index %s: %v", path, err)
163170
errMu.Lock()
@@ -169,6 +176,7 @@ func (s *Service) IndexWorkspace(ctx context.Context, root string, collection st
169176
}
170177

171178
fileWg.Wait()
179+
fmt.Fprintf(os.Stderr, "\r[INDEX] %s: done (%d files indexed) \n", opts.Language, totalFiles)
172180

173181
if len(fileErrs) > 0 {
174182
log.Printf("[WARN] %d file(s) failed to index in %s", len(fileErrs), root)
@@ -224,14 +232,9 @@ func (s *Service) IndexItems(ctx context.Context, collection string, symbols []p
224232
return nil
225233
}
226234

227-
// Dynamic worker pool based on CPU
228-
numWorkers := 4
229-
if n := runtime.NumCPU() / 2; n > numWorkers {
230-
numWorkers = n
231-
}
232-
if numWorkers > 16 {
233-
numWorkers = 16 // Cap at 16 to avoid overwhelming downstream services
234-
}
235+
// Ollama processes embeds serially — multiple workers only queue up requests
236+
// and increase latency for concurrent search queries. Keep 1 worker.
237+
numWorkers := 1
235238

236239
type result struct {
237240
point storage.Point
@@ -308,6 +311,7 @@ func (s *Service) IndexItems(ctx context.Context, collection string, symbols []p
308311
}
309312
}
310313

314+
log.Printf("[INFO] Indexed %d symbols into %s", len(allPoints), collection)
311315
return nil
312316
}
313317

pkg/llm/ollama.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616
type OllamaLLMProvider struct {
1717
embedModel llms.Model
1818
embedName string
19-
cachedDim uint64
20-
dimOnce sync.Once
19+
cachedDim uint64
20+
dimOnce sync.Once
2121
}
2222

2323
// NewOllamaLLMProvider creates a new Ollama provider configured for embedding only.

0 commit comments

Comments
 (0)