88import logging
99import os
1010import time
11+ from concurrent .futures import ProcessPoolExecutor , ThreadPoolExecutor , as_completed
1112from dataclasses import dataclass , asdict
1213from pathlib import Path
13- from typing import Dict , List , Optional , Any
14+ from typing import Dict , List , Optional , Any , Tuple
1415
1516from .strategies import StrategyFactory
1617from .models import SymbolInfo , FileInfo
@@ -70,14 +71,53 @@ def __init__(self, project_path: str, additional_excludes: Optional[List[str]] =
7071 fallback = len (self .strategy_factory .get_fallback_extensions ())
7172 logger .info (f"Specialized parsers: { specialized } extensions, Fallback coverage: { fallback } extensions" )
7273
73- def build_index (self ) -> Dict [ str , Any ]:
def _process_file(self, file_path: str, specialized_extensions: set) -> Optional[Tuple[Dict, Dict, str, bool]]:
    """
    Process a single file - designed for parallel execution.

    Reads the file as UTF-8 (undecodable bytes are dropped), picks a parsing
    strategy from the lower-cased file extension, and parses the content under
    its path relative to ``self.project_path``.

    Args:
        file_path: Path to the file to process
        specialized_extensions: Set of extensions with specialized parsers

    Returns:
        Tuple of (symbols, {rel_path: file_info}, language, is_specialized),
        or None if reading or parsing raised; the error is logged as a warning.
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        ext = Path(file_path).suffix.lower()

        # Index keys use forward slashes regardless of the host separator, so
        # the literal replaced here must be a lone backslash.
        rel_path = os.path.relpath(file_path, self.project_path).replace('\\', '/')

        # Get appropriate strategy
        strategy = self.strategy_factory.get_strategy(ext)

        # Reported back to the caller, which keeps the usage counters
        is_specialized = ext in specialized_extensions

        # Parse file using strategy with the relative path
        symbols, file_info = strategy.parse_file(rel_path, content)

        # Lazy %-args: the message is only formatted when DEBUG is enabled,
        # which matters because this runs once per indexed file.
        logger.debug("Parsed %s: %d symbols (%s)", rel_path, len(symbols), file_info.language)

        return (symbols, {rel_path: file_info}, file_info.language, is_specialized)

    except Exception as e:
        # Deliberate best effort: a failure is logged and reported as None
        # instead of being raised to the caller.
        logger.warning("Error processing %s: %s", file_path, e)
        return None
108+
109+ def build_index (self , parallel : bool = True , max_workers : Optional [int ] = None ) -> Dict [str , Any ]:
110+ """
111+ Build the complete index using Strategy pattern with parallel processing.
112+
113+ Args:
114+ parallel: Whether to use parallel processing (default: True)
115+ max_workers: Maximum number of worker processes/threads (default: CPU count)
76116
77117 Returns:
78118 Complete JSON index with metadata, symbols, and file information
79119 """
80- logger .info ("Building JSON index using Strategy pattern..." )
120+ logger .info (f "Building JSON index using Strategy pattern (parallel= { parallel } ) ..." )
81121 start_time = time .time ()
82122
83123 all_symbols = {}
@@ -88,39 +128,67 @@ def build_index(self) -> Dict[str, Any]:
88128
89129 # Get specialized extensions for tracking
90130 specialized_extensions = set (self .strategy_factory .get_specialized_extensions ())
91-
92- # Traverse project files
93- for file_path in self ._get_supported_files ():
94- try :
95- with open (file_path , 'r' , encoding = 'utf-8' , errors = 'ignore' ) as f :
96- content = f .read ()
97-
98- ext = Path (file_path ).suffix .lower ()
99-
100- # Convert to relative path first
101- rel_path = os .path .relpath (file_path , self .project_path ).replace ('\\ ' , '/' )
102-
103- # Get appropriate strategy
104- strategy = self .strategy_factory .get_strategy (ext )
105-
106- # Track strategy usage
107- if ext in specialized_extensions :
108- specialized_count += 1
109- else :
110- fallback_count += 1
111-
112- # Parse file using strategy with relative path
113- symbols , file_info = strategy .parse_file (rel_path , content )
114-
115- # Add to index
116- all_symbols .update (symbols )
117- all_files [rel_path ] = file_info
118- languages .add (file_info .language )
119-
120- logger .debug (f"Parsed { rel_path } : { len (symbols )} symbols ({ file_info .language } )" )
121-
122- except Exception as e :
123- logger .warning (f"Error processing { file_path } : { e } " )
131+
132+ # Get list of files to process
133+ files_to_process = self ._get_supported_files ()
134+ total_files = len (files_to_process )
135+
136+ if total_files == 0 :
137+ logger .warning ("No files to process" )
138+ return self ._create_empty_index ()
139+
140+ logger .info (f"Processing { total_files } files..." )
141+
142+ if parallel and total_files > 1 :
143+ # Use ThreadPoolExecutor for I/O-bound file reading
144+ # ProcessPoolExecutor has issues with strategy sharing
145+ if max_workers is None :
146+ max_workers = min (os .cpu_count () or 4 , total_files )
147+
148+ logger .info (f"Using parallel processing with { max_workers } workers" )
149+
150+ with ThreadPoolExecutor (max_workers = max_workers ) as executor :
151+ # Submit all tasks
152+ future_to_file = {
153+ executor .submit (self ._process_file , file_path , specialized_extensions ): file_path
154+ for file_path in files_to_process
155+ }
156+
157+ # Process completed tasks
158+ processed = 0
159+ for future in as_completed (future_to_file ):
160+ file_path = future_to_file [future ]
161+ result = future .result ()
162+
163+ if result :
164+ symbols , file_info_dict , language , is_specialized = result
165+ all_symbols .update (symbols )
166+ all_files .update (file_info_dict )
167+ languages .add (language )
168+
169+ if is_specialized :
170+ specialized_count += 1
171+ else :
172+ fallback_count += 1
173+
174+ processed += 1
175+ if processed % 100 == 0 :
176+ logger .debug (f"Processed { processed } /{ total_files } files" )
177+ else :
178+ # Sequential processing
179+ logger .info ("Using sequential processing" )
180+ for file_path in files_to_process :
181+ result = self ._process_file (file_path , specialized_extensions )
182+ if result :
183+ symbols , file_info_dict , language , is_specialized = result
184+ all_symbols .update (symbols )
185+ all_files .update (file_info_dict )
186+ languages .add (language )
187+
188+ if is_specialized :
189+ specialized_count += 1
190+ else :
191+ fallback_count += 1
124192
125193 # Build index metadata
126194 metadata = IndexMetadata (
@@ -150,6 +218,25 @@ def build_index(self) -> Dict[str, Any]:
150218 logger .info (f"Strategy usage: { specialized_count } specialized, { fallback_count } fallback" )
151219
152220 return index
221+
def _create_empty_index(self) -> Dict[str, Any]:
    """Return an index with zeroed metadata and no symbols or files."""
    # UTC timestamp taken at the moment the empty index is produced
    generated_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    blank_metadata = IndexMetadata(
        project_path=self.project_path,
        indexed_files=0,
        index_version="2.0.0-strategy",
        timestamp=generated_at,
        languages=[],
        total_symbols=0,
        specialized_parsers=0,
        fallback_files=0,
    )

    # Three top-level sections: metadata, symbols, files
    empty_index: Dict[str, Any] = {"metadata": asdict(blank_metadata)}
    empty_index["symbols"] = {}
    empty_index["files"] = {}
    return empty_index
153240
154241 def get_index (self ) -> Optional [Dict [str , Any ]]:
155242 """Get the current in-memory index."""
0 commit comments