Skip to content

Commit 8d6d6de

Browse files
committed
chore: documentation
1 parent cbc5ff1 commit 8d6d6de

File tree

1 file changed

+117
-19
lines changed

1 file changed

+117
-19
lines changed

laygo/pipeline.py

Lines changed: 117 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,31 @@
2828

2929
# This function must be defined at the top level of the module (e.g., after imports)
3030
def _branch_consumer_process[T](transformer: Transformer, queue: "Queue", context_handle: IContextHandle) -> list[Any]:
31-
"""
32-
The entry point for a consumer process. It reconstructs the necessary
33-
objects and runs a dedicated pipeline instance on the data from its queue.
31+
"""Entry point for a consumer process in parallel branching.
32+
33+
Reconstructs the necessary objects and runs a dedicated pipeline instance
34+
on the data from its queue. This function is called in separate processes
35+
during process-based parallel execution.
36+
37+
Args:
38+
transformer: The transformer to apply to the data.
39+
queue: Process-safe queue containing batched data items.
40+
context_handle: Handle to create a context proxy in the new process.
41+
42+
Returns:
43+
List of processed results from applying the transformer.
3444
"""
3545
# Re-create the context proxy within the new process
3646
context_proxy = context_handle.create_proxy()
3747

3848
def stream_from_queue() -> Iterator[T]:
39-
"""A generator that yields items from the process-safe queue."""
49+
"""Generate items from the process-safe queue.
50+
51+
Yields items from batches until a None sentinel is received.
52+
53+
Yields:
54+
Items from the queue batches.
55+
"""
4056
while (batch := queue.get()) is not None:
4157
yield from batch
4258

@@ -265,7 +281,9 @@ def to_list(self) -> tuple[list[T], dict[str, Any]]:
265281
and materializes all results into memory.
266282
267283
Returns:
268-
A list containing all processed items from the pipeline.
284+
A tuple containing:
285+
- A list of all processed items from the pipeline
286+
- The final context dictionary
269287
270288
Note:
271289
This operation consumes the pipeline's iterator, making subsequent
@@ -274,7 +292,7 @@ def to_list(self) -> tuple[list[T], dict[str, Any]]:
274292
return list(self.processed_data), self.context_manager.to_dict()
275293

276294
def each(self, function: PipelineFunction[T]) -> tuple[None, dict[str, Any]]:
277-
"""Apply a function to each element (terminal operation).
295+
"""Apply a function to each element for side effects.
278296
279297
This is a terminal operation that processes each element for side effects
280298
and consumes the pipeline's iterator without returning results.
@@ -283,6 +301,11 @@ def each(self, function: PipelineFunction[T]) -> tuple[None, dict[str, Any]]:
283301
function: The function to apply to each element. Should be used for
284302
side effects like logging, updating external state, etc.
285303
304+
Returns:
305+
A tuple containing:
306+
- None (no results are collected)
307+
- The final context dictionary
308+
286309
Note:
287310
This operation consumes the pipeline's iterator, making subsequent
288311
operations on the same pipeline return empty results.
@@ -293,7 +316,7 @@ def each(self, function: PipelineFunction[T]) -> tuple[None, dict[str, Any]]:
293316
return None, self.context_manager.to_dict()
294317

295318
def first(self, n: int = 1) -> tuple[list[T], dict[str, Any]]:
296-
"""Get the first n elements of the pipeline (terminal operation).
319+
"""Get the first n elements of the pipeline.
297320
298321
This is a terminal operation that consumes up to n elements from the
299322
pipeline's iterator and returns them as a list.
@@ -302,8 +325,10 @@ def first(self, n: int = 1) -> tuple[list[T], dict[str, Any]]:
302325
n: The number of elements to retrieve. Must be at least 1.
303326
304327
Returns:
305-
A list containing the first n elements, or fewer if the pipeline
306-
contains fewer than n elements.
328+
A tuple containing:
329+
- A list containing the first n elements, or fewer if the pipeline
330+
contains fewer than n elements
331+
- The final context dictionary
307332
308333
Raises:
309334
AssertionError: If n is less than 1.
@@ -316,12 +341,17 @@ def first(self, n: int = 1) -> tuple[list[T], dict[str, Any]]:
316341
return list(itertools.islice(self.processed_data, n)), self.context_manager.to_dict()
317342

318343
def consume(self) -> tuple[None, dict[str, Any]]:
319-
"""Consume the pipeline without returning results (terminal operation).
344+
"""Consume the pipeline without returning results.
320345
321346
This is a terminal operation that processes all elements in the pipeline
322347
for their side effects without materializing any results. Useful when
323348
the pipeline operations have side effects and you don't need the results.
324349
350+
Returns:
351+
A tuple containing:
352+
- None (no results are collected)
353+
- The final context dictionary
354+
325355
Note:
326356
This operation consumes the pipeline's iterator, making subsequent
327357
operations on the same pipeline return empty results.
@@ -337,7 +367,16 @@ def _producer_fanout(
337367
queues: dict[str, Queue],
338368
batch_size: int,
339369
) -> None:
340-
"""Producer for fan-out: sends every item to every branch."""
370+
"""Producer for fan-out mode.
371+
372+
Sends every item to every branch. Used for unconditional branching
373+
where all branches process all items.
374+
375+
Args:
376+
source_iterator: The source data iterator.
377+
queues: Dictionary mapping branch names to their queues.
378+
batch_size: Number of items per batch.
379+
"""
341380
for batch_tuple in itertools.batched(source_iterator, batch_size):
342381
batch_list = list(batch_tuple)
343382
for q in queues.values():
@@ -352,7 +391,17 @@ def _producer_router(
352391
parsed_branches: list[tuple[str, Transformer, Callable]],
353392
batch_size: int,
354393
) -> None:
355-
"""Producer for router (`first_match=True`): sends item to the first matching branch."""
394+
"""Producer for router mode.
395+
396+
Sends each item to the first matching branch when first_match=True.
397+
This implements conditional routing where items go to exactly one branch.
398+
399+
Args:
400+
source_iterator: The source data iterator.
401+
queues: Dictionary mapping branch names to their queues.
402+
parsed_branches: List of (name, transformer, condition) tuples.
403+
batch_size: Number of items per batch.
404+
"""
356405
buffers = {name: [] for name, _, _ in parsed_branches}
357406
for item in source_iterator:
358407
for name, _, condition in parsed_branches:
@@ -376,7 +425,17 @@ def _producer_broadcast(
376425
parsed_branches: list[tuple[str, Transformer, Callable]],
377426
batch_size: int,
378427
) -> None:
379-
"""Producer for broadcast (`first_match=False`): sends item to all matching branches."""
428+
"""Producer for broadcast mode.
429+
430+
Sends each item to all matching branches when first_match=False.
431+
This implements conditional broadcasting where items can go to multiple branches.
432+
433+
Args:
434+
source_iterator: The source data iterator.
435+
queues: Dictionary mapping branch names to their queues.
436+
parsed_branches: List of (name, transformer, condition) tuples.
437+
batch_size: Number of items per batch.
438+
"""
380439
buffers = {name: [] for name, _, _ in parsed_branches}
381440
for item in source_iterator:
382441
item_matches = [name for name, _, condition in parsed_branches if condition(item)]
@@ -394,8 +453,6 @@ def _producer_broadcast(
394453
for q in queues.values():
395454
q.put(None)
396455

397-
# In your Pipeline class
398-
399456
# Overload 1: Unconditional fan-out
400457
@overload
401458
def branch(
@@ -502,7 +559,22 @@ def _execute_branching_process(
502559
batch_size: int,
503560
max_batch_buffer: int,
504561
) -> tuple[dict[str, list[Any]], dict[str, Any]]:
505-
"""Branching execution using a process pool for consumers."""
562+
"""Execute branching using a process pool for consumers.
563+
564+
Uses multiprocessing for true CPU parallelism. The producer runs in a
565+
thread while consumers run in separate processes.
566+
567+
Args:
568+
producer_fn: The producer function to use for routing items.
569+
parsed_branches: List of (name, transformer, condition) tuples.
570+
batch_size: Number of items per batch.
571+
max_batch_buffer: Maximum number of batches to buffer per branch.
572+
573+
Returns:
574+
A tuple containing:
575+
- Dictionary mapping branch names to their result lists
576+
- The final context dictionary
577+
"""
506578
source_iterator = self.processed_data
507579
num_branches = len(parsed_branches)
508580
final_results: dict[str, list[Any]] = {name: [] for name, _, _ in parsed_branches}
@@ -561,15 +633,41 @@ def _execute_branching_thread(
561633
batch_size: int,
562634
max_batch_buffer: int,
563635
) -> tuple[dict[str, list[Any]], dict[str, Any]]:
564-
"""Shared execution logic for thread-based branching modes."""
565-
# ... (The original implementation of _execute_branching goes here)
636+
"""Execute branching using a thread pool for consumers.
637+
638+
Uses threading for I/O-bound tasks. Both producer and consumers run
639+
in separate threads within the same process.
640+
641+
Args:
642+
producer_fn: The producer function to use for routing items.
643+
parsed_branches: List of (name, transformer, condition) tuples.
644+
batch_size: Number of items per batch.
645+
max_batch_buffer: Maximum number of batches to buffer per branch.
646+
647+
Returns:
648+
A tuple containing:
649+
- Dictionary mapping branch names to their result lists
650+
- The final context dictionary
651+
"""
566652
source_iterator = self.processed_data
567653
num_branches = len(parsed_branches)
568654
final_results: dict[str, list[Any]] = {name: [] for name, _, _ in parsed_branches}
569655
queues = {name: Queue(maxsize=max_batch_buffer) for name, _, _ in parsed_branches}
570656

571657
def consumer(transformer: Transformer, queue: Queue, context_handle: IContextHandle) -> list[Any]:
572-
"""Consumes batches from a queue and processes them."""
658+
"""Consume batches from a queue and process them with a transformer.
659+
660+
Creates a mini-pipeline for the transformer and processes all
661+
batches from the queue until completion.
662+
663+
Args:
664+
transformer: The transformer to apply to the data.
665+
queue: Queue containing batched data items.
666+
context_handle: Handle to create a context proxy.
667+
668+
Returns:
669+
List of processed results from applying the transformer.
670+
"""
573671

574672
def stream_from_queue() -> Iterator[T]:
575673
while (batch := queue.get()) is not None:

0 commit comments

Comments
 (0)