Skip to content

Commit 74ebd71

Browse files
craig[bot]kyle-a-wong
andcommitted
Merge #158527
158527: sqlstats: flush ingester buffer if above max statements per txn r=kyle-a-wong a=kyle-a-wong There have been a couple of incidents recently where workloads running multi-day / million-statement txns cause SQL stats to have unbounded memory growth. This is happening because SQL Stats are buffered by sessionID until a transaction commits, which causes the buffer to flush and the statement stats to be recorded in the SQL stats subsystem. When there is a long-running transaction with many statements, the buffer is never flushed and continues to grow. To fix this, we have introduced a new "force flush" event that forces the SQL stats ingester to flush SQL stats in the current session if a certain threshold is met. This threshold is currently set to the value of a new cluster setting: `sql.metrics.transaction_details.max_statement_stats`. If this threshold is met, the stats are automatically flushed. The side effect of this is that these stats will not have an associated transaction fingerprint id. This is because the transaction fingerprint id isn't finalized until a transaction is complete and we don't know the transaction fingerprint id at the time of this forced flush. There is also no way to "backfill" or set the transaction fingerprint id once the statement stat has been recorded to the SQL Stats subsystem. This commit also includes a change to the `SQLStatsIngester.flushBuffer` method to record transaction stats even when there are no statement stats in the buffer. This is the case when the buffer is being force flushed in the scenario mentioned above. Fixes: #158800 Epic: None Release note (bug fix): Addresses a bug with unbounded memory growth when long-running transactions contain many statements. In this case, the SQL Stats system will automatically flush these before the transaction commits. The side effect of this is that these statement statistics will no longer have an associated transaction fingerprint id.
Co-authored-by: Kyle Wong <[email protected]>
2 parents d7ad21d + c924ca3 commit 74ebd71

File tree

7 files changed

+162
-36
lines changed

7 files changed

+162
-36
lines changed

pkg/sql/executor_statement_metrics.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ func (ex *connExecutor) recordStatementSummary(
187187
ex.metrics.EngineMetrics.StatementBytesRead.Inc(stats.bytesRead)
188188
ex.metrics.EngineMetrics.StatementIndexRowsWritten.Inc(stats.indexRowsWritten)
189189
ex.metrics.EngineMetrics.StatementIndexBytesWritten.Inc(stats.indexBytesWritten)
190-
191190
if ex.statsCollector.EnabledForTransaction() {
191+
maxRecordedStats := sqlstats.TxnStatsNumStmtFingerprintStatsToRecord.Get(&ex.server.cfg.Settings.SV)
192192
b := sqlstats.NewRecordedStatementStatsBuilder(
193193
stmtFingerprintID,
194194
planner.SessionData().Database,
@@ -228,6 +228,19 @@ func (ex *connExecutor) recordStatementSummary(
228228
}
229229

230230
ex.statsCollector.RecordStatement(ctx, b.Build())
231+
if int64(ex.state.mu.stmtCount)%(maxRecordedStats) == 0 {
232+
// If the statement count is a multiple of the configured limit,
233+
// force flush the SQL stats ingestion buffer.
234+
// Note that these statements that are force flushed cannot be associated
235+
// with the transaction in SQL Stats since this association can only be
236+
// done once a transaction has been committed. As a result, these
237+
// recorded stats will be given a static transaction fingerprint ID to
238+
// indicate that they are force flushed. Since these force flushes occur
239+
// in batches, any leftover statement stats will be flushed when the
240+
// transaction is committed, causing them to be associated with the
241+
// correct transaction fingerprint.
242+
ex.statsCollector.Flush(ex.planner.extendedEvalCtx.SessionID)
243+
}
231244
}
232245

233246
// Record statement execution statistics if span is recorded and no error was

pkg/sql/sqlstats/cluster_settings.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,23 @@ var TxnStatsNumStmtFingerprintIDsToRecord = settings.RegisterIntSetting(
2929
settings.PositiveInt,
3030
)
3131

32+
// TxnStatsNumStmtFingerprintStatsToRecord limits the number of recorded
33+
// statement statistics that may be associated with transaction statistics for
34+
// a single transaction. If the number of statements executed exceeds this
35+
// value, SQL Stats will force the ingester to flush the buffered stats. These
36+
// stats will not be associated with the current transaction. SQL Stats will
37+
// continue to buffer statements until this limit is reached again. In the case
38+
// that it isn't reached again, the buffered statements will be flushed when
39+
// the transaction is committed, and they will be associated with the
40+
// transaction.
41+
var TxnStatsNumStmtFingerprintStatsToRecord = settings.RegisterIntSetting(
42+
settings.ApplicationLevel,
43+
"sql.metrics.transaction_details.max_statement_stats",
44+
"max number of statement statistics that may be associated with transaction statistics",
45+
100_000,
46+
settings.PositiveInt,
47+
)
48+
3249
// TxnStatsEnable determines whether to collect per-application transaction
3350
// statistics.
3451
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.

pkg/sql/sqlstats/sqlstats_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func GetTxnStats(t *testing.T, conn *sqlutils.SQLRunner, appName string, dbName
122122
SELECT
123123
encode(fingerprint_id, 'hex') AS txn_fingerprint_id,
124124
metadata->'stmtFingerprintIDs' AS statement_fingerprint_ids,
125+
statistics -> 'statistics'->> 'cnt' AS count,
125126
(statistics -> 'statistics' -> 'commitLat' -> 'mean')::FLOAT > 0 AS commit_lat_not_zero,
126127
(statistics -> 'statistics' -> 'svcLat' -> 'mean')::FLOAT > 0 AS svc_lat_not_zero
127128
FROM crdb_internal.transaction_statistics ts

pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package sslocal
77

88
import (
99
"context"
10+
"math"
1011
"sync"
1112
"sync/atomic"
1213
"time"
@@ -25,6 +26,8 @@ import (
2526
// will go before flushing its contents to the registry.
2627
const defaultFlushInterval = time.Millisecond * 500
2728

29+
const forceFlushTransactionFingerprintId = appstatspb.TransactionFingerprintID(math.MaxUint64)
30+
2831
// Metrics holds running measurements of various ingester-related runtime stats.
2932
type Metrics struct {
3033
// NumProcessed tracks how many items have been processed.
@@ -149,6 +152,7 @@ type event struct {
149152
sessionID clusterunique.ID
150153
transaction *sqlstats.RecordedTxnStats
151154
statement *sqlstats.RecordedStmtStats
155+
forceFlush bool
152156
}
153157

154158
type BufferOpt func(i *SQLStatsIngester)
@@ -250,14 +254,18 @@ func (i *SQLStatsIngester) ingest(ctx context.Context, events *eventBuffer) {
250254
// the stmts with and the statement's txn id will be nil. In that case
251255
// we can send immediately to the sinks.
252256
if e.statement.UnderOuterTxn {
253-
i.flushBuffer(ctx, e.statement.SessionID, nil)
257+
i.flushBuffer(ctx, e.statement.SessionID, nil, 0)
254258
}
255259
} else if e.transaction != nil {
256-
i.flushBuffer(ctx, e.transaction.SessionID, e.transaction)
260+
i.flushBuffer(ctx, e.transaction.SessionID, e.transaction, e.transaction.FingerprintID)
257261
i.metrics.NumProcessed.Inc(1)
258262
i.metrics.QueueSize.Dec(1)
259263
} else {
260-
i.clearSession(e.sessionID)
264+
if e.forceFlush {
265+
i.flushBuffer(ctx, e.sessionID, nil, forceFlushTransactionFingerprintId)
266+
} else {
267+
i.clearSession(e.sessionID)
268+
}
261269
i.metrics.NumProcessed.Inc(1)
262270
i.metrics.QueueSize.Dec(1)
263271
}
@@ -323,6 +331,18 @@ func (i *SQLStatsIngester) ClearSession(sessionID clusterunique.ID) {
323331
})
324332
}
325333

334+
// FlushBuffer sends a signal to the underlying registry to flush any cached
335+
// data associated with the given sessionID. This is an async operation.
336+
func (i *SQLStatsIngester) FlushBuffer(sessionID clusterunique.ID) {
337+
i.guard.AtomicWrite(func(writerIdx int64) {
338+
i.guard.eventBuffer[writerIdx] = event{
339+
sessionID: sessionID,
340+
forceFlush: true,
341+
}
342+
i.metrics.QueueSize.Inc(1)
343+
})
344+
}
345+
326346
func NewSQLStatsIngester(
327347
st *cluster.Settings, knobs *sqlstats.TestingKnobs, metrics Metrics, sinks ...SQLStatsSink,
328348
) *SQLStatsIngester {
@@ -397,45 +417,39 @@ func (i *SQLStatsIngester) processStatement(statement *sqlstats.RecordedStmtStat
397417
// flushBuffer sends the buffered statementsBySessionID and provided transaction
398418
// to the registered sinks. The transaction may be nil.
399419
func (i *SQLStatsIngester) flushBuffer(
400-
ctx context.Context, sessionID clusterunique.ID, transaction *sqlstats.RecordedTxnStats,
420+
ctx context.Context,
421+
sessionID clusterunique.ID,
422+
transaction *sqlstats.RecordedTxnStats,
423+
transactionFingerprintID appstatspb.TransactionFingerprintID,
401424
) {
402-
statements, ok := func() (*statementBuf, bool) {
403-
statements, ok := i.statementsBySessionID[sessionID]
404-
if !ok {
405-
return nil, false
406-
}
425+
var statements statementBuf
426+
if sessionStatements, ok := i.statementsBySessionID[sessionID]; ok {
427+
defer sessionStatements.release()
428+
statements = *sessionStatements
407429
delete(i.statementsBySessionID, sessionID)
408-
return statements, true
409-
}()
410-
if !ok {
430+
} else if transaction == nil {
431+
// No statements or transactions to flush, return early
411432
return
412433
}
413-
defer statements.release()
414434

415-
if len(*statements) == 0 {
416-
return
417-
}
418-
419-
if transaction != nil {
420-
// Here we'll set the transaction fingerprint ID for each statement if the
421-
// below cluster setting is enabled.
422-
shouldAssociateWithTxn := AssociateStmtWithTxnFingerprint.Get(&i.settings.SV)
423-
// These values are only known at the time of the transaction.
424-
for _, s := range *statements {
425-
if shouldAssociateWithTxn {
426-
s.TransactionFingerprintID = transaction.FingerprintID
427-
}
428-
if s.ImplicitTxn == transaction.ImplicitTxn {
429-
continue
430-
}
431-
// We need to recompute the fingerprint ID.
432-
s.ImplicitTxn = transaction.ImplicitTxn
433-
s.FingerprintID = appstatspb.ConstructStatementFingerprintID(
434-
s.Query, s.ImplicitTxn, s.Database)
435+
// Here we'll set the transaction fingerprint ID for each statement if the
436+
// below cluster setting is enabled.
437+
shouldAssociateWithTxn := AssociateStmtWithTxnFingerprint.Get(&i.settings.SV)
438+
// These values are only known at the time of the transaction.
439+
for _, s := range statements {
440+
if shouldAssociateWithTxn {
441+
s.TransactionFingerprintID = transactionFingerprintID
442+
}
443+
if transaction == nil || s.ImplicitTxn == transaction.ImplicitTxn {
444+
continue
435445
}
446+
// We need to recompute the fingerprint ID.
447+
s.ImplicitTxn = transaction.ImplicitTxn
448+
s.FingerprintID = appstatspb.ConstructStatementFingerprintID(
449+
s.Query, s.ImplicitTxn, s.Database)
436450
}
437451

438452
for _, sink := range i.sinks {
439-
sink.ObserveTransaction(ctx, transaction, *statements)
453+
sink.ObserveTransaction(ctx, transaction, statements)
440454
}
441455
}

pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,14 @@ func (s *StatsCollector) RecordTransaction(_ctx context.Context, value *sqlstats
164164
s.statsIngester.RecordTransaction(value)
165165
}
166166

167+
func (s *StatsCollector) Flush(sessionID clusterunique.ID) {
168+
if !s.sendStats {
169+
return
170+
}
171+
172+
s.statsIngester.FlushBuffer(sessionID)
173+
}
174+
167175
func (s *StatsCollector) EnabledForTransaction() bool {
168176
return s.sendStats
169177
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# In this test, we verify that statement statistics are force flushed based on
2+
# the `sql.metrics.transaction_details.max_statement_stats` cluster setting. If
3+
# the expected criteria is met, statement statistics for a session will be
4+
# flushed from the buffer, regardless of whether the transaction has ended.
5+
# When statement statistics are force flushed, they are recorded with a static
6+
# placeholder transaction fingerprint ID (ffffffffffffffff) rather than the
7+
# real one, since the real ID requires a transaction to have ended to compute.
8+
9+
# Setup baseline to test against with default settings
10+
exec-sql db=max_txn_stmts_test app-name=exec
11+
BEGIN;
12+
SELECT 1;
13+
SELECT 1;
14+
SELECT 1;
15+
END;
16+
----
17+
18+
exec-sql db=max_txn_stmts_test app-name=exec
19+
BEGIN;
20+
SELECT 1;
21+
SELECT 1;
22+
SELECT 1;
23+
SELECT 1;
24+
SELECT 1;
25+
END;
26+
----
27+
28+
show-stats db=max_txn_stmts_test app-name=exec
29+
----
30+
{"count": "5", "fingerprint_id": "bdb329db6493db17", "nodes": [1], "parse_lat_not_zero": true, "plan_distributed": false, "plan_full_scan": false, "plan_hash_set": true, "plan_implicit_txn": false, "plan_lat_not_zero": true, "plan_vectorized": true, "query": "SELECT _", "run_lat_not_zero": true, "sql_type": "TypeDML", "summary": "SELECT _", "svc_lat_not_zero": true, "transaction_fingerprint_id": "023097855475259c"}
31+
{"count": "3", "fingerprint_id": "bdb329db6493db17", "nodes": [1], "parse_lat_not_zero": true, "plan_distributed": false, "plan_full_scan": false, "plan_hash_set": true, "plan_implicit_txn": false, "plan_lat_not_zero": true, "plan_vectorized": true, "query": "SELECT _", "run_lat_not_zero": true, "sql_type": "TypeDML", "summary": "SELECT _", "svc_lat_not_zero": true, "transaction_fingerprint_id": "3b5be2cb288f18aa"}
32+
33+
show-txn-stats db=max_txn_stmts_test app-name=exec
34+
----
35+
{"commit_lat_not_zero": true, "count": "1", "statement_fingerprint_ids": ["bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17"], "svc_lat_not_zero": true, "txn_fingerprint_id": "023097855475259c"}
36+
{"commit_lat_not_zero": true, "count": "1", "statement_fingerprint_ids": ["bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17"], "svc_lat_not_zero": true, "txn_fingerprint_id": "3b5be2cb288f18aa"}
37+
38+
exec-sql db=max_txn_stmts_test app-name=setup-limit
39+
SET CLUSTER SETTING sql.metrics.transaction_details.max_statement_stats=4;
40+
----
41+
42+
# This is over the configured settings but below the threshold for force flush,
43+
# so it will still contribute to the same SQL Stats rows as above.
44+
exec-sql db=max_txn_stmts_test app-name=exec
45+
BEGIN;
46+
SELECT 1;
47+
SELECT 1;
48+
SELECT 1;
49+
END;
50+
----
51+
52+
# This is over the threshold for force flush, so it results in a new row in
53+
# SQL stats with the placeholder transaction fingerprint ID (ffffffffffffffff).
54+
exec-sql db=max_txn_stmts_test app-name=exec
55+
BEGIN;
56+
SELECT 1;
57+
SELECT 1;
58+
SELECT 1;
59+
SELECT 1;
60+
SELECT 1;
61+
END;
62+
----
63+
64+
show-stats db=max_txn_stmts_test app-name=exec
65+
----
66+
{"count": "6", "fingerprint_id": "bdb329db6493db17", "nodes": [1], "parse_lat_not_zero": true, "plan_distributed": false, "plan_full_scan": false, "plan_hash_set": true, "plan_implicit_txn": false, "plan_lat_not_zero": true, "plan_vectorized": true, "query": "SELECT _", "run_lat_not_zero": true, "sql_type": "TypeDML", "summary": "SELECT _", "svc_lat_not_zero": true, "transaction_fingerprint_id": "023097855475259c"}
67+
{"count": "6", "fingerprint_id": "bdb329db6493db17", "nodes": [1], "parse_lat_not_zero": true, "plan_distributed": false, "plan_full_scan": false, "plan_hash_set": true, "plan_implicit_txn": false, "plan_lat_not_zero": true, "plan_vectorized": true, "query": "SELECT _", "run_lat_not_zero": true, "sql_type": "TypeDML", "summary": "SELECT _", "svc_lat_not_zero": true, "transaction_fingerprint_id": "3b5be2cb288f18aa"}
68+
{"count": "4", "fingerprint_id": "bdb329db6493db17", "nodes": [1], "parse_lat_not_zero": true, "plan_distributed": false, "plan_full_scan": false, "plan_hash_set": true, "plan_implicit_txn": false, "plan_lat_not_zero": true, "plan_vectorized": true, "query": "SELECT _", "run_lat_not_zero": true, "sql_type": "TypeDML", "summary": "SELECT _", "svc_lat_not_zero": true, "transaction_fingerprint_id": "ffffffffffffffff"}
69+
70+
show-txn-stats db=max_txn_stmts_test app-name=exec
71+
----
72+
{"commit_lat_not_zero": true, "count": "2", "statement_fingerprint_ids": ["bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17"], "svc_lat_not_zero": true, "txn_fingerprint_id": "023097855475259c"}
73+
{"commit_lat_not_zero": true, "count": "2", "statement_fingerprint_ids": ["bdb329db6493db17", "bdb329db6493db17", "bdb329db6493db17"], "svc_lat_not_zero": true, "txn_fingerprint_id": "3b5be2cb288f18aa"}

pkg/sql/sqlstats/testdata/query

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,4 @@ show-stats db=random_db app-name=transactions
3535

3636
show-txn-stats db=random_db app-name=transactions
3737
----
38-
{"commit_lat_not_zero": true, "statement_fingerprint_ids": ["5fe99858726d3688", "5fe99858726d3688"], "svc_lat_not_zero": true, "txn_fingerprint_id": "78d7c1c32632f05d"}
38+
{"commit_lat_not_zero": true, "count": "2", "statement_fingerprint_ids": ["5fe99858726d3688", "5fe99858726d3688"], "svc_lat_not_zero": true, "txn_fingerprint_id": "78d7c1c32632f05d"}

0 commit comments

Comments
 (0)