@@ -256,6 +256,180 @@ func registerBackupNodeShutdown(r registry.Registry) {
256256
257257}
258258
259+ func registerBackupIntents (r registry.Registry ) {
260+ r .Add (registry.TestSpec {
261+ Name : "backup/intents/pending" ,
262+ Owner : registry .OwnerKV ,
263+ Cluster : r .MakeClusterSpec (4 ),
264+ EncryptionSupport : registry .EncryptionMetamorphic ,
265+ Leases : registry .MetamorphicLeases ,
266+ CompatibleClouds : registry .Clouds (spec .GCE , spec .Local ),
267+ Suites : registry .Suites (registry .Nightly ),
268+ TestSelectionOptOutSuites : registry .Suites (registry .Nightly ),
269+ PostProcessPerfMetrics : func (test string , histogram * roachtestutil.HistogramMetric ) (roachtestutil.AggregatedPerfMetrics , error ) {
270+ return roachtestutil.AggregatedPerfMetrics {
271+ {
272+ Name : fmt .Sprintf ("%s_backup_latency" , test ),
273+ Value : histogram .Elapsed ,
274+ Unit : "ms" ,
275+ IsHigherBetter : false ,
276+ },
277+ }, nil
278+ },
279+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
280+ c .Start (ctx , t .L (), option .NewStartOpts (option .NoBackupSchedule ), install .MakeClusterSettings ())
281+ exporter := roachtestutil .CreateWorkloadHistogramExporter (t , c )
282+ tick , perfBuf := initBulkJobPerfArtifacts (5 * time .Minute , t , exporter )
283+ defer roachtestutil .CloseExporter (ctx , exporter , t , c , perfBuf , c .Node (1 ), "" )
284+
285+ tick ()
286+ runBackup (ctx , t , c , false /* abandon */ )
287+ tick ()
288+ },
289+ })
290+ r .Add (registry.TestSpec {
291+ Name : "backup/intents/abandoned" ,
292+ Owner : registry .OwnerKV ,
293+ Cluster : r .MakeClusterSpec (4 ),
294+ EncryptionSupport : registry .EncryptionMetamorphic ,
295+ Leases : registry .MetamorphicLeases ,
296+ CompatibleClouds : registry .Clouds (spec .GCE , spec .Local ),
297+ Suites : registry .Suites (registry .Nightly ),
298+ TestSelectionOptOutSuites : registry .Suites (registry .Nightly ),
299+ PostProcessPerfMetrics : func (test string , histogram * roachtestutil.HistogramMetric ) (roachtestutil.AggregatedPerfMetrics , error ) {
300+ return roachtestutil.AggregatedPerfMetrics {
301+ {
302+ Name : fmt .Sprintf ("%s_backup_latency" , test ),
303+ Value : histogram .Elapsed ,
304+ Unit : "ms" ,
305+ IsHigherBetter : false ,
306+ },
307+ }, nil
308+ },
309+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
310+ settings := install .MakeClusterSettings ()
311+ settings .Env = append (settings .Env , "COCKROACH_TEST_ONLY_ASYNC_INTENT_RESOLUTION_DISABLED=true" )
312+ c .Start (ctx , t .L (), option .NewStartOpts (option .NoBackupSchedule ), settings )
313+ exporter := roachtestutil .CreateWorkloadHistogramExporter (t , c )
314+ tick , perfBuf := initBulkJobPerfArtifacts (5 * time .Minute , t , exporter )
315+ defer roachtestutil .CloseExporter (ctx , exporter , t , c , perfBuf , c .Node (1 ), "" )
316+
317+ tick ()
318+ runBackup (ctx , t , c , true /* abandon */ )
319+ tick ()
320+ },
321+ })
322+ }
323+
324+ func runBackup (ctx context.Context , t test.Test , c cluster.Cluster , abandon bool ) {
325+ conn := c .Conn (ctx , t .L (), 1 )
326+ defer conn .Close ()
327+
328+ const totalRowCount = 1000000
329+ const perTransactionRowCount = 1000
330+ numTxns := totalRowCount / perTransactionRowCount
331+
332+ _ , err := conn .Exec ("CREATE TABLE foo(k INT PRIMARY KEY, v INT NOT NULL)" )
333+ if err != nil {
334+ t .Fatalf (err .Error ())
335+ }
336+ // Create a split to ensure that most intents live on a different range than
337+ // the corresponding txn record. This will trigger async intent resolution
338+ // after the txn is aborted.
339+ statement := fmt .Sprintf ("ALTER TABLE foo SPLIT AT VALUES (%d)" , numTxns )
340+ _ , err = conn .Exec (statement )
341+ if err != nil {
342+ t .Fatalf (err .Error ())
343+ }
344+
345+ // Disable buffered writes to ensure transactions write intents.
346+ _ , err = conn .Exec ("SET CLUSTER SETTING kv.transaction.write_buffering.enabled = false" )
347+ if err != nil {
348+ t .Fatalf (err .Error ())
349+ }
350+
351+ // Disable randomized anchor keys since the test controls in which range the
352+ // transaction records live.
353+ _ , err = conn .Exec ("SET CLUSTER SETTING kv.transaction.randomized_anchor_key.enabled=false" )
354+ if err != nil {
355+ t .Fatalf (err .Error ())
356+ }
357+
358+ t .Status ("writing intents" )
359+ transactions := make ([]* gosql.Tx , numTxns )
360+ for i := 0 ; i < totalRowCount ; i += perTransactionRowCount {
361+ txnIndex := i / perTransactionRowCount
362+ var tx * gosql.Tx
363+ tx , err = conn .Begin ()
364+ if err != nil {
365+ t .Fatalf (err .Error ())
366+ }
367+ transactions [txnIndex ] = tx
368+ // Anchor these transactions on the range that contains key i < numTxns.
369+ statement = fmt .Sprintf ("INSERT INTO foo (k, v) VALUES (%d, %d)" , txnIndex , txnIndex )
370+ _ , err = tx .Exec (statement )
371+ if err != nil {
372+ t .Fatalf (err .Error ())
373+ }
374+ baseKey := numTxns + txnIndex * perTransactionRowCount
375+ for j := 0 ; j < perTransactionRowCount ; j += 1 {
376+ // Ensure the intents are on a different range than the txn record.
377+ // TODO(mira): Currently, this takes ~4-5 min. We can speed up writing
378+ // these intents by using generate_series.
379+ statement = fmt .Sprintf ("INSERT INTO foo (k, v) VALUES (%d, %d)" , baseKey + j , j )
380+ _ , err = tx .Exec (statement )
381+ if err != nil {
382+ t .Fatalf (err .Error ())
383+ }
384+ }
385+ }
386+
387+ // For abandoned intents, abort the transactions. We have disabled async
388+ // intent resolution above, so this should result in abandoned intents.
389+ if abandon {
390+ t .Status ("aborting the intent transactions" )
391+ for _ , tx := range transactions {
392+ if err = tx .Rollback (); err != nil {
393+ t .Fatalf (err .Error ())
394+ }
395+ }
396+ }
397+
398+ // These cluster settings ensure that ExportRequests issued by the backup
399+ // processor are not delayed for too long, especially for abandon=false, when
400+ // only the final high-priority ExportRequest can push the slow pending txn.
401+ _ , err = conn .Exec ("SET CLUSTER SETTING bulkio.backup.read_with_priority_after = '500ms'" )
402+ if err != nil {
403+ t .Fatalf (err .Error ())
404+ }
405+ _ , err = conn .Exec ("SET CLUSTER SETTING bulkio.backup.read_retry_delay = '100ms'" )
406+ if err != nil {
407+ t .Fatalf (err .Error ())
408+ }
409+
410+ // The backup runs with a timeout configured by bulkio.backup.read_timeout,
411+ // which defaults to 5min. Currently, before the virtualized intent resolution
412+ // work, this test takes ~2min for the pending case and ~3min for the
413+ // abandoned case. The 5min timeout should be sufficient for this test, and
414+ // if exceeded, will provide a signal that the intent resolution is taking
415+ // longer than expected.
416+ // TODO(mira): Adjust the timeout if the test flakes, and after the VIR work.
417+ t .Status ("running backup" )
418+ _ , err = conn .Exec (fmt .Sprintf ("BACKUP TABLE foo INTO 'nodelocal://1/%s'" , destinationName (c )))
419+ if err != nil {
420+ t .Fatalf (err .Error ())
421+ }
422+
423+ // Commit the pending transactions after backup.
424+ if ! abandon {
425+ for _ , tx := range transactions {
426+ if err = tx .Commit (); err != nil {
427+ t .Fatalf (err .Error ())
428+ }
429+ }
430+ }
431+ }
432+
259433// initBulkJobPerfArtifacts registers a histogram, creates a performance
260434// artifact directory and returns a method that when invoked records a tick.
261435func initBulkJobPerfArtifacts (
0 commit comments