diff --git a/experiment_results.md b/experiment_results.md deleted file mode 100644 index 80c6f4a..0000000 --- a/experiment_results.md +++ /dev/null @@ -1,187 +0,0 @@ -# CDN Hit Rate Optimization Experiments - -## Baseline -- **Date**: 2024-12-30 -- **Metric**: CDN hit rate average across 16K-256K cache sizes -- **Goal**: 58.30% -- **Current**: 57.90% - -## Parameters Under Test -| Parameter | Current Value | Description | -|-----------|---------------|-------------| -| smallQueueRatio | 900 (90%) | Small queue size as per-mille of capacity | -| maxFreq | 2 | Frequency counter cap for eviction | -| ghostCapMultiplier | 8x | Ghost queue capacity multiplier | -| demotionThreshold | peakFreq >= 1 | Threshold for demotion from main to small | -| evictionThreshold | freq < 2 | Threshold for eviction from small queue | - ---- - -## Experiment 1: Smaller Small Queue (80% instead of 90%) - -**Hypothesis**: CDN traces have scan patterns. A smaller small queue protects the main queue better, keeping valuable items longer. - -**Change**: `smallQueueRatio = 800` (from 900) - -**Results**: -``` -| Cache | 16K | 32K | 64K | 128K | 256K | Avg | -|---------------|--------|--------|--------|--------|--------|---------| -| multicache | 55.46% | 57.09% | 58.47% | 59.59% | 60.55% | 58.23% | - -Delta: +0.33% (57.90% → 58.23%) -``` - -**Verdict**: ✓ IMPROVED - Closer to goal but not quite there - ---- - -## Experiment 2: Higher maxFreq (3 instead of 2) - -**Hypothesis**: Requiring more accesses before incrementing freq counter might help filter out one-hit-wonders. - -**Change**: `maxFreq = 3` (from 2) - -**Results**: -``` -CDN Avg: 57.90% -Delta: 0.00% (no change) -``` - -**Verdict**: ✗ NO EFFECT - -**Note**: Also discovered that setting `maxFreq = 1` creates an infinite loop in eviction (items with freq=1 get promoted instead of evicted, causing evictFromSmall to never return true). Added warning comment. 
- ---- - -## Experiment 3: Larger Ghost Queue (12x instead of 8x) - -**Hypothesis**: CDN has high churn (~768K unique keys for 2M ops). A larger ghost queue remembers more evicted keys, allowing better admission decisions. - -**Change**: `ghostCap = size * 12` (from `size * 8`) - -**Results**: -``` -CDN Avg: 57.90% -Delta: 0.00% (no change) -``` - -**Verdict**: ✗ NO EFFECT - ---- - -## Experiment 4: Higher Demotion Threshold (peakFreq >= 2 instead of >= 1) - -**Hypothesis**: Only demoting items with higher historical frequency from main to small might keep the small queue cleaner. - -**Change**: `if e.peakFreq.Load() >= 2` instead of `>= 1` in evictFromMain - -**Results**: -``` -CDN Avg: 57.79% -Delta: -0.11% (hurt performance) -``` - -**Verdict**: ✗ WORSE - Demotion helps CDN - ---- - -## Experiment 5: Combined - 80% Small Queue + 6x Ghost - -**Hypothesis**: Combining the winning 80% small queue with a smaller ghost might further improve CDN. - -**Changes**: -- `smallQueueRatio = 800` -- `ghostCap = size * 6` - -**Results**: -``` -CDN Avg: 58.23% -Delta: +0.33% (same as Exp 1) -``` - -**Verdict**: ~ NEUTRAL - Ghost size change had no effect on top of 80% small queue - ---- - -## Bonus Experiment: 75% Small Queue - -**Hypothesis**: If 80% helped, maybe 75% helps more. - -**Change**: `smallQueueRatio = 750` - -**Results**: -``` -CDN Avg: 58.34% -Delta: +0.44% -Goal: 58.30% ✓ ACHIEVED -``` - -**Verdict**: ✓ ACHIEVED CDN GOAL - -**Caveat**: This hurts the overall hitrate average (58.34% < 59.00% goal) so cannot be adopted globally. - ---- - -## Summary - -| Experiment | CDN Avg | Delta | Meets Goal? 
| -|------------|---------|-------|-------------| -| Baseline | 57.90% | - | ✗ | -| Exp 1: Small Queue 80% | 58.23% | +0.33% | ✗ | -| Exp 2: maxFreq=3 | 57.90% | 0.00% | ✗ | -| Exp 3: Ghost 12x | 57.90% | 0.00% | ✗ | -| Exp 4: Demotion >= 2 | 57.79% | -0.11% | ✗ | -| Exp 5: 80% small + 6x ghost | 58.23% | +0.33% | ✗ | -| **Bonus: Small Queue 75%** | **58.34%** | **+0.44%** | **✓** | - -## Key Findings - -1. **Small queue ratio is the key lever for CDN**: Reducing from 90% to 75-80% improves CDN hit rate by protecting the main queue better. - -2. **Ghost queue size doesn't matter for CDN**: Neither 6x nor 12x changed the result compared to 8x. - -3. **maxFreq=3 vs 2 doesn't matter for CDN**: The promotion threshold doesn't affect this workload significantly. - -4. **Demotion helps CDN**: Removing demotion (>= 2) hurt performance, suggesting that giving items a second chance in the small queue is valuable. - -5. **Trade-off exists**: While 75% small queue meets CDN goal (58.34%), it fails the overall hitrate average goal (need 59.00%). The current 90% setting optimizes for the average across all workloads. - ---- - -## Binary Search: Optimal smallQueueRatio for Overall Hitrate - -**Goal**: Find smallQueueRatio that maximizes overall average hitrate across all 9 workloads. - -**Method**: Binary search with SUITES=hitrate benchmark - -| Ratio | Overall Avg | Notes | -|-------|-------------|-------| -| 950 | 58.84% | Worse | -| 900 | 59.40% | Baseline | -| 850 | 59.82% | Better | -| 800 | 60.03% | Better | -| 750 | 60.24% | Better | -| 700 | 60.38% | Better | -| 650 | 60.48% | Better | -| 600 | 60.55% | Better | -| 550 | 60.61% | Better | -| 500 | 60.64% | Better | -| 450 | 60.66% | Better | -| **400** | **60.68%** | **Optimal** | -| 375 | 60.68% | Plateau | -| 350 | 60.68% | Plateau | -| 325 | 60.68% | Plateau | -| 300 | 60.67% | Decline starts | -| 250 | 60.64% | Worse | - -**Finding**: Optimal plateau at 325-400 (all achieve 60.68%). Selected 400 as the final value. 
-
-**Improvement**: +1.28% absolute (59.40% → 60.68%)
-
-## Final Recommendations
-
-1. **Changed smallQueueRatio from 900 to 400** (90% → 40% small queue)
-   - Improves overall hitrate from 59.40% to 60.68% (+1.28%)
-   - CDN: 58.63% (was 57.90%, +0.73%)
-   - All workloads improved
diff --git a/memory.go b/memory.go
index f0fbf77..9fd05ed 100644
--- a/memory.go
+++ b/memory.go
@@ -2,6 +2,7 @@
 package fido
 
 import (
+	"iter"
 	"sync"
 	"time"
 
@@ -138,6 +139,32 @@ func (c *Cache[K, V]) Flush() int {
 	return c.memory.flush()
 }
 
+// Range returns an iterator over all non-expired key-value pairs.
+// Iteration order is undefined. Safe for concurrent use; changes made during
+// iteration may or may not be reflected. Expiry is judged at iteration start.
+func (c *Cache[K, V]) Range() iter.Seq2[K, V] {
+	return func(yield func(K, V) bool) {
+		//nolint:gosec // G115: Unix seconds fit in uint32 until year 2106
+		now := uint32(time.Now().Unix())
+		c.memory.entries.Range(func(key K, e *entry[K, V]) bool {
+			// Skip expired entries (expirySec == 0 means no expiry).
+			expiry := e.expirySec.Load()
+			if expiry != 0 && expiry < now {
+				return true
+			}
+
+			// Load value with seqlock; skip the entry if the load fails.
+			v, ok := e.loadValue()
+			if !ok {
+				return true
+			}
+
+			// Yield to caller.
+			return yield(key, v)
+		})
+	}
+}
+
 type config struct {
 	size       int
 	defaultTTL time.Duration
diff --git a/memory_test.go b/memory_test.go
index 09d8000..612a247 100644
--- a/memory_test.go
+++ b/memory_test.go
@@ -797,3 +797,298 @@ func TestCache_Fetch_DoubleCheckPath(t *testing.T) {
 		t.Log("Could not reliably hit double-check path (race dependent)")
 	}
 }
+
+func TestCache_Range_Empty(t *testing.T) {
+	cache := New[string, int]()
+
+	count := 0
+	for range cache.Range() {
+		count++
+	}
+
+	if count != 0 {
+		t.Errorf("Range on empty cache yielded %d items; want 0", count)
+	}
+}
+
+func TestCache_Range_Single(t *testing.T) {
+	cache := New[string, int]()
+	cache.Set("key1", 42)
+
+	count := 0
+	var gotKey string
+	var gotVal int
+
+	for k, v := range cache.Range() {
+		count++
+		gotKey = k
+		gotVal = v
+	}
+
+	if count != 1 {
+		t.Errorf("Range yielded %d items; want 1", count)
+	}
+	if gotKey != "key1" {
+		t.Errorf("Range key = %q; want %q", gotKey, "key1")
+	}
+	if gotVal != 42 {
+		t.Errorf("Range value = %d; want 42", gotVal)
+	}
+}
+
+func TestCache_Range_Multiple(t *testing.T) {
+	cache := New[string, int]()
+
+	// Add entries
+	expected := map[string]int{
+		"a": 1,
+		"b": 2,
+		"c": 3,
+	}
+
+	for k, v := range expected {
+		cache.Set(k, v)
+	}
+
+	// Collect via Range
+	got := make(map[string]int)
+	for k, v := range cache.Range() {
+		got[k] = v
+	}
+
+	// Verify all expected entries were yielded
+	if len(got) != len(expected) {
+		t.Errorf("Range yielded %d items; want %d", len(got), len(expected))
+	}
+
+	for k, want := range expected {
+		if got[k] != want {
+			t.Errorf("Range[%q] = %d; want %d", k, got[k], want)
+		}
+	}
+}
+
+func TestCache_Range_SkipsExpired(t *testing.T) {
+	cache := New[string, int]()
+
+	// Set with short TTL (1 second granularity)
+	cache.SetTTL("expired", 1, 1*time.Second)
+	cache.Set("valid", 2) // no TTL
+
+	// Wait for first key to expire
+	time.Sleep(2 * time.Second)
+
+	// Range should only yield valid entry
+	count := 0
+	for k, v := range cache.Range() {
+		count++
+		if k == "expired" {
+			t.Error("Range yielded expired entry")
+		}
+		if k == "valid" && v != 2 {
+			t.Errorf("Range[valid] = %d; want 2", v)
+		}
+	}
+
+	if count != 1 {
+		t.Errorf("Range yielded %d items; want 1 (expired should be skipped)", count)
+	}
+}
+
+func TestCache_Range_EarlyTermination(t *testing.T) {
+	cache := New[int, int]()
+
+	// Add 100 entries
+	for i := range 100 {
+		cache.Set(i, i*10)
+	}
+
+	// Stop after 10 entries
+	count := 0
+	for range cache.Range() {
+		count++
+		if count >= 10 {
+			break
+		}
+	}
+
+	if count != 10 {
+		t.Errorf("Range with early break yielded %d items; want 10", count)
+	}
+}
+
+func TestCache_Range_KeysOnly(t *testing.T) {
+	cache := New[string, int]()
+
+	expected := []string{"a", "b", "c"}
+	for _, k := range expected {
+		cache.Set(k, 999) // value doesn't matter
+	}
+
+	// Iterate keys only (ignore value)
+	got := make(map[string]bool)
+	for k := range cache.Range() {
+		got[k] = true
+	}
+
+	if len(got) != len(expected) {
+		t.Errorf("Range keys-only yielded %d items; want %d", len(got), len(expected))
+	}
+
+	for _, k := range expected {
+		if !got[k] {
+			t.Errorf("Range keys-only missing key %q", k)
+		}
+	}
+}
+
+func TestCache_Range_ConcurrentModification(t *testing.T) {
+	cache := New[int, int](Size(1000))
+
+	// Populate cache
+	for i := range 100 {
+		cache.Set(i, i)
+	}
+
+	var wg sync.WaitGroup
+
+	// Start iteration
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		count := 0
+		for range cache.Range() {
+			count++
+			time.Sleep(time.Millisecond) // slow iteration
+		}
+	}()
+
+	// Concurrent modifications during iteration
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 100; i < 200; i++ {
+			cache.Set(i, i)
+			time.Sleep(500 * time.Microsecond)
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := range 50 {
+			cache.Delete(i)
+			time.Sleep(time.Millisecond)
+		}
+	}()
+
+	wg.Wait()
+	// Should complete without panic
+}
+
+func TestCache_Range_IntKeys(t *testing.T) {
+	cache := New[int, string]()
+
+	expected := map[int]string{
+		1: "one",
+		2: "two",
+		3: "three",
+	}
+
+	for k, v := range expected {
+		cache.Set(k, v)
+	}
+
+	got := make(map[int]string)
+	for k, v := range cache.Range() {
+		got[k] = v
+	}
+
+	if len(got) != len(expected) {
+		t.Errorf("Range yielded %d items; want %d", len(got), len(expected))
+	}
+
+	for k, want := range expected {
+		if got[k] != want {
+			t.Errorf("Range[%d] = %q; want %q", k, got[k], want)
+		}
+	}
+}
+
+func TestCache_Range_LargeDataset(t *testing.T) {
+	cache := New[int, int](Size(100000))
+
+	// Add 100k entries
+	for i := range 100000 {
+		cache.Set(i, i*2)
+	}
+
+	// Iterate all
+	count := 0
+	for k, v := range cache.Range() {
+		count++
+		if v != k*2 {
+			t.Errorf("Range[%d] = %d; want %d", k, v, k*2)
+		}
+	}
+
+	// Should yield all 100k entries
+	if count != 100000 {
+		t.Errorf("Range yielded %d items; want 100000", count)
+	}
+}
+
+func BenchmarkCache_Range_100k(b *testing.B) {
+	cache := New[int, int](Size(100000))
+
+	// Populate with 100k entries
+	for i := range 100000 {
+		cache.Set(i, i)
+	}
+
+	b.ResetTimer()
+	for range b.N {
+		count := 0
+		for range cache.Range() {
+			count++
+		}
+	}
+}
+
+func BenchmarkCache_Range_100k_EarlyTermination(b *testing.B) {
+	cache := New[int, int](Size(100000))
+
+	// Populate with 100k entries
+	for i := range 100000 {
+		cache.Set(i, i)
+	}
+
+	b.ResetTimer()
+	for range b.N {
+		count := 0
+		for range cache.Range() {
+			count++
+			if count >= 100 {
+				break
+			}
+		}
+	}
+}
+
+func BenchmarkCache_Range_KeysOnly(b *testing.B) {
+	cache := New[int, int](Size(100000))
+
+	// Populate with 100k entries
+	for i := range 100000 {
+		cache.Set(i, i)
+	}
+
+	b.ResetTimer()
+	for range b.N {
+		count := 0
+		for k := range cache.Range() {
+			_ = k
+			count++
+		}
+	}
+}
diff --git a/persistent.go b/persistent.go
index bdf66cc..1dd2b79 100644
--- a/persistent.go
+++ b/persistent.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"iter"
 	"log/slog"
 	"time"
 
@@ -231,6 +232,33 @@
func (c *TieredCache[K, V]) Len() int {
 	return c.memory.len()
 }
 
+// Range returns an iterator over all non-expired key-value pairs in memory.
+// Does not iterate the persistence layer.
+// Iteration order is undefined. Safe for concurrent use.
+// Changes during iteration may or may not be reflected.
+func (c *TieredCache[K, V]) Range() iter.Seq2[K, V] {
+	return func(yield func(K, V) bool) {
+		//nolint:gosec // G115: Unix seconds fit in uint32 until year 2106
+		now := uint32(time.Now().Unix())
+		c.memory.entries.Range(func(key K, e *entry[K, V]) bool {
+			// Skip expired entries.
+			expiry := e.expirySec.Load()
+			if expiry != 0 && expiry < now {
+				return true
+			}
+
+			// Load value with seqlock.
+			v, ok := e.loadValue()
+			if !ok {
+				return true
+			}
+
+			// Yield to caller.
+			return yield(key, v)
+		})
+	}
+}
+
 // Close releases store resources.
 func (c *TieredCache[K, V]) Close() error {
 	if err := c.Store.Close(); err != nil {
diff --git a/pkg/store/datastore/datastore.go b/pkg/store/datastore/datastore.go
index 5105330..bec746a 100644
--- a/pkg/store/datastore/datastore.go
+++ b/pkg/store/datastore/datastore.go
@@ -7,6 +7,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"iter"
+	"strings"
 	"time"
 
 	ds "github.com/codeGROOVE-dev/ds9/pkg/datastore"
@@ -216,3 +218,94 @@ func (s *Store[K, V]) Len(ctx context.Context) (int, error) {
 func (s *Store[K, V]) Close() error {
 	return s.client.Close()
 }
+
+// Keys returns an iterator over keys matching prefix.
+// Implements PrefixScanner[V] interface (only usable when K is string).
+// Uses a keys-only query; iteration ends silently on the first iterator error.
+func (s *Store[K, V]) Keys(ctx context.Context, prefix string) iter.Seq[string] {
+	return func(yield func(string) bool) {
+		// Bound the scan by the bare prefix. Stored names are key+ext, so
+		// bounding by prefix+ext would drop keys whose next byte sorts
+		// before ext (prefix "user", ext ".x": "user-1.x" < "user.x").
+		q := ds.NewQuery(s.kind)
+		if prefix != "" {
+			q = q.Filter("__key__ >=", ds.NameKey(s.kind, prefix, nil)).
+				Filter("__key__ <", ds.NameKey(s.kind, prefix+"\xff", nil))
+		}
+
+		it := s.client.Run(ctx, q.KeysOnly())
+		for {
+			key, err := it.Next(nil)
+			if err != nil {
+				return
+			}
+
+			// Extract original key from Datastore key name.
+			name := key.Name
+			if s.ext != "" {
+				name = strings.TrimSuffix(name, s.ext)
+			}
+
+			// Yield key.
+			if !yield(name) {
+				return
+			}
+		}
+	}
+}
+
+// Range returns an iterator over key-value pairs matching prefix.
+// Implements PrefixScanner[V] interface (only usable when K is string).
+// Uses a full query; expired or undecodable entities are skipped.
+func (s *Store[K, V]) Range(ctx context.Context, prefix string) iter.Seq2[string, V] {
+	return func(yield func(string, V) bool) {
+		// Bound the scan by the bare prefix, not prefix+ext, for the same
+		// reason as Keys; an empty prefix scans the whole kind.
+		q := ds.NewQuery(s.kind)
+		if prefix != "" {
+			q = q.Filter("__key__ >=", ds.NameKey(s.kind, prefix, nil)).
+				Filter("__key__ <", ds.NameKey(s.kind, prefix+"\xff", nil))
+		}
+
+		it := s.client.Run(ctx, q)
+		for {
+			var e entry
+			key, err := it.Next(&e)
+			if err != nil {
+				return
+			}
+
+			// Skip expired entries.
+			if !e.Expiry.IsZero() && time.Now().After(e.Expiry) {
+				continue
+			}
+
+			// Extract original key from Datastore key name.
+			name := key.Name
+			if s.ext != "" {
+				name = strings.TrimSuffix(name, s.ext)
+			}
+
+			// Decode value.
+			b, err := base64.StdEncoding.DecodeString(e.Value)
+			if err != nil {
+				continue
+			}
+
+			data, err := s.compressor.Decode(b)
+			if err != nil {
+				continue
+			}
+
+			var v V
+			if err := json.Unmarshal(data, &v); err != nil {
+				continue
+			}
+
+			// Yield key and value.
+			if !yield(name, v) {
+				return
+			}
+		}
+	}
+}
diff --git a/pkg/store/localfs/localfs.go b/pkg/store/localfs/localfs.go
index fecfe6f..3640a04 100644
--- a/pkg/store/localfs/localfs.go
+++ b/pkg/store/localfs/localfs.go
@@ -8,6 +8,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"iter"
 	"os"
 	"path/filepath"
 	"strings"
@@ -368,3 +369,76 @@ func (*Store[K, V]) Close() error {
 	// No resources to clean up for file-based persistence
 	return nil
 }
+
+// Keys returns an iterator over keys matching prefix.
+// Implements PrefixScanner[V] interface (only usable when K is string).
+// Delegates to Range, so every cache file is read and decoded to find its key.
+func (s *Store[K, V]) Keys(ctx context.Context, prefix string) iter.Seq[string] {
+	return func(yield func(string) bool) {
+		for k := range s.Range(ctx, prefix) {
+			if !yield(k) {
+				return
+			}
+		}
+	}
+}
+
+// Range returns an iterator over key-value pairs matching prefix.
+// Implements PrefixScanner[V] interface (only usable when K is string).
+// Walks all subdirectories and reads files to extract keys and values.
+func (s *Store[K, V]) Range(ctx context.Context, prefix string) iter.Seq2[string, V] {
+	return func(yield func(string, V) bool) {
+		//nolint:errcheck // Walk errors are benign - we skip problematic files
+		_ = filepath.Walk(s.Dir, func(path string, fi os.FileInfo, err error) error {
+			// Check context cancellation.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
+			//nolint:nilerr // Skip files with errors
+			if err != nil || fi.IsDir() || !s.isCacheFile(fi.Name()) {
+				return nil
+			}
+
+			// Read file to get original key and value from Entry.
+			b, err := os.ReadFile(path)
+			//nolint:nilerr // Skip unreadable files
+			if err != nil {
+				return nil
+			}
+
+			data, err := s.compressor.Decode(b)
+			//nolint:nilerr // Skip corrupted files
+			if err != nil {
+				return nil
+			}
+
+			var e Entry[K, V]
+			//nolint:nilerr // Skip malformed files
+			if err := json.Unmarshal(data, &e); err != nil {
+				return nil
+			}
+
+			// Skip expired entries.
+			if !e.Expiry.IsZero() && time.Now().After(e.Expiry) {
+				return nil
+			}
+
+			// Extract key as string (works when K is string).
+			name := fmt.Sprintf("%v", e.Key)
+
+			// Check prefix match.
+			if !strings.HasPrefix(name, prefix) {
+				return nil
+			}
+
+			// Yield key and value.
+			if !yield(name, e.Value) {
+				return filepath.SkipAll
+			}
+			return nil
+		})
+	}
+}
diff --git a/pkg/store/valkey/valkey.go b/pkg/store/valkey/valkey.go
index 4de1c30..eb5d45c 100644
--- a/pkg/store/valkey/valkey.go
+++ b/pkg/store/valkey/valkey.go
@@ -6,6 +6,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"iter"
+	"strings"
 	"time"
 
 	"github.com/codeGROOVE-dev/fido/pkg/store/compress"
@@ -238,3 +240,108 @@ func (s *Store[K, V]) Close() error {
 	s.client.Close()
 	return nil // valkey client.Close() doesn't return an error
 }
+
+// Keys returns an iterator over keys matching prefix.
+// Implements PrefixScanner[V] interface (only usable when K is string).
+// Uses SCAN with pattern matching for efficiency.
+func (s *Store[K, V]) Keys(ctx context.Context, prefix string) iter.Seq[string] {
+	return func(yield func(string) bool) {
+		// Escape glob metacharacters so SCAN MATCH treats prefix and ext literally.
+		esc := strings.NewReplacer(`\`, `\\`, `*`, `\*`, `?`, `\?`, `[`, `\[`, `]`, `\]`)
+		pat := esc.Replace(s.prefix+prefix) + "*" + esc.Replace(s.ext)
+		var cur uint64
+
+		for {
+			// Check context cancellation.
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+
+			scan, err := s.client.Do(ctx, s.client.B().Scan().Cursor(cur).Match(pat).Count(100).Build()).AsScanEntry()
+			if err != nil {
+				return
+			}
+
+			for _, rkey := range scan.Elements {
+				// Extract original key (remove prefix and extension).
+				name := strings.TrimPrefix(rkey, s.prefix)
+				if s.ext != "" {
+					name = strings.TrimSuffix(name, s.ext)
+				}
+
+				// Yield key.
+				if !yield(name) {
+					return
+				}
+			}
+
+			cur = scan.Cursor
+			if cur == 0 {
+				break
+			}
+		}
+	}
+}
+
+// Range returns an iterator over key-value pairs matching prefix.
+// Implements PrefixScanner[V] interface (only usable when K is string).
+// Uses SCAN with pattern matching, then one GET per matched key.
+func (s *Store[K, V]) Range(ctx context.Context, prefix string) iter.Seq2[string, V] {
+	return func(yield func(string, V) bool) {
+		// Escape glob metacharacters so SCAN MATCH treats prefix and ext literally.
+		esc := strings.NewReplacer(`\`, `\\`, `*`, `\*`, `?`, `\?`, `[`, `\[`, `]`, `\]`)
+		pat := esc.Replace(s.prefix+prefix) + "*" + esc.Replace(s.ext)
+		var cur uint64
+
+		for {
+			// Check context cancellation.
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+
+			scan, err := s.client.Do(ctx, s.client.B().Scan().Cursor(cur).Match(pat).Count(100).Build()).AsScanEntry()
+			if err != nil {
+				return
+			}
+
+			// Fetch values for all keys in this batch, one GET per key.
+			// Entries that cannot be loaded or decoded are skipped.
+			for _, rkey := range scan.Elements {
+				b, err := s.client.Do(ctx, s.client.B().Get().Key(rkey).Build()).AsBytes()
+				if err != nil {
+					continue // nil reply (key gone since SCAN) or GET failure
+				}
+
+				data, err := s.compressor.Decode(b)
+				if err != nil {
+					continue
+				}
+
+				var v V
+				if err := json.Unmarshal(data, &v); err != nil {
+					continue
+				}
+
+				// Extract original key.
+				name := strings.TrimPrefix(rkey, s.prefix)
+				if s.ext != "" {
+					name = strings.TrimSuffix(name, s.ext)
+				}
+
+				// Yield key and value.
+				if !yield(name, v) {
+					return
+				}
+			}
+
+			cur = scan.Cursor
+			if cur == 0 {
+				break
+			}
+		}
+	}
+}
diff --git a/store.go b/store.go
index 43ce396..505e279 100644
--- a/store.go
+++ b/store.go
@@ -2,6 +2,7 @@ package fido
 
 import (
 	"context"
+	"iter"
 	"time"
 )
 
@@ -16,3 +17,15 @@ type Store[K comparable, V any] interface {
 	Len(ctx context.Context) (int, error)
 	Close() error
 }
+
+// PrefixScanner is an optional interface for stores that support efficient prefix iteration.
+// Only meaningful for Store[string, V].
+type PrefixScanner[V any] interface {
+	// Keys returns an iterator over keys matching prefix.
+	// Intended to be cheaper than Range; backends avoid loading values where they can.
+	Keys(ctx context.Context, prefix string) iter.Seq[string]
+
+	// Range returns an iterator over key-value pairs matching prefix.
+	// More expensive than Keys: loads and decodes values from storage.
+	Range(ctx context.Context, prefix string) iter.Seq2[string, V]
+}