Skip to content

Commit 240ad97

Browse files
Change connection pool idle expiration logic (#19004)
Signed-off-by: Arthur Schreiber <[email protected]>
1 parent 3dd1516 commit 240ad97

File tree

2 files changed

+153
-29
lines changed

2 files changed

+153
-29
lines changed

go/pools/smartconnpool/pool.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -781,35 +781,74 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
781781
mono := monotonicFromTime(now)
782782

783783
closeInStack := func(s *connStack[C]) {
784-
// Do a read-only best effort iteration of all the connection in this
785-
// stack and atomically attempt to mark them as expired.
786-
// Any connections that are marked as expired are _not_ removed from
787-
// the stack; it's generally unsafe to remove nodes from the stack
788-
// besides the head. When clients pop from the stack, they'll immediately
789-
// notice the expired connection and ignore it.
790-
// see: timestamp.expired
791-
for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
792-
if conn.timeUsed.expired(mono, timeout) {
793-
pool.Metrics.idleClosed.Add(1)
784+
conn, ok := s.Pop()
785+
if !ok {
786+
// Early return to skip allocating slices when the stack is empty
787+
return
788+
}
794789

795-
conn.Close()
796-
pool.closedConn()
790+
activeConnections := pool.Active()
791+
792+
// Only expire up to ~half of the active connections at a time. This should
793+
// prevent us from closing too many connections in one go which could lead to
794+
// a lot of `.Get` calls being added to the waitlist if there's a sudden spike
795+
// coming in _after_ connections were popped off the stack but _before_ being
796+
// returned back to the pool. This is unlikely to happen, but better safe than sorry.
797+
//
798+
// We always expire at least one connection per stack per iteration to ensure
799+
// that idle connections are eventually closed even in small pools.
800+
//
801+
// We will expire any additional connections in the next iteration of the idle closer.
802+
expiredConnections := make([]*Pooled[C], 0, max(activeConnections/2, 1))
803+
validConnections := make([]*Pooled[C], 0, activeConnections)
804+
805+
// Pop connections off the stack until it is empty (i.e. until Pop reports ok == false)
806+
for ok {
807+
if conn.timeUsed.expired(mono, timeout) {
808+
expiredConnections = append(expiredConnections, conn)
797809

798-
// Using context.Background() is fine since MySQL connection already enforces
799-
// a connect timeout via the `db-connect-timeout-ms` config param.
800-
c, err := pool.getNew(context.Background())
801-
if err != nil {
802-
// If we couldn't open a new connection, just continue
803-
continue
810+
if len(expiredConnections) == cap(expiredConnections) {
811+
// We have collected enough connections for this iteration to expire
812+
break
804813
}
814+
} else {
815+
validConnections = append(validConnections, conn)
816+
}
805817

806-
// opening a new connection might have raced with other goroutines,
807-
// so it's possible that we got back `nil` here
808-
if c != nil {
809-
// Return the new connection to the pool
810-
pool.tryReturnConn(c)
811-
}
818+
conn, ok = s.Pop()
819+
}
820+
821+
// Return all the valid connections back to waiters or the stack
822+
//
823+
// The order here is not important - because we can't guarantee to
824+
// restore the order we got the connections out of the stack anyway.
825+
//
826+
// If we return the connections in the order popped off the stack:
827+
// * waiters will get the newest connection first
828+
// * stack will have the oldest connections at the top of the stack.
829+
//
830+
// If we return the connections in reverse order:
831+
// * waiters will get the oldest connection first
832+
// * stack will have the newest connections at the top of the stack.
833+
//
834+
// Neither of these is better or worse than the other.
835+
for _, conn := range validConnections {
836+
pool.tryReturnConn(conn)
837+
}
838+
839+
// Close all the expired connections and open new ones to replace them
840+
for _, conn := range expiredConnections {
841+
pool.Metrics.idleClosed.Add(1)
842+
843+
conn.Close()
844+
845+
err := pool.connReopen(context.Background(), conn, mono)
846+
if err != nil {
847+
pool.closedConn()
848+
continue
812849
}
850+
851+
pool.tryReturnConn(conn)
813852
}
814853
}
815854

go/pools/smartconnpool/pool_test.go

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,16 +1364,22 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
13641364

13651365
// Try to get connections while they're being reopened
13661366
// This should trigger the bug where connections get discarded
1367+
wg := sync.WaitGroup{}
1368+
13671369
for i := 0; i < 2; i++ {
1368-
getCtx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
1369-
defer cancel()
1370+
wg.Go(func() {
1371+
getCtx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond)
1372+
defer cancel()
13701373

1371-
conn, err := p.Get(getCtx, nil)
1372-
require.NoError(t, err)
1374+
conn, err := p.Get(getCtx, nil)
1375+
require.NoError(t, err)
13731376

1374-
p.put(conn)
1377+
p.put(conn)
1378+
})
13751379
}
13761380

1381+
wg.Wait()
1382+
13771383
// Wait a moment for all reopening to complete
13781384
require.EventuallyWithT(t, func(c *assert.CollectT) {
13791385
// Check the actual number of currently open connections
@@ -1404,3 +1410,82 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
14041410
assert.Equal(t, int64(0), state.open.Load())
14051411
assert.Equal(t, int64(4), state.close.Load())
14061412
}
1413+
1414+
func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) {
1415+
var state TestState
1416+
1417+
ctx := context.Background()
1418+
p := NewPool(&Config[*TestConn]{
1419+
Capacity: 10,
1420+
IdleTimeout: 50 * time.Millisecond,
1421+
LogWait: state.LogWait,
1422+
}).Open(newConnector(&state), nil)
1423+
1424+
defer p.Close()
1425+
1426+
var conns []*Pooled[*TestConn]
1427+
for i := 0; i < 10; i++ {
1428+
conn, err := p.Get(ctx, nil)
1429+
require.NoError(t, err)
1430+
conns = append(conns, conn)
1431+
}
1432+
1433+
for _, conn := range conns {
1434+
p.put(conn)
1435+
}
1436+
1437+
require.EqualValues(t, 10, p.Active())
1438+
require.EqualValues(t, 10, p.Available())
1439+
1440+
// Wait a bit for the idle timeout worker to refresh connections
1441+
assert.Eventually(t, func() bool {
1442+
return p.Metrics.IdleClosed() > 10
1443+
}, 500*time.Millisecond, 10*time.Millisecond, "Expected at least 10 connections to be closed by idle timeout")
1444+
1445+
// Verify that new connections were created to replace the closed ones
1446+
require.EqualValues(t, 10, p.Active())
1447+
require.EqualValues(t, 10, p.Available())
1448+
1449+
// Count how many connections in the stack are closed
1450+
totalInStack := 0
1451+
for conn := p.clean.Peek(); conn != nil; conn = conn.next.Load() {
1452+
totalInStack++
1453+
}
1454+
1455+
require.LessOrEqual(t, totalInStack, 10)
1456+
}
1457+
1458+
func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing.B) {
1459+
var state TestState
1460+
1461+
capacity := 1000
1462+
1463+
p := NewPool(&Config[*TestConn]{
1464+
Capacity: int64(capacity),
1465+
IdleTimeout: 30 * time.Second,
1466+
LogWait: state.LogWait,
1467+
}).Open(newConnector(&state), nil)
1468+
defer p.Close()
1469+
1470+
// Fill the pool
1471+
connections := make([]*Pooled[*TestConn], 0, capacity)
1472+
for range capacity {
1473+
conn, err := p.Get(context.Background(), nil)
1474+
if err != nil {
1475+
b.Fatal(err)
1476+
}
1477+
1478+
connections = append(connections, conn)
1479+
}
1480+
1481+
// Return all connections to the pool
1482+
for _, conn := range connections {
1483+
conn.Recycle()
1484+
}
1485+
1486+
b.ResetTimer()
1487+
1488+
for b.Loop() {
1489+
p.closeIdleResources(time.Now())
1490+
}
1491+
}

0 commit comments

Comments
 (0)