Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 3df413a

Browse files
author
callmedenchick
authored
Valkey cluster topology from CLUSTER SLOTS (#52)
1 parent 2dc2001 commit 3df413a

2 files changed

Lines changed: 44 additions & 29 deletions

File tree

docker-compose.sharded-valkey.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ services:
7878
environment:
7979
PORT: 8080
8080
STORAGE: "valkey"
81-
VALKEY_URI: "redis://valkey-shard1:6379,redis://valkey-shard2:6380,redis://valkey-shard3:6381"
81+
VALKEY_URI: "redis://valkey-shard1:6379"
8282
CORS_ENABLE: "true"
8383
HEARTBEAT_INTERVAL: 10
8484
RPS_LIMIT: 1000

internal/storage/valkey.go

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,52 +21,67 @@ type ValkeyStorage struct {
2121
}
2222

2323
// NewValkeyStorage creates a new Valkey storage instance
24-
// Supports both single node and cluster modes based on URI format
24+
// Supports both single node and cluster modes
25+
// For cluster mode, discovers cluster topology using CLUSTER SLOTS command
2526
func NewValkeyStorage(valkeyURI string) (*ValkeyStorage, error) {
2627
log := log.WithField("prefix", "NewValkeyStorage")
2728

28-
uris := strings.Split(valkeyURI, ",")
29-
3029
var client redis.UniversalClient
31-
if len(uris) > 1 {
32-
addrs := make([]string, len(uris))
33-
// TODO what if other options differ between nodes?
34-
var firstOpts *redis.Options
35-
for i, uri := range uris {
36-
opts, err := redis.ParseURL(strings.TrimSpace(uri))
37-
if err != nil {
38-
return nil, fmt.Errorf("failed to parse URI %d: %w", i+1, err)
39-
}
40-
addrs[i] = opts.Addr
41-
if i == 0 {
42-
firstOpts = opts
30+
31+
// Parse the primary URI
32+
opts, err := redis.ParseURL(strings.TrimSpace(valkeyURI))
33+
if err != nil {
34+
return nil, fmt.Errorf("failed to parse URI: %w", err)
35+
}
36+
37+
// First, connect to the single node to check if it's part of a cluster
38+
tempClient := redis.NewClient(opts)
39+
ctxTemp, cancelTemp := context.WithTimeout(context.Background(), 5*time.Second)
40+
defer cancelTemp()
41+
42+
// Try to get cluster info
43+
clusterSlots, err := tempClient.ClusterSlots(ctxTemp).Result()
44+
if err := tempClient.Close(); err != nil {
45+
log.Warnf("failed to close temporary client: %v", err)
46+
}
47+
48+
if err != nil || len(clusterSlots) == 0 {
49+
// Not a cluster or cluster command failed, use single-node mode
50+
log.Info("Using single-node mode")
51+
client = redis.NewClient(opts)
52+
} else {
53+
// Extract all node addresses from cluster slots
54+
nodeAddrs := make(map[string]bool)
55+
for _, slot := range clusterSlots {
56+
for _, node := range slot.Nodes {
57+
nodeAddrs[node.Addr] = true
4358
}
4459
}
45-
log.Infof("Using cluster mode with %d nodes", len(uris))
60+
61+
// Convert to slice
62+
addrs := make([]string, 0, len(nodeAddrs))
63+
for addr := range nodeAddrs {
64+
addrs = append(addrs, addr)
65+
}
66+
67+
log.Infof("Using cluster mode with %d nodes discovered from CLUSTER SLOTS", len(addrs))
4668
client = redis.NewClusterClient(&redis.ClusterOptions{
4769
Addrs: addrs,
48-
Password: firstOpts.Password,
49-
Username: firstOpts.Username,
50-
TLSConfig: firstOpts.TLSConfig,
51-
// Enable automatic cluster redirection handling for AWS ElastiCache
70+
Password: opts.Password,
71+
Username: opts.Username,
72+
TLSConfig: opts.TLSConfig,
73+
// Enable automatic cluster redirection handling
5274
ReadOnly: false,
5375
RouteByLatency: true,
5476
RouteRandomly: false,
5577
// Set maximum redirects to handle MOVED responses
5678
MaxRedirects: 3,
57-
// Set appropriate timeouts for AWS ElastiCache
79+
// Set appropriate timeouts for managed clusters like AWS ElastiCache
5880
ReadTimeout: 30 * time.Second,
5981
WriteTimeout: 30 * time.Second,
6082
DialTimeout: 10 * time.Second,
6183
PoolTimeout: 30 * time.Second,
6284
})
63-
} else {
64-
opts, err := redis.ParseURL(strings.TrimSpace(uris[0]))
65-
if err != nil {
66-
return nil, fmt.Errorf("failed to parse URI: %w", err)
67-
}
68-
log.Info("Using single-node mode")
69-
client = redis.NewClient(opts)
7085
}
7186

7287
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

0 commit comments

Comments
 (0)