package replify

import (
"bytes"
"compress/flate"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
	"time"

	"github.com/sivaosorg/replify/pkg/strutil"
)
// Start initiates streaming operation and returns *wrapper for consistency with replify API.
//
// This function is the primary entry point for streaming operations. It validates prerequisites (streaming wrapper
// not nil, reader configured), prevents concurrent streaming on the same wrapper, selects and executes the configured
// streaming strategy (STRATEGY_DIRECT, STRATEGY_BUFFERED, or STRATEGY_CHUNKED), monitors operation completion,
// and returns a comprehensive response wrapping the result in the standard wrapper format. Start is the public API
// method that callers use to begin streaming; it handles all coordination, error management, and response formatting.
// The function sets up initial timestamps, manages the isStreaming state flag to prevent concurrent operations,
// executes the strategy-specific streaming function with the provided context, and populates the wrapper response
// with final statistics, metadata, and outcome information. Both success and failure paths return a *wrapper object
// with appropriate HTTP status codes, messages, and debugging information for client feedback and diagnostics. The
// streaming operation respects context cancellation, allowing caller-controlled shutdown. All per-chunk errors are
// accumulated; streaming continues when possible, enabling partial success scenarios. Final statistics include
// chunk counts, byte counts, compression metrics, timing information, and error tracking, providing comprehensive
// insight into streaming performance and health.
//
// Parameters:
// - ctx: Context for cancellation, timeouts, and coordination.
// If nil, uses sw.ctx (context from streaming wrapper creation).
// Passed to streaming strategy function for deadline enforcement.
// Cancellation stops streaming immediately.
// Parent context may have deadline affecting overall operation.
//
// Returns:
// - *wrapper: Response wrapper containing streaming result.
// HTTP status code (200, 400, 409, 500).
// Message describing outcome.
// Debugging information with statistics.
// Error information if operation failed.
// Always returns non-nil wrapper for consistency.
//
// Behavior:
// - Validation: checks nil wrapper, reader configuration.
// - Mutual exclusion: prevents concurrent streaming on same wrapper.
// - Strategy selection: routes to appropriate streaming implementation.
// - Error accumulation: collects per-chunk errors without stopping.
// - Response building: wraps result in standard wrapper format.
// - Statistics: populates final metrics and timing data.
// - Status codes: HTTP codes reflecting outcome (200, 400, 409, 500).
//
// Validation Stages:
//
//	Stage                     Check                       Response if Failed
//	──────────────────────────────────────────────────────────────────────────────
//	1. Nil wrapper            sw == nil                   Default bad request
//	2. Reader configured      sw.reader != nil            400 Bad Request
//	3. Not already streaming  !sw.isStreaming             409 Conflict
//	4. Strategy known         Valid STRATEGY_* constant   500 Internal Server Error
//
// HTTP Status Code Mapping:
//
//	Scenario                             Status Code   Message
//	────────────────────────────────────────────────────────────────────────────────────
//	Streaming wrapper is nil             400           (default response)
//	Reader not configured                400           "reader not set for streaming"
//	Streaming already in progress        409           "streaming already in progress"
//	Unknown streaming strategy           500           "unknown streaming strategy: ..."
//	Streaming operation failed (error)   500           "streaming error: ..."
//	Streaming completed successfully     200           "Streaming completed successfully"
//
// Pre-Streaming Checks Flow:
//
//	Input (sw, ctx)
//	      ↓
//	sw == nil?
//	  ├─ Yes → Return respondStreamBadRequestDefault()
//	  └─ No  → Continue
//	      ↓
//	sw.reader == nil?
//	  ├─ Yes → Return 400 "reader not set for streaming"
//	  └─ No  → Continue
//	      ↓
//	sw.isStreaming (with lock)?
//	  ├─ Yes → Return 409 "streaming already in progress"
//	  └─ No  → Set isStreaming = true, Continue
//	      ↓
//	Proceed to strategy selection
//
// Streaming Lifecycle:
//
//	Phase                            Action                             State
//	────────────────────────────────────────────────────────────────────────────────────
//	1. Initialization                Lock and set isStreaming           Locked
//	2. Setup                         Create context (use or default)    Ready
//	3. Logging                       Log start time in debugging KV     Tracked
//	4. Strategy dispatch             Call appropriate stream function   Executing
//	5. Error collection              Monitor for streamErr              In-flight
//	6. Finalization                  Lock and clear isStreaming         Cleanup
//	7. Response building (success)   Populate success statistics        Response ready
//	8. Response building (failure)   Populate error information         Error response ready
//	9. Return                        Return wrapper to caller           Complete
//
// Context Handling:
//
//	Scenario                      Behavior                      Implication
//	──────────────────────────────────────────────────────────────────────────────────
//	ctx provided and non-nil      Use provided context          Caller controls deadline
//	ctx is nil                    Use sw.ctx (if set)           Fallback to wrapper context
//	Both nil                      Pass nil to strategy          No deadline enforcement
//	Parent context has deadline   Inherited by strategy         Affects all operations
//	Cancellation via context      Strategy responds to Done()   Responsive shutdown
//
// Strategy Selection:
//
//	Configuration       Selected Function (send / receive)             Characteristics
//	──────────────────────────────────────────────────────────────────────────────────────
//	STRATEGY_DIRECT     streamDirect() / streamReceiveDirect()         Sequential, simple
//	STRATEGY_BUFFERED   streamBuffered() / streamReceiveBuffered()     Concurrent, high throughput
//	STRATEGY_CHUNKED    streamChunked() / streamReceiveChunked()       Sequential, detailed control
//	Unknown strategy    Error return                                   500 status, error message
//
// State Management (isStreaming flag):
//
//	Operation                   Lock Held   isStreaming Value   Purpose
//	──────────────────────────────────────────────────────────────────────────
//	Pre-check (initial state)   Yes         false               Prevent concurrent start
//	Set streaming active        Yes         true                Mark operation in progress
//	Unlock after setting        No          true                Allow other operations
//	Stream execution            No          true                Streaming active
//	Set streaming inactive      Yes         false               Streaming complete
//
// Success Response Building:
//
//	Field              Source                           Format                     Purpose
//	────────────────────────────────────────────────────────────────────────────────────────
//	StatusCode         Constant                         200 (HTTP OK)              Success indicator
//	Message            Constant                         "Streaming completed..."   User-facing message
//	started_at_ms      sw.stats.StartTime.UnixMilli()   Unix milliseconds          When started
//	completed_at_ms    sw.stats.EndTime.UnixMilli()     Unix milliseconds          When completed
//	total_chunks       sw.stats.TotalChunks             int64                      Chunk count
//	total_bytes        sw.stats.TotalBytes              int64                      Byte count
//	compressed_bytes   sw.stats.CompressedBytes         int64                      Compressed size
//	duration_ms        EndTime - StartTime              Milliseconds               Operation duration
//
// Failure Response Building:
//
//	Field           Source                     Format             Purpose
//	──────────────────────────────────────────────────────────────────────────
//	StatusCode      Constant                   500 (HTTP error)   Failure indicator
//	Error           streamErr (WithErrorAck)   Error message      Error details
//	failed_chunks   sw.stats.FailedChunks      int64              Failed chunk count
//	total_errors    len(sw.errors)             int64              Error count
//
// Example:
//
// // Example 1: Simple streaming with default context
// file, _ := os.Open("large_file.bin")
// defer file.Close()
//
// streaming := replify.New().
// WithStatusCode(200).
// WithPath("/api/stream/start").
// WithStreaming(file, nil).
// WithChunkSize(1024 * 1024).
// WithStreamingStrategy(STRATEGY_DIRECT).
// WithCallback(func(p *StreamProgress, err error) {
// if err == nil && p.CurrentChunk % 100 == 0 {
// fmt.Printf("Progress: %.1f%%\n", p.Percentage)
// }
// })
//
// // Start streaming (uses nil context, falls back to wrapper's context)
// result := streaming.Start(nil)
//
// if result.IsError() {
// fmt.Printf("Error: %s\n", result.Error())
// } else {
// fmt.Printf("Success: %s\n", result.Message())
// fmt.Printf("Chunks: %v\n", result.Debugging()["total_chunks"])
// }
//
// // Example 2: Streaming with cancellation context
// func StreamWithTimeout(fileReader io.ReadCloser) *wrapper {
// ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
// defer cancel()
//
// streaming := replify.New().
// WithStatusCode(200).
// WithPath("/api/stream/timeout").
// WithStreaming(fileReader, nil).
// WithChunkSize(512 * 1024).
// WithStreamingStrategy(STRATEGY_BUFFERED).
// WithMaxConcurrentChunks(4)
//
// // Start with context timeout
// result := streaming.Start(ctx)
//
// return result
// }
//
// // Example 3: Comprehensive error handling
// func StreamWithCompleteErrorHandling(fileReader io.ReadCloser) {
// streaming := replify.New().
// WithStatusCode(200).
// WithPath("/api/stream/errors").
// WithStreaming(fileReader, nil).
// WithChunkSize(256 * 1024).
// WithStreamingStrategy(STRATEGY_CHUNKED).
// WithCompressionType(COMP_GZIP).
// WithCallback(func(p *StreamProgress, err error) {
// if err != nil {
// fmt.Printf("Chunk %d failed: %v\n", p.CurrentChunk, err)
// }
// })
//
// result := streaming.Start(context.Background())
//
// // Analyze result
// if result.IsError() {
// fmt.Printf("Status: %d\n", result.StatusCode())
// fmt.Printf("Error: %s\n", result.Error())
//
// debugging := result.Debugging()
// fmt.Printf("Failed chunks: %v\n", debugging["failed_chunks"])
// fmt.Printf("Total errors: %v\n", debugging["total_errors"])
// } else {
// fmt.Printf("Status: %d\n", result.StatusCode())
// fmt.Printf("Message: %s\n", result.Message())
//
// debugging := result.Debugging()
// fmt.Printf("Total chunks: %v\n", debugging["total_chunks"])
// fmt.Printf("Total bytes: %v\n", debugging["total_bytes"])
// fmt.Printf("Compression ratio: %v\n", debugging["compression_ratio"])
// fmt.Printf("Duration: %vms\n", debugging["duration_ms"])
// }
// }
//
// // Example 4: Streaming with concurrent protection check
// func DemonstrateConcurrencyProtection(fileReader io.ReadCloser) {
// streaming := replify.New().
// WithStatusCode(200).
// WithPath("/api/stream/concurrency").
// WithStreaming(fileReader, nil).
// WithChunkSize(1024 * 1024)
//
// // First call succeeds
// result1 := streaming.Start(context.Background())
// fmt.Printf("First start: %d\n", result1.StatusCode()) // 200
//
// // Second call (if first is slow) would get:
// // result2 := streaming.Start(context.Background())
// // fmt.Printf("Second start: %d\n", result2.StatusCode()) // 409 Conflict
// }
//
// // Example 5: All three strategies comparison
// func CompareStreamingStrategies(fileReader io.ReadCloser) {
//	strategies := []StreamingStrategy{
//		STRATEGY_DIRECT,
//		STRATEGY_CHUNKED,
//		STRATEGY_BUFFERED,
//	}
//
// for _, strategy := range strategies {
// fmt.Printf("\nTesting strategy: %v\n", strategy)
//
// // Create fresh streaming wrapper for each strategy
// newReader := createNewReader() // Create fresh reader
//
// streaming := replify.New().
// WithStatusCode(200).
// WithPath("/api/stream/compare").
// WithStreaming(newReader, nil).
// WithChunkSize(512 * 1024).
// WithStreamingStrategy(strategy)
//
// if strategy == STRATEGY_BUFFERED {
// streaming = streaming.WithMaxConcurrentChunks(4)
// }
//
// start := time.Now()
// result := streaming.Start(context.Background())
// duration := time.Since(start)
//
// if result.IsError() {
// fmt.Printf(" Status: %d (ERROR)\n", result.StatusCode())
// } else {
// debugging := result.Debugging()
// fmt.Printf(" Status: %d (OK)\n", result.StatusCode())
// fmt.Printf(" Duration: %v\n", duration)
// fmt.Printf(" Chunks: %v\n", debugging["total_chunks"])
// fmt.Printf(" Bytes: %v\n", debugging["total_bytes"])
// }
// }
// }
//
// Mutual Exclusion Pattern:
//
//	Scenario                              Behavior                          Response
//	──────────────────────────────────────────────────────────────────────────────────────
//	First Start() call                    Acquires lock, sets isStreaming   Proceeds normally
//	Concurrent Start() during streaming   Checks isStreaming, finds true    409 Conflict
//	Start() after completion              isStreaming cleared, lock free    Proceeds normally
//	Rapid successive calls                Mutex serializes access           First waits, others get 409
//
// Error Propagation:
//
//	Error Origin                     Recorded?   Returned in Response?     Status Code
//	──────────────────────────────────────────────────────────────────────────────────
//	Reader validation                No          Yes (in message)          400
//	Concurrent streaming             No          Yes (in message)          409
//	Unknown strategy                 Yes         Yes (via streamErr)       500
//	Strategy execution (streaming)   Yes         Yes (via streamErr)       500
//	Per-chunk errors                 Yes         Yes (via failed_chunks)   200 or 500
//
// Statistics Tracking:
//
//	Metric             Updated By                       When Updated
//	────────────────────────────────────────────────────────────────────────────
//	StartTime          Start()                          At initialization
//	EndTime            Start() after strategy returns   After streaming
//	TotalChunks        updateProgress()                 Per chunk
//	TotalBytes         updateProgress()                 Per chunk
//	CompressedBytes    Strategy functions               Per chunk (if compressed)
//	CompressionRatio   GetStats() calculation           On query
//	FailedChunks       Strategy on error                On chunk error
//	Errors list        recordError()                    On any error
//
// Best Practices:
//
// 1. ALWAYS HANDLE RETURNED WRAPPER RESPONSE
// - Check status code for success/failure
// - Log or report error messages
// - Provide debugging info to caller
// - Pattern:
// result := streaming.Start(ctx)
// if result.IsError() {
// // Handle error
// } else {
// // Handle success
// }
//
// 2. PROVIDE CONTEXT WITH DEADLINE WHEN POSSIBLE
// - Enables timeout enforcement
// - Allows caller-controlled shutdown
// - Prevents indefinite blocking
// - Pattern:
// ctx, cancel := context.WithTimeout(context.Background(), timeout)
// defer cancel()
// result := streaming.Start(ctx)
//
// 3. USE CALLBACKS FOR PROGRESS MONITORING
// - Receive updates per chunk
// - Track errors in real-time
// - Update UI/logs continuously
// - Pattern:
// WithCallback(func(p *StreamProgress, err error) {
// // Handle progress or error
// })
//
// 4. CHOOSE STRATEGY BASED ON USE CASE
// - STRATEGY_DIRECT: simplicity, small files
// - STRATEGY_CHUNKED: control, detailed tracking
// - STRATEGY_BUFFERED: throughput, large files
// - Pattern:
// WithStreamingStrategy(STRATEGY_BUFFERED)
//
// 5. EXAMINE DEBUGGING INFO FOR DIAGNOSTICS
// - Check failed_chunks count
// - Review total_errors
// - Analyze compression_ratio
// - Track duration_ms for performance
// - Pattern:
//	debugging := result.Debugging()
//	if fc, ok := debugging["failed_chunks"].(int64); ok && fc > 0 {
//		// Investigate failures
//	}
//
// Related Methods and Lifecycle:
//
//	Method                    Purpose                     Related To
//	──────────────────────────────────────────────────────────
//	Start()                   Initiate streaming (this)   Entry point
//	WithStreaming()           Configure reader/writer     Pre-streaming setup
//	WithStreamingStrategy()   Select strategy             Pre-streaming setup
//	streamDirect()            Direct execution            Called by Start
//	streamChunked()           Chunked execution           Called by Start
//	streamBuffered()          Buffered execution          Called by Start
//	GetStats()                Query final statistics      Post-streaming
//	GetProgress()             Query current progress      During/after streaming
//	Cancel()                  Stop streaming              Control operation
//	Errors()                  Retrieve error list         Post-streaming analysis
//
// See Also:
// - WithStreamingStrategy: Configure streaming approach before Start
// - WithStreaming: Set reader/writer configuration
// - WithChunkSize: Configure chunk size
// - GetStats: Query final statistics after Start completes
// - GetProgress: Query current progress during streaming
// - Cancel: Stop streaming operation
// - Errors: Retrieve accumulated error list
// - context.WithTimeout: Create context with deadline
// - context.WithCancel: Create cancellable context
func (sw *StreamingWrapper) Start(ctx context.Context) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
if sw.reader == nil {
return sw.wrapper.
WithStatusCode(http.StatusBadRequest).
WithMessage("reader not set for streaming").
BindCause()
}
sw.mu.Lock()
if sw.isStreaming {
sw.mu.Unlock()
return sw.wrapper.
WithStatusCode(http.StatusConflict).
WithMessage("streaming already in progress").
BindCause()
}
sw.isStreaming = true
sw.mu.Unlock()
// Update status
sw.wrapper.
WithMessage("Streaming started").
WithDebuggingKV("started_at_ms", time.Now().UnixMilli())
// Use provided context or wrapper's context
if ctx == nil {
ctx = sw.ctx
}
var streamErr error
// Check if we're sending (compressing) or receiving (decompressing)
// and call appropriate streaming function, capturing any error
if sw.config.IsReceiving {
// Receiving mode: decompress incoming data
switch sw.config.Strategy {
case StrategyDirect:
streamErr = sw.streamReceiveDirect(ctx)
case StrategyBuffered:
streamErr = sw.streamReceiveBuffered(ctx)
case StrategyChunked:
streamErr = sw.streamReceiveChunked(ctx)
default:
streamErr = NewErrorf("unknown streaming strategy: %s", string(sw.config.Strategy))
}
} else {
switch sw.config.Strategy {
case StrategyDirect:
streamErr = sw.streamDirect(ctx)
case StrategyBuffered:
streamErr = sw.streamBuffered(ctx)
case StrategyChunked:
streamErr = sw.streamChunked(ctx)
default:
streamErr = NewErrorf("unknown streaming strategy: %s", string(sw.config.Strategy))
}
}
sw.mu.Lock()
sw.isStreaming = false
sw.mu.Unlock()
// Update wrapper based on streaming result
if streamErr != nil {
sw.stats.EndTime = time.Now()
sw.wrapper.
WithErrorAck(streamErr).
WithStatusCode(http.StatusInternalServerError).
WithDebuggingKV("failed_chunks", sw.stats.FailedChunks).
WithDebuggingKV("total_errors", len(sw.errors))
sw.recordError(streamErr)
return sw.wrapper
}
// Success response
if sw.stats.EndTime.IsZero() {
sw.stats.EndTime = time.Now()
}
sw.progress.Percentage = 100
sw.wrapper.
WithStatusCode(http.StatusOK).
WithMessage("Streaming completed successfully").
WithDebuggingKV("started_at_ms", sw.stats.StartTime.UnixMilli()).
WithDebuggingKV("completed_at_ms", sw.stats.EndTime.UnixMilli()).
WithDebuggingKV("total_chunks", sw.stats.TotalChunks).
WithDebuggingKV("total_bytes", sw.stats.TotalBytes).
WithDebuggingKV("compressed_bytes", sw.stats.CompressedBytes).
WithDebuggingKV("duration_ms", sw.stats.EndTime.Sub(sw.stats.StartTime).Milliseconds())
return sw.wrapper
}
// JSON returns the JSON representation of the StreamConfig.
// This method serializes the StreamConfig struct into a JSON string
// using the jsonpass helper.
func (s *StreamConfig) JSON() string {
return jsonpass(s)
}
// JSON returns the JSON representation of the StreamingStats.
// This method serializes the StreamingStats struct into a JSON string
// using the jsonpass helper.
func (s *StreamingStats) JSON() string {
return jsonpass(s)
}
// JSON returns the JSON representation of the StreamProgress.
// This method serializes the StreamProgress struct into a JSON string
// using the jsonpass helper.
func (s *StreamProgress) JSON() string {
return jsonpass(s)
}
// WithReceiveMode sets the streaming mode to receiving or sending.
//
// This function configures the streaming wrapper to operate in either receiving mode
// (decompressing incoming data) or sending mode (compressing outgoing data). The mode
// determines which family of strategy functions Start dispatches to.
//
// Parameters:
//   - isReceiving: A boolean flag indicating the mode.
//     true:  receiving mode (decompress incoming data).
//     false: sending mode (compress outgoing data, the default).
//
// Returns:
// - A pointer to the `wrapper` instance, allowing for method chaining.
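//
// Example:
//
//	// Hedged sketch: decompress a gzip-compressed upload into a buffer.
//	// The request body source and ctx variable here are illustrative,
//	// not part of replify.
//	var out bytes.Buffer
//	result := response.AsStreaming(request.Body).
//		WithWriter(&out).
//		WithCompressionType(COMP_GZIP).
//		WithReceiveMode(true). // receiving mode: decompress incoming data
//		Start(ctx)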
func (sw *StreamingWrapper) WithReceiveMode(isReceiving bool) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
sw.config.IsReceiving = isReceiving
return sw.wrapper
}
// WithWriter sets the output writer for streaming data.
//
// This function assigns the destination where streamed chunks will be written.
// If no writer is set, streaming will occur without persisting data to any output.
//
// Parameters:
// - writer: An io.Writer implementation (e.g., *os.File, *bytes.Buffer, http.ResponseWriter).
//
// Returns:
// - A pointer to the underlying `wrapper` instance, allowing for method chaining.
// - If the streaming wrapper is nil, returns a new wrapper with an error message.
//
// Example:
//
// streaming := response.AsStreaming(reader).
// WithWriter(outputFile).
// Start(ctx)
func (sw *StreamingWrapper) WithWriter(writer io.Writer) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
sw.writer = writer
return sw.wrapper
}
// WithCallback sets the callback function for streaming progress updates.
//
// This function registers a callback that will be invoked during the streaming operation
// to provide real-time progress information and error notifications. The callback is called
// for each chunk processed, allowing consumers to track transfer progress, bandwidth usage,
// and estimated time remaining.
//
// Parameters:
// - callback: A StreamingCallback function that receives progress updates and potential errors.
// The callback signature is: func(progress *StreamProgress, err error)
// - progress: Contains current progress metrics (bytes transferred, percentage, ETA, etc.)
// - err: Non-nil if an error occurred during chunk processing; otherwise nil.
//
// Returns:
// - A pointer to the underlying `wrapper` instance, allowing for method chaining.
// - If the streaming wrapper is nil, returns a new wrapper with an error message.
//
// Example:
//
// streaming := response.AsStreaming(reader).
// WithCallback(func(p *StreamProgress, err error) {
// if err != nil {
// log.Printf("Streaming error at chunk %d: %v", p.CurrentChunk, err)
// return
// }
// fmt.Printf("Progress: %.1f%% | Rate: %.2f MB/s | ETA: %s\n",
// float64(p.Percentage),
// float64(p.TransferRate) / 1024 / 1024,
// p.EstimatedTimeRemaining.String())
// }).
// Start(ctx)
func (sw *StreamingWrapper) WithCallback(callback StreamingCallback) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
sw.callback = callback
return sw.wrapper
}
// WithHook sets the hook function for streaming progress updates with read context.
//
// This function registers a hook that will be invoked during the streaming operation
// to provide real-time progress information. The hook is called for each chunk
// processed, allowing consumers to track transfer progress, bandwidth usage, and
// estimated time remaining. The hook also receives the read context for advanced
// scenarios where access to the read buffer is needed.
//
// Parameters:
//   - callback: A StreamingHook function that receives progress updates and read context.
//     The callback signature is: func(progress *StreamProgress, w *R)
//     - progress: Contains current progress metrics (bytes transferred, percentage, ETA, etc.)
//     - w: A pointer to the R struct containing read context for the current chunk.
//
// Returns:
//   - A pointer to the underlying `wrapper` instance, allowing for method chaining.
//   - If the streaming wrapper is nil, returns a new wrapper with an error message.
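//
// Example:
//
//	// Hedged sketch: log per-chunk progress via the hook. The StreamProgress
//	// fields mirror those used in the callback examples above; the R parameter
//	// is left unused here because its fields are implementation-specific.
//	streaming := response.AsStreaming(reader).
//		WithHook(func(p *StreamProgress, w *R) {
//			fmt.Printf("Chunk %d: %.1f%% complete\n", p.CurrentChunk, float64(p.Percentage))
//		}).
//		Start(ctx)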
func (sw *StreamingWrapper) WithHook(callback StreamingHook) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
sw.hook = callback
return sw.wrapper
}
// WithTotalBytes sets the total number of bytes to be streamed.
//
// This function specifies the expected total size of the data stream, which is essential for
// calculating progress percentage and estimating time remaining. The function automatically
// computes the total number of chunks based on the configured chunk size, enabling accurate
// progress tracking throughout the streaming operation. Thread-safe via internal mutex.
//
// Parameters:
// - totalBytes: The total size of data to be streamed in bytes. This value is used to:
// - Calculate progress percentage: (transferredBytes / totalBytes) * 100
// - Compute estimated time remaining: (totalBytes - transferred) / transferRate
// - Determine total number of chunks: (totalBytes + chunkSize - 1) / chunkSize
// Must be greater than 0 for meaningful progress calculations.
//
// Returns:
// - A pointer to the underlying `wrapper` instance, allowing for method chaining.
// - If the streaming wrapper is nil, returns a new wrapper with an error message.
// - The function automatically records totalBytes in wrapper debugging information.
//
// Example:
//
// fileInfo, _ := os.Stat("large_file.iso")
// streaming := response.AsStreaming(fileReader).
// WithChunkSize(1024 * 1024).
// WithTotalBytes(fileInfo.Size()).
// WithCallback(func(p *StreamProgress, err error) {
// if err == nil {
// fmt.Printf("Downloaded: %.2f MB / %.2f MB (%.1f%%) | ETA: %s\n",
// float64(p.TransferredBytes) / 1024 / 1024,
// float64(p.TotalBytes) / 1024 / 1024,
// float64(p.Percentage),
// p.EstimatedTimeRemaining.String())
// }
// }).
// Start(ctx)
func (sw *StreamingWrapper) WithTotalBytes(totalBytes int64) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
sw.mu.Lock()
defer sw.mu.Unlock()
sw.progress.TotalBytes = totalBytes
if sw.config.ChunkSize > 0 {
sw.progress.TotalChunks = (totalBytes + sw.config.ChunkSize - 1) / sw.config.ChunkSize
}
sw.wrapper.WithDebuggingKV("total_bytes", totalBytes)
return sw.wrapper
}
// WithStreamingStrategy sets the streaming algorithm strategy for data transfer.
//
// This function configures how the streaming operation processes and transfers data chunks.
// Different strategies optimize for different scenarios: STRATEGY_DIRECT for simplicity and low memory,
// STRATEGY_BUFFERED for balanced throughput and responsiveness, and STRATEGY_CHUNKED for explicit control
// in advanced scenarios. The chosen strategy affects latency, memory usage, and overall throughput characteristics.
// The strategy is recorded in wrapper debugging information for tracking and diagnostics.
//
// Parameters:
//
// - strategy: A StreamingStrategy constant specifying the transfer algorithm.
//
// Available Strategies:
//
// - STRATEGY_DIRECT: Sequential blocking read-write without buffering.
// - Throughput: 50-100 MB/s
// - Latency: ~10ms per chunk
// - Memory: Single chunk + overhead (minimal)
// - Use case: Small files (<100MB), simple scenarios
//
// - STRATEGY_BUFFERED: Concurrent read and write with internal buffering.
// - Throughput: 100-500 MB/s
// - Latency: ~50ms per chunk
// - Memory: Multiple chunks in flight (medium)
// - Use case: Most scenarios (100MB-10GB)
//
// - STRATEGY_CHUNKED: Explicit chunk-by-chunk processing with full control.
// - Throughput: 100-500 MB/s
// - Latency: ~100ms per chunk
// - Memory: Medium
// - Use case: Large files (>10GB), specialized processing
//
// Returns:
// - A pointer to the underlying `wrapper` instance, allowing for method chaining.
// - If the streaming wrapper is nil, returns a new wrapper with an error message.
// - If the strategy is empty, returns the wrapper with an error message indicating invalid input.
// - The function automatically records the selected strategy in wrapper debugging information
// under the key "streaming_strategy" for audit and diagnostic purposes.
//
// Example:
//
// file, _ := os.Open("large_file.bin")
// defer file.Close()
//
// // Use buffered strategy for most scenarios (recommended)
// result := replify.New().
// WithStatusCode(200).
// WithPath("/api/download/file").
// WithStreaming(file, nil).
// WithStreamingStrategy(STRATEGY_BUFFERED).
// WithChunkSize(1024 * 1024).
// WithMaxConcurrentChunks(4).
// WithTotalBytes(fileSize).
// Start(context.Background())
//
// // Use direct strategy for small files with minimal overhead
// result := replify.New().
// WithStreaming(smallFile, nil).
// WithStreamingStrategy(STRATEGY_DIRECT).
// WithChunkSize(65536).
// Start(context.Background())
//
// // Use chunked strategy for very large files with explicit control
// result := replify.New().
// WithStreaming(hugeFile, nil).
// WithStreamingStrategy(STRATEGY_CHUNKED).
// WithChunkSize(10 * 1024 * 1024).
// WithCallback(func(p *StreamProgress, err error) {
// fmt.Printf("Chunk %d: %.2f MB | Rate: %.2f MB/s\n",
// p.CurrentChunk,
// float64(p.Size) / 1024 / 1024,
// float64(p.TransferRate) / 1024 / 1024)
// }).
// Start(context.Background())
//
// See Also:
// - WithChunkSize: Configures the size of data chunks
// - WithMaxConcurrentChunks: Sets parallel processing level
// - WithCompressionType: Enables data compression
// - Start: Initiates the streaming operation with chosen strategy
func (sw *StreamingWrapper) WithStreamingStrategy(strategy StreamingStrategy) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
if strutil.IsEmpty(string(strategy)) {
return sw.wrapper.
WithStatusCode(http.StatusBadRequest).
WithMessage("Invalid streaming strategy: cannot be empty").
BindCause()
}
sw.config.Strategy = strategy
sw.wrapper.WithDebuggingKV("streaming_strategy", string(strategy))
return sw.wrapper
}
// WithCompressionType sets the compression algorithm applied to streamed data chunks.
//
// This function enables data compression during streaming to reduce bandwidth consumption and transfer time.
// Compression algorithms trade CPU usage for reduced data size, with different algorithms optimized for
// different data types. Compression is applied per-chunk during streaming, allowing for progressive compression
// and decompression without loading the entire dataset into memory. The selected compression type is recorded
// in wrapper debugging information for tracking and validation purposes.
//
// Parameters:
// - comp: A CompressionType constant specifying the compression algorithm to apply.
//
// Available Compression Types:
//
// - COMP_NONE: No compression applied (passthrough mode).
// - Compression Ratio: 100% (no reduction)
// - CPU Overhead: None
// - Use case: Already-compressed data (video, images, archives)
// - Best for: Binary formats, encrypted data
//
// - COMP_GZIP: GZIP compression algorithm (RFC 1952).
// - Compression Ratio: 20-30% (70-80% size reduction)
// - CPU Overhead: Medium (~500ms per 100MB)
// - Speed: Medium (balanced)
// - Use case: Text, JSON, logs, CSV exports
// - Best for: RESTful APIs, data exports, text-based protocols
//
// - COMP_DEFLATE: DEFLATE compression algorithm (RFC 1951).
// - Compression Ratio: 25-35% (65-75% size reduction)
// - CPU Overhead: Low (~300ms per 100MB)
// - Speed: Fast (optimized)
// - Use case: Smaller files, time-sensitive operations
// - Best for: Quick transfers, embedded systems, IoT
//
// The compression type cannot be empty; an empty string returns an error.
//
// Returns:
// - A pointer to the underlying `wrapper` instance, allowing for method chaining.
// - If the streaming wrapper is nil, returns a new wrapper with an error message.
// - If the compression type is empty, returns the wrapper with an error message indicating invalid input.
// - The function automatically records the selected compression type in wrapper debugging information
// under the key "compression_type" for audit, diagnostics, and response transparency.
//
// Example:
//
// // Example 1: Export CSV data with GZIP compression (recommended for text)
// csvFile, _ := os.Open("users.csv")
// defer csvFile.Close()
//
// result := replify.New().
// WithStatusCode(200).
// WithPath("/api/export/users").
// WithCustomFieldKV("format", "csv").
// WithStreaming(csvFile, nil).
// WithCompressionType(COMP_GZIP).
// WithChunkSize(512 * 1024).
// WithTotalBytes(csvFileSize).
// WithCallback(func(p *StreamProgress, err error) {
// if err == nil && p.Percentage % 10 == 0 {
// fmt.Printf("Exported: %.2f MB (compressed) | Original: %.2f MB\n",
// float64(p.TransferredBytes) / 1024 / 1024,
// float64(p.TotalBytes) / 1024 / 1024)
// }
// }).
// Start(context.Background())
//
// // Example 2: Stream video without compression (already compressed)
// videoFile, _ := os.Open("movie.mp4")
// defer videoFile.Close()
//
// result := replify.New().
// WithStatusCode(206).
// WithPath("/api/stream/video").
// WithCustomFieldKV("codec", "h264").
// WithStreaming(videoFile, nil).
// WithCompressionType(COMP_NONE).
// WithChunkSize(256 * 1024).
// WithTotalBytes(videoFileSize).
// Start(context.Background())
//
// // Example 3: Fast log transfer with DEFLATE (IoT device)
// logData := bytes.NewReader(logBuffer)
//
// result := replify.New().
// WithStatusCode(200).
// WithPath("/api/logs/upload").
// WithCustomFieldKV("device_id", "iot-sensor-001").
// WithStreaming(logData, nil).
// WithCompressionType(COMP_DEFLATE).
// WithChunkSize(64 * 1024).
// WithThrottleRate(256 * 1024). // 256KB/s for IoT
// WithTotalBytes(int64(len(logBuffer))).
// Start(context.Background())
//
// // Example 4: Conditional compression based on content type
// contentType := "application/json"
// var compressionType CompressionType
//
// switch contentType {
// case "application/json", "text/csv", "text/plain":
// compressionType = COMP_GZIP // Text formats benefit from GZIP
// case "video/mp4", "image/jpeg", "application/zip":
// compressionType = COMP_NONE // Already compressed formats
// default:
// compressionType = COMP_DEFLATE // Default to fast DEFLATE
// }
//
// result := replify.New().
// WithStreaming(dataReader, nil).
// WithCompressionType(compressionType).
// Start(context.Background())
//
// Performance Impact Summary (ratio = compressed size as a percentage of the original):
//
// Data Type GZIP Ratio Time/100MB Best Algorithm
// ─────────────────────────────────────────────────────────
// JSON 15-20% ~500ms GZIP ✓
// CSV 18-25% ~500ms GZIP ✓
// Logs 20-30% ~450ms GZIP ✓
// XML 10-15% ~500ms GZIP ✓
// Binary 40-60% ~600ms DEFLATE
// Video (MP4) 98-99% ~2000ms NONE ✓
// Images (JPEG) 98-99% ~2000ms NONE ✓
// Archives (ZIP) 100% ~0ms NONE ✓
// Encrypted 100% ~0ms NONE ✓
//
// See Also:
// - WithChunkSize: Configures chunk size for optimal compression
// - WithStreamingStrategy: Selects transfer algorithm
// - WithThrottleRate: Limits bandwidth usage
// - GetStats: Retrieve compression statistics after streaming
// - Start: Initiates streaming with compression enabled
func (sw *StreamingWrapper) WithCompressionType(comp CompressionType) *wrapper {
if sw == nil {
return respondStreamBadRequestDefault()
}
if strutil.IsEmpty(string(comp)) {
return sw.wrapper.
WithStatusCode(http.StatusBadRequest).
WithMessage("Invalid compression type: cannot be empty").
BindCause()
}
sw.config.Compression = comp
sw.wrapper.WithDebuggingKV("compression_type", string(comp))
return sw.wrapper
}
// WithChunkSize sets the size of individual data chunks processed during streaming.
//
// This function configures the buffer size for each streaming iteration, directly impacting memory usage,
// latency, and throughput characteristics. Smaller chunks reduce memory footprint and improve responsiveness
// but increase processing overhead; larger chunks maximize throughput but consume more memory and delay
// initial response. The optimal chunk size depends on file size, available memory, network bandwidth, and
// streaming strategy. Chunk size is recorded in wrapper debugging information for tracking and diagnostics.
//
// Parameters:
// - size: The size of each chunk in bytes. Must be greater than 0.
//
// Recommended sizes based on scenario:
//
// - 32KB (32768 bytes): Mobile networks, IoT devices, low-memory environments.
// - Latency: ~5ms per chunk
// - Memory: Minimal
// - Overhead: High (frequent operations)
// - Use case: Mobile streaming, embedded systems
//
// - 64KB (65536 bytes): Default, balanced for most scenarios.
// - Latency: ~10ms per chunk
// - Memory: Low
// - Overhead: Low-Medium
// - Use case: General-purpose file downloads, APIs
//
// - 256KB (262144 bytes): High-bandwidth networks, video streaming.
// - Latency: ~50ms per chunk
// - Memory: Medium
// - Overhead: Very low
// - Use case: Video/audio streaming, LAN transfers
//
// - 1MB (1048576 bytes): Database exports, large data transfer.
// - Latency: ~100ms per chunk
// - Memory: Medium-High
// - Overhead: Very low
// - Use case: Database exports, backups, bulk operations
//
// - 10MB (10485760 bytes): High-performance servers, LAN-only scenarios.
// - Latency: ~500ms per chunk
// - Memory: High
// - Overhead: Minimal
// - Use case: Server-to-server transfer, data center operations
//
// The chunk size must be greater than 0; zero or negative values return an error.
//
// Returns:
// - A pointer to the underlying `wrapper` instance, allowing for method chaining.
// - If the streaming wrapper is nil, returns a new wrapper with an error message.
// - If the chunk size is ≤ 0, returns the wrapper with an error message indicating invalid input.
// - The function automatically records the chunk size in wrapper debugging information
// under the key "chunk_size" for audit, performance analysis, and diagnostics.
//
// Example:
//
// // Example 1: Mobile client download with small chunks (responsive UI)
// file, _ := os.Open("app-update.apk")
// defer file.Close()
//
// result := replify.New().
// WithStatusCode(200).
// WithPath("/api/download/app-update").
// WithCustomFieldKV("platform", "mobile").
// WithStreaming(file, nil).
// WithChunkSize(32 * 1024). // 32KB for responsive updates
// WithCompressionType(COMP_GZIP).
// WithCallback(func(p *StreamProgress, err error) {
// if err == nil {