Skip to content

Commit bf2f15b

Browse files
committed
schemachanger: handle basic TTL params
The builder was updated so that it modifies the scpb.RowLevelTTL element for when handling TTL storage params. This is needed so that we can do proper validation on the TTL params, since the validation logic checks dependencies between different parameters. The UpsertRowLevelTTL immediate mutation takes care of modifying the descriptor. Logic was also needed to avoid dropping the TTL schedule if we are modifying parameters. Release note: None
1 parent 366f424 commit bf2f15b

File tree

13 files changed

+344
-14
lines changed

13 files changed

+344
-14
lines changed

pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_set_storage_param.go

Lines changed: 153 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,120 @@ import (
2525
// NotImplementedError if the param is not yet supported in the declarative
2626
// schema changer.
2727
func validateStorageParamKey(t tree.NodeFormatter, key string) {
28-
if strings.HasPrefix(strings.ToLower(key), "ttl") {
28+
loweredKey := strings.ToLower(key)
29+
// These TTL params still require legacy schema changer because they
30+
// affect column management, column dependencies, or schedule management.
31+
switch loweredKey {
32+
case "ttl", "ttl_expire_after", "ttl_expiration_expression", "ttl_job_cron":
2933
panic(scerrors.NotImplementedErrorf(t, redact.Sprintf("%s not implemented yet", redact.SafeString(key))))
3034
}
31-
if key == catpb.RBRUsingConstraintTableSettingName {
35+
if loweredKey == catpb.RBRUsingConstraintTableSettingName {
3236
panic(scerrors.NotImplementedErrorf(t, "infer_rbr_region_col_using_constraint not implemented yet"))
3337
}
3438
}
3539

40+
// isTTLParam returns true if this is a TTL storage parameter.
41+
func isTTLParam(key string) bool {
42+
loweredKey := strings.ToLower(key)
43+
return strings.HasPrefix(loweredKey, "ttl")
44+
}
45+
46+
// applyTTLStorageParamsSet processes TTL-related storage parameters for SET.
47+
// It returns:
48+
// - origElem: the original RowLevelTTL element (nil if table had no TTL).
49+
// - newElem: the updated RowLevelTTL element (nil if no TTL params were handled).
50+
// - changed: true if any TTL param was modified.
51+
func applyTTLStorageParamsSet(
52+
b BuildCtx, tbl *scpb.Table, params tree.StorageParams,
53+
) (origElem *scpb.RowLevelTTL, newElem *scpb.RowLevelTTL, changed bool) {
54+
origElem = b.QueryByID(tbl.TableID).FilterRowLevelTTL().MustGetZeroOrOneElement()
55+
56+
if len(params) == 0 {
57+
return origElem, nil, false
58+
}
59+
60+
// Build new TTL from original (or empty if none).
61+
var newTTL catpb.RowLevelTTL
62+
var origTTLExpr *scpb.Expression
63+
if origElem != nil {
64+
newTTL = origElem.RowLevelTTL
65+
origTTLExpr = origElem.TTLExpr
66+
}
67+
68+
setter := tablestorageparam.NewTTLSetter(&newTTL, false /* isNewObject */)
69+
if err := storageparam.Set(
70+
b,
71+
b.SemaCtx(),
72+
b.EvalCtx(),
73+
params,
74+
setter,
75+
); err != nil {
76+
panic(err)
77+
}
78+
79+
// Construct new scpb.RowLevelTTL element with incremented SeqNum.
80+
var seqNum uint32
81+
if origElem != nil {
82+
seqNum = origElem.SeqNum + 1
83+
}
84+
newElem = &scpb.RowLevelTTL{
85+
TableID: tbl.TableID,
86+
RowLevelTTL: newTTL,
87+
TTLExpr: origTTLExpr,
88+
SeqNum: seqNum,
89+
}
90+
return origElem, newElem, true
91+
}
92+
93+
// applyTTLStorageParamsReset processes TTL-related storage parameters for RESET.
94+
// It returns:
95+
// - origElem: the original RowLevelTTL element (nil if table had no TTL).
96+
// - newElem: the updated RowLevelTTL element (nil if no TTL params were handled).
97+
// - changed: true if any TTL param was modified.
98+
func applyTTLStorageParamsReset(
99+
b BuildCtx, tbl *scpb.Table, params []string,
100+
) (origElem *scpb.RowLevelTTL, newElem *scpb.RowLevelTTL, changed bool) {
101+
origElem = b.QueryByID(tbl.TableID).FilterRowLevelTTL().MustGetZeroOrOneElement()
102+
103+
if len(params) == 0 {
104+
return origElem, nil, false
105+
}
106+
107+
// Build new TTL from original (or empty if none).
108+
var newTTL *catpb.RowLevelTTL
109+
var origTTLExpr *scpb.Expression
110+
if origElem != nil {
111+
ttlCopy := origElem.RowLevelTTL
112+
newTTL = &ttlCopy
113+
origTTLExpr = origElem.TTLExpr
114+
}
115+
116+
// Even if origElem is nil, we apply all the resets, since some of them
117+
// have side effects like sending notices.
118+
setter := tablestorageparam.NewTTLSetter(newTTL, false /* isNewObject */)
119+
if err := storageparam.Reset(
120+
b,
121+
b.EvalCtx(),
122+
params,
123+
setter,
124+
); err != nil {
125+
panic(err)
126+
}
127+
128+
if origElem == nil {
129+
return nil, nil, false
130+
}
131+
132+
// Construct new scpb.RowLevelTTL element with incremented SeqNum.
133+
newElem = &scpb.RowLevelTTL{
134+
TableID: tbl.TableID,
135+
RowLevelTTL: *newTTL,
136+
TTLExpr: origTTLExpr,
137+
SeqNum: origElem.SeqNum + 1,
138+
}
139+
return origElem, newElem, true
140+
}
141+
36142
// AlterTableSetStorageParams implements ALTER TABLE ... SET {storage_param} in the declarative schema changer.
37143
func AlterTableSetStorageParams(
38144
b BuildCtx,
@@ -41,22 +147,41 @@ func AlterTableSetStorageParams(
41147
stmt tree.Statement,
42148
t *tree.AlterTableSetStorageParams,
43149
) {
150+
var ttlParams, otherParams tree.StorageParams
151+
for _, param := range t.StorageParams {
152+
validateStorageParamKey(t, param.Key)
153+
if isTTLParam(param.Key) {
154+
ttlParams = append(ttlParams, param)
155+
} else {
156+
otherParams = append(otherParams, param)
157+
}
158+
}
159+
160+
// Handle TTL params first, using the RowLevelTTL element.
161+
origTTL, newTTL, ttlChanged := applyTTLStorageParamsSet(b, tbl, ttlParams)
162+
if ttlChanged {
163+
if origTTL != nil {
164+
b.Drop(origTTL)
165+
}
166+
b.Add(newTTL)
167+
}
168+
44169
if err := storageparam.StorageParamPreChecks(
45170
b,
46171
b.EvalCtx(),
47172
false, /* isNewObject */
48-
t.StorageParams,
173+
otherParams,
49174
nil, /* resetParams */
50175
); err != nil {
51176
panic(err)
52177
}
53-
for _, param := range t.StorageParams {
178+
179+
for _, param := range otherParams {
180+
key := param.Key
54181
val, err := tablestorageparam.ParseAndValidate(b, b.SemaCtx(), b.EvalCtx(), param)
55182
if err != nil {
56183
panic(err) // tried to set an invalid value for param
57184
}
58-
key := param.Key
59-
validateStorageParamKey(t, key)
60185

61186
// schema_locked uses a dedicated TableSchemaLocked element.
62187
if key == "schema_locked" {
@@ -130,21 +255,40 @@ func AlterTableResetStorageParams(
130255
stmt tree.Statement,
131256
t *tree.AlterTableResetStorageParams,
132257
) {
258+
var ttlParams, otherParams []string
259+
for _, param := range t.Params {
260+
validateStorageParamKey(t, param)
261+
if isTTLParam(param) {
262+
ttlParams = append(ttlParams, param)
263+
} else {
264+
otherParams = append(otherParams, param)
265+
}
266+
}
267+
268+
// Handle TTL params first, using the RowLevelTTL element.
269+
origTTL, newTTL, ttlChanged := applyTTLStorageParamsReset(b, tbl, ttlParams)
270+
if ttlChanged {
271+
if origTTL != nil {
272+
b.Drop(origTTL)
273+
}
274+
b.Add(newTTL)
275+
}
276+
133277
if err := storageparam.StorageParamPreChecks(
134278
b,
135279
b.EvalCtx(),
136280
false, /* isNewObject */
137281
nil, /* setParams */
138-
t.Params,
282+
otherParams,
139283
); err != nil {
140284
panic(err)
141285
}
142-
for _, key := range t.Params {
286+
287+
for _, key := range otherParams {
143288
// Validate the key is a known storage parameter.
144289
if err := tablestorageparam.IsValidParamKey(key); err != nil {
145290
panic(err)
146291
}
147-
validateStorageParamKey(t, key)
148292

149293
// Get the reset value for this param. Most of the time this is the
150294
// zero value, but for some params it may be different.

pkg/sql/schemachanger/scdecomp/decomp.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,10 @@ func (w *walkCtx) walkStorageParams(tbl catalog.TableDescriptor) {
499499
// schema_locked is handled separately via the TableSchemaLocked element.
500500
continue
501501
}
502+
if strings.HasPrefix(key, "ttl") {
503+
// ttl parameters are handled separately via the RowLevelTTL element.
504+
continue
505+
}
502506
w.ev(scpb.Status_PUBLIC, &scpb.TableStorageParam{
503507
TableID: tableID,
504508
Name: key,

pkg/sql/schemachanger/scexec/scmutationexec/table.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
12+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
1213
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
1314
"github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam"
1415
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -54,6 +55,23 @@ func (i *immediateVisitor) ResetTableStorageParam(
5455
return setter.ResetToZeroValue(ctx, op.Param.Name)
5556
}
5657

58+
func (i *immediateVisitor) UpsertRowLevelTTL(ctx context.Context, op scop.UpsertRowLevelTTL) error {
59+
tbl, err := i.checkOutTable(ctx, op.TableID)
60+
if err != nil {
61+
return err
62+
}
63+
64+
if op.RowLevelTTL == (catpb.RowLevelTTL{}) {
65+
tbl.RowLevelTTL = nil
66+
return nil
67+
}
68+
69+
// Make a copy of the RowLevelTTL so we can take its address.
70+
ttl := op.RowLevelTTL
71+
tbl.RowLevelTTL = &ttl
72+
return nil
73+
}
74+
5775
func (d *deferredVisitor) UpdateTTLScheduleMetadata(
5876
ctx context.Context, op scop.UpdateTTLScheduleMetadata,
5977
) error {

pkg/sql/schemachanger/scop/immediate_mutation.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,3 +1271,11 @@ type ResetTableStorageParam struct {
12711271
immediateMutationOp
12721272
Param scpb.TableStorageParam
12731273
}
1274+
1275+
// UpsertRowLevelTTL sets the RowLevelTTL on a table descriptor.
1276+
type UpsertRowLevelTTL struct {
1277+
immediateMutationOp
1278+
TableID descpb.ID
1279+
RowLevelTTL catpb.RowLevelTTL
1280+
TTLExpr *scpb.Expression
1281+
}

pkg/sql/schemachanger/scop/immediate_mutation_visitor_generated.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/schemachanger/scpb/elements.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,10 @@ message RowLevelTTL {
770770
// ExpirationExpr is not set. This expression is used to establish column
771771
// dependencies.
772772
Expression ttl_expr = 3 [(gogoproto.customname) = "TTLExpr"];
773+
// SeqNum differentiates different versions of the TTL configuration for the
774+
// same table. When modifying TTL parameters, the old element is dropped and
775+
// a new one with an incremented SeqNum is added.
776+
uint32 seq_num = 4 [(gogoproto.customname) = "SeqNum"];
773777
}
774778

775779
message ColumnName {

pkg/sql/schemachanger/scplan/internal/opgen/opgen_row_level_ttl.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@ func init() {
1515
toPublic(
1616
scpb.Status_ABSENT,
1717
to(scpb.Status_PUBLIC,
18-
emit(func(this *scpb.RowLevelTTL) *scop.NotImplemented {
19-
return notImplemented(this)
18+
emit(func(this *scpb.RowLevelTTL, md *opGenContext) *scop.UpsertRowLevelTTL {
19+
return &scop.UpsertRowLevelTTL{
20+
TableID: this.TableID,
21+
RowLevelTTL: this.RowLevelTTL,
22+
TTLExpr: this.TTLExpr,
23+
}
2024
}),
2125
),
2226
),
@@ -25,7 +29,10 @@ func init() {
2529
to(scpb.Status_ABSENT,
2630
// TODO(postamar): remove revertibility constraint when possible
2731
revertible(false),
28-
emit(func(this *scpb.RowLevelTTL) *scop.DeleteSchedule {
32+
emit(func(this *scpb.RowLevelTTL, md *opGenContext) *scop.DeleteSchedule {
33+
if ttlAppliedLater(this, md) {
34+
return nil
35+
}
2936
return &scop.DeleteSchedule{
3037
ScheduleID: this.RowLevelTTL.ScheduleID,
3138
}
@@ -34,3 +41,26 @@ func init() {
3441
),
3542
)
3643
}
44+
45+
// ttlAppliedLater returns true if there is another RowLevelTTL element
46+
// with the same ScheduleID and TableID, a higher SeqNum, and
47+
// a target status of PUBLIC. This indicates that the TTL schedule
48+
// will be applied later, so we should not delete the schedule yet.
49+
func ttlAppliedLater(this *scpb.RowLevelTTL, md *opGenContext) bool {
50+
if this.RowLevelTTL.ScheduleID == 0 {
51+
return false
52+
}
53+
for _, t := range md.Targets {
54+
other, ok := t.Element().(*scpb.RowLevelTTL)
55+
if !ok || other == this {
56+
continue
57+
}
58+
if other.RowLevelTTL.ScheduleID == this.RowLevelTTL.ScheduleID &&
59+
other.TableID == this.TableID &&
60+
other.SeqNum > this.SeqNum &&
61+
t.TargetStatus == scpb.Status_PUBLIC {
62+
return true
63+
}
64+
}
65+
return false
66+
}

pkg/sql/schemachanger/scplan/internal/rules/current/dep_storage_param.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,22 @@ func init() {
3131
}
3232
},
3333
)
34+
35+
// This rule ensures that when modifying TTL configurations, the old TTL
36+
// element (with lower SeqNum) is processed before the new TTL element (with
37+
// higher SeqNum).
38+
registerDepRule(
39+
"ensure row level TTL configs are in increasing seqNum order",
40+
scgraph.Precedence,
41+
"later-seqNum", "earlier-seqNum",
42+
func(from, to NodeVars) rel.Clauses {
43+
return rel.Clauses{
44+
from.Type((*scpb.RowLevelTTL)(nil)),
45+
to.Type((*scpb.RowLevelTTL)(nil)),
46+
JoinOnDescID(from, to, "table-id"),
47+
FilterElements("SmallerSeqNumFirst", from, to, func(from, to *scpb.RowLevelTTL) bool {
48+
return from.SeqNum < to.SeqNum
49+
}),
50+
}
51+
})
3452
}

0 commit comments

Comments
 (0)