Skip to content

Commit d9fbc8c

Browse files
craig[bot]aerfreiandrew-r-thomas
committed
158172: changefeedccl: add metamorphic testing for db-level feeds r=log-head,asg0451 a=aerfrei This adds metamorphic testing of db-level feeds, making table level feeds into db-level feeds unless the test explicitly opts out or is otherwise setting changefeed options that aren't compatible with db-level feeds. Epic: CRDB-1421 Fixes: #148858 158775: authors: add andrew-r-thomas to authors r=jeffswenson a=andrew-r-thomas Epic: None Release note: None Co-authored-by: Aerin Freilich <[email protected]> Co-authored-by: Andrew Thomas <[email protected]>
3 parents e0282d6 + e32f1dc + 49b833a commit d9fbc8c

File tree

6 files changed

+321
-47
lines changed

6 files changed

+321
-47
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ Andrew Bonventre <[email protected]> <[email protected]>
7373
7474
Andrew Kryczka <[email protected]> Andrew Kryczka <[email protected]> <@cockroachlabs.com>
7575
Andrew NS Yeow <[email protected]>
76+
Andrew Thomas <@cockroachlabs.com>
7677
Andrew Werner <[email protected]>
7778
7879
Andrey Shinkevich <[email protected]>

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ func TestAlterChangefeedAddTarget(t *testing.T) {
221221
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
222222
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
223223

224-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
224+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
225+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
226+
})
225227
defer closeFeed(t, testFeed)
226228

227229
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -261,7 +263,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) {
261263
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
262264
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)
263265

264-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
266+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
267+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
268+
})
265269
defer closeFeed(t, testFeed)
266270

267271
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -458,7 +462,9 @@ func TestAlterChangefeedDropTarget(t *testing.T) {
458462
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
459463
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
460464

461-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
465+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
466+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
467+
})
462468
defer closeFeed(t, testFeed)
463469

464470
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -493,7 +499,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
493499
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
494500
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
495501

496-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`)
502+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`,
503+
optOutOfMetamorphicDBLevelChangefeed{
504+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
505+
},
506+
)
497507
defer closeFeed(t, testFeed)
498508

499509
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -765,7 +775,9 @@ func TestAlterChangefeedDropAllTargetsError(t *testing.T) {
765775
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
766776
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
767777

768-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
778+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
779+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
780+
})
769781
defer closeFeed(t, testFeed)
770782

771783
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -799,7 +811,9 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
799811
// Reset the counts.
800812
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
801813

802-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
814+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`, optOutOfMetamorphicDBLevelChangefeed{
815+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
816+
})
803817
defer closeFeed(t, testFeed)
804818
feed := testFeed.(cdctest.EnterpriseTestFeed)
805819

@@ -994,7 +1008,10 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
9941008
return true, nil
9951009
}
9961010

997-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
1011+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`,
1012+
optOutOfMetamorphicDBLevelChangefeed{
1013+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1014+
})
9981015

9991016
// Kafka feeds are not buffered, so we have to consume messages.
10001017
g := ctxgroup.WithContext(context.Background())
@@ -1119,7 +1136,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) {
11191136
`INSERT INTO new_movr.drivers VALUES (1, 'Bob')`,
11201137
)
11211138

1122-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`)
1139+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`,
1140+
optOutOfMetamorphicDBLevelChangefeed{
1141+
reason: "changefeed watches tables not in the default database",
1142+
},
1143+
)
11231144
defer closeFeed(t, testFeed)
11241145

11251146
assertPayloads(t, testFeed, []string{
@@ -1162,7 +1183,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
11621183
)
11631184

11641185
sqlDB.Exec(t, `USE movr`)
1165-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`)
1186+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`,
1187+
optOutOfMetamorphicDBLevelChangefeed{
1188+
reason: "changefeed watches tables not in the default database",
1189+
})
11661190
defer closeFeed(t, testFeed)
11671191

11681192
assertPayloads(t, testFeed, []string{
@@ -1211,6 +1235,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) {
12111235
if _, ok := f.(*webhookFeedFactory); ok {
12121236
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
12131237
}
1238+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1239+
reason: "changefeed watches tables not in the default database",
1240+
})
12141241
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...)
12151242
defer closeFeed(t, testFeed)
12161243

@@ -1263,6 +1290,9 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
12631290
if _, ok := f.(*webhookFeedFactory); ok {
12641291
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"})
12651292
}
1293+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1294+
reason: "changefeed watches tables not in the default database",
1295+
})
12661296

12671297
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...)
12681298
defer closeFeed(t, testFeed)
@@ -1350,7 +1380,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
13501380
}
13511381

13521382
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1353-
WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`)
1383+
WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`,
1384+
optOutOfMetamorphicDBLevelChangefeed{
1385+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1386+
})
13541387
jobFeed := testFeed.(cdctest.EnterpriseTestFeed)
13551388
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
13561389

@@ -1691,7 +1724,10 @@ func TestAlterChangefeedDropTargetDuringInitialScan(t *testing.T) {
16911724
if rnd.Intn(2) == 0 {
16921725
targets = "bar, foo"
16931726
}
1694-
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets))
1727+
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets),
1728+
optOutOfMetamorphicDBLevelChangefeed{
1729+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1730+
})
16951731
defer closeFeed(t, testFeed)
16961732

16971733
// Wait for all spans to have been resolved.
@@ -1742,7 +1778,10 @@ func TestAlterChangefeedInitialScan(t *testing.T) {
17421778
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
17431779
sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)
17441780

1745-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`)
1781+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`,
1782+
optOutOfMetamorphicDBLevelChangefeed{
1783+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1784+
})
17461785
defer closeFeed(t, testFeed)
17471786

17481787
expectResolvedTimestamp(t, testFeed)
@@ -1861,7 +1900,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
18611900
rootDB := sqlutils.MakeSQLRunner(s.DB)
18621901

18631902
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
1864-
successfulFeed := feed(t, f, stmt)
1903+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
1904+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1905+
})
18651906
closeCf := func() {
18661907
closeFeed(t, successfulFeed)
18671908
}
@@ -1933,7 +1974,9 @@ func TestAlterChangefeedAddDropSameTarget(t *testing.T) {
19331974
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
19341975
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
19351976

1936-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
1977+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
1978+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1979+
})
19371980
defer closeFeed(t, testFeed)
19381981

19391982
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -2050,7 +2093,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
20502093
createStmt := fmt.Sprintf(
20512094
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
20522095
t.Log(createStmt)
2053-
testFeed := feed(t, f, createStmt)
2096+
testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{
2097+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
2098+
})
20542099
defer closeFeed(t, testFeed)
20552100

20562101
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,10 @@ func TestChangefeedQuotedTableNameTopicName(t *testing.T) {
10921092
sqlDB.Exec(t, `UPSERT INTO "MyTable" VALUES (0, 'updated')`)
10931093

10941094
foo := feed(t, f,
1095-
`CREATE CHANGEFEED FOR d.public."MyTable" WITH diff, full_table_name`)
1095+
`CREATE CHANGEFEED FOR d.public."MyTable" WITH diff, full_table_name`,
1096+
optOutOfMetamorphicDBLevelChangefeed{
1097+
reason: "fully-qualified table names are simplified when converted to db-level feed filters",
1098+
})
10961099
defer closeFeed(t, foo)
10971100

10981101
// The topic name should be d.public.MyTable and not d.public._u0022_MyTable_u0022_
@@ -4276,7 +4279,8 @@ func TestChangefeedJobControl(t *testing.T) {
42764279
ChangefeedJobPermissionsTestSetup(t, s)
42774280

42784281
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
4279-
successfulFeed := feed(t, f, stmt)
4282+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
4283+
reason: "tests table level changefeed permissions"})
42804284
closeCf := func() {
42814285
closeFeed(t, successfulFeed)
42824286
}
@@ -5707,28 +5711,40 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) {
57075711
t.Run("no options specified", func(t *testing.T) {
57085712
actual = "(no notice)"
57095713
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5710-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
5714+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`,
5715+
optOutOfMetamorphicDBLevelChangefeed{
5716+
reason: "test requires split_column_families NOT to be set",
5717+
})
57115718
defer closeFeed(t, testFeed)
57125719
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
57135720
})
57145721
t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) {
57155722
actual = "(no notice)"
57165723
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5717-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`)
5724+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`,
5725+
optOutOfMetamorphicDBLevelChangefeed{
5726+
reason: "test requires split_column_families NOT to be set",
5727+
})
57185728
defer closeFeed(t, testFeed)
57195729
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
57205730
})
57215731
t.Run("low resolved timestamp", func(t *testing.T) {
57225732
actual = "(no notice)"
57235733
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5724-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`)
5734+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`,
5735+
optOutOfMetamorphicDBLevelChangefeed{
5736+
reason: "test requires split_column_families NOT to be set",
5737+
})
57255738
defer closeFeed(t, testFeed)
57265739
require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
57275740
})
57285741
t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) {
57295742
actual = "(no notice)"
57305743
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5731-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`)
5744+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`,
5745+
optOutOfMetamorphicDBLevelChangefeed{
5746+
reason: "test requires split_column_families NOT to be set",
5747+
})
57325748
defer closeFeed(t, testFeed)
57335749
require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
57345750
})
@@ -5764,15 +5780,21 @@ func TestChangefeedOutputTopics(t *testing.T) {
57645780
t.Run("kafka", func(t *testing.T) {
57655781
actual = "(no notice)"
57665782
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5767-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
5783+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`,
5784+
optOutOfMetamorphicDBLevelChangefeed{
5785+
reason: "test requires split_column_families NOT to be set",
5786+
})
57685787
defer closeFeed(t, testFeed)
57695788
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
57705789
})
57715790

57725791
t.Run("pubsub v2", func(t *testing.T) {
57735792
actual = "(no notice)"
57745793
f := makePubsubFeedFactory(s, dbWithHandler)
5775-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`)
5794+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`,
5795+
optOutOfMetamorphicDBLevelChangefeed{
5796+
reason: "test requires split_column_families NOT to be set",
5797+
})
57765798
defer closeFeed(t, testFeed)
57775799
// Pubsub doesn't sanitize the topic name.
57785800
require.Equal(t, `changefeed will emit to topic ☃`, actual)
@@ -5781,7 +5803,10 @@ func TestChangefeedOutputTopics(t *testing.T) {
57815803
t.Run("webhooks does not emit anything", func(t *testing.T) {
57825804
actual = "(no notice)"
57835805
f := makePubsubFeedFactory(s, dbWithHandler)
5784-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'webhook-https://does.not.matter/'`)
5806+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'webhook-https://does.not.matter/'`,
5807+
optOutOfMetamorphicDBLevelChangefeed{
5808+
reason: "test requires split_column_families NOT to be set",
5809+
})
57855810
defer closeFeed(t, testFeed)
57865811
require.Equal(t, `(no notice)`, actual)
57875812
})
@@ -8205,10 +8230,15 @@ func TestChangefeedTelemetry(t *testing.T) {
82058230
// Reset the counts.
82068231
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
82078232

8208-
// Start some feeds (and read from them to make sure they've started.
8209-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
8233+
// Start some feeds (and read from them to make sure they've started).
8234+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
8235+
reason: "db-level changefeeds do not have a static number of tables for telemetry",
8236+
})
82108237
defer closeFeed(t, foo)
8211-
fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`)
8238+
fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`,
8239+
optOutOfMetamorphicDBLevelChangefeed{
8240+
reason: "db-level changefeeds do not have a static number of tables for telemetry",
8241+
})
82128242
defer closeFeed(t, fooBar)
82138243
assertPayloads(t, foo, []string{
82148244
`foo: [1]->{"after": {"a": 1}}`,
@@ -8489,7 +8519,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
84898519
defer closeSink()
84908520

84918521
atomic.StoreInt32(&shouldDrain, 1)
8492-
feed := feed(t, f, "CREATE CHANGEFEED FOR foo")
8522+
feed := feed(t, f, "CREATE CHANGEFEED FOR foo", optOutOfMetamorphicDBLevelChangefeed{
8523+
reason: "changefeed watches tables not in the default database",
8524+
})
84938525
defer closeFeed(t, feed)
84948526

84958527
jobID := feed.(cdctest.EnterpriseTestFeed).JobID()
@@ -8654,7 +8686,10 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
86548686
defer closeSink()
86558687

86568688
proceed <- struct{}{} // Allow changefeed to start.
8657-
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'")
8689+
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'",
8690+
optOutOfMetamorphicDBLevelChangefeed{
8691+
reason: "changefeed watches tables not in the default database",
8692+
})
86588693
defer closeFeed(t, feed)
86598694

86608695
jf := feed.(cdctest.EnterpriseTestFeed)
@@ -8829,7 +8864,8 @@ func TestChangefeedTimelyResolvedTimestampUpdatePostRollingRestart(t *testing.T)
88298864
defer DiscardMessages(testFeed)()
88308865

88318866
// Ensure the changefeed is able to complete in a reasonable amount of time.
8832-
require.NoError(t, testFeed.(cdctest.EnterpriseTestFeed).WaitDurationForState(5*time.Minute, func(s jobs.State) bool {
8867+
require.NoError(t, testFeed.(cdctest.EnterpriseTestFeed).WaitDurationForState(time.Minute, func(s jobs.State) bool {
8868+
require.NoError(t, testFeed.(cdctest.EnterpriseTestFeed).FetchTerminalJobErr())
88338869
return s == jobs.StateSucceeded
88348870
}))
88358871
}
@@ -10860,7 +10896,9 @@ func TestAlterChangefeedTelemetryLogs(t *testing.T) {
1086010896
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
1086110897
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
1086210898

10863-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
10899+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
10900+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
10901+
})
1086410902
defer closeFeed(t, testFeed)
1086510903
feed := testFeed.(cdctest.EnterpriseTestFeed)
1086610904

0 commit comments

Comments
 (0)