Skip to content

Commit 8988b46

Browse files
committed
sql/fingerprint: process range-sized spans
Subdividing each span from partition spans into n range-sized spans both ensures that no single span passed to an ExportRequest (or some SQL-based span fingerprinter) will run for an excessive time, allowing the use of reasonable timeouts to catch underlying failures, and makes the unit of progress we can complete and checkpoint more granular. It should have no effect on the result (as tested here). Release note: none. Epic: none.
1 parent f5ff484 commit 8988b46

File tree

3 files changed

+108
-20
lines changed

3 files changed

+108
-20
lines changed

pkg/sql/fingerprint/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,14 @@ go_test(
5555
"//pkg/security/username",
5656
"//pkg/server",
5757
"//pkg/sql",
58-
"//pkg/testutils",
58+
"//pkg/testutils/jobutils",
5959
"//pkg/testutils/serverutils",
6060
"//pkg/testutils/sqlutils",
6161
"//pkg/testutils/testcluster",
6262
"//pkg/util/hlc",
6363
"//pkg/util/leaktest",
6464
"//pkg/util/log",
6565
"//pkg/util/span",
66-
"@com_github_cockroachdb_errors//:errors",
6766
"@com_github_stretchr_testify//require",
6867
],
6968
)

pkg/sql/fingerprint/fingerprint_job.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (r *resumer) Resume(ctx context.Context, execCtx interface{}) error {
7878
id: jobspb.JobID(*r),
7979
execCtx: jobExecCtx,
8080
},
81-
partitioner: partitionSpans{jobExecCtx},
81+
partitioner: partitionSpans{jobExecCtx, false},
8282
fn: kvFingerprinter{
8383
sender: jobExecCtx.ExecCfg().DB.NonTransactionalSender(),
8484
asOf: details.AsOf,
@@ -225,6 +225,7 @@ func (p *persist) store(ctx context.Context, state checkpointState, frac float64
225225

226226
type partitionSpans struct {
227227
sql.JobExecContext
228+
rangeSized bool
228229
}
229230

230231
var _ partitioner = partitionSpans{}
@@ -249,6 +250,36 @@ func (p partitionSpans) partition(
249250
if err != nil {
250251
return nil, err
251252
}
253+
if p.rangeSized {
254+
// Sub-divide each partition into individually range-aligned sub-spans.
255+
for part := range spanPartitions {
256+
subdivided := make([]roachpb.Span, 0, len(spanPartitions[part].Spans))
257+
for _, sp := range spanPartitions[part].Spans {
258+
rdi, err := p.JobExecContext.ExecCfg().RangeDescIteratorFactory.NewLazyIterator(ctx, sp, 64)
259+
if err != nil {
260+
return nil, err
261+
}
262+
remaining := sp
263+
for ; rdi.Valid(); rdi.Next() {
264+
rangeDesc := rdi.CurRangeDescriptor()
265+
rangeSpan := roachpb.Span{Key: rangeDesc.StartKey.AsRawKey(), EndKey: rangeDesc.EndKey.AsRawKey()}
266+
subspan := remaining.Intersect(rangeSpan)
267+
if !subspan.Valid() {
268+
return nil, errors.AssertionFailedf("%s not in %s of %s", rangeSpan, remaining, sp)
269+
}
270+
subdivided = append(subdivided, subspan)
271+
remaining.Key = subspan.EndKey
272+
}
273+
if err := rdi.Error(); err != nil {
274+
return nil, err
275+
}
276+
if remaining.Valid() {
277+
subdivided = append(subdivided, remaining)
278+
}
279+
}
280+
spanPartitions[part].Spans = subdivided
281+
}
282+
}
252283
return spanPartitions, nil
253284
}
254285

pkg/sql/fingerprint/fingerprint_job_test.go

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,21 @@ package fingerprint
88
import (
99
"context"
1010
"testing"
11+
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/base"
1314
"github.com/cockroachdb/cockroach/pkg/jobs"
1415
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1516
"github.com/cockroachdb/cockroach/pkg/roachpb"
1617
"github.com/cockroachdb/cockroach/pkg/security/username"
1718
"github.com/cockroachdb/cockroach/pkg/sql"
18-
"github.com/cockroachdb/cockroach/pkg/testutils"
19+
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
1920
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2021
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2122
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2223
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2324
"github.com/cockroachdb/cockroach/pkg/util/log"
2425
"github.com/cockroachdb/cockroach/pkg/util/span"
25-
"github.com/cockroachdb/errors"
2626
"github.com/stretchr/testify/require"
2727
)
2828

@@ -141,6 +141,77 @@ func TestKVFingerprinter(t *testing.T) {
141141
require.Equal(t, uint64(0x2249712bb6b0388a), result.fingerprint)
142142
}
143143

144+
func TestRangeSizedSpans(t *testing.T) {
145+
defer leaktest.AfterTest(t)()
146+
defer log.Scope(t).Close(t)
147+
148+
ctx := context.Background()
149+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
150+
defer s.Stopper().Stop(ctx)
151+
152+
runner := sqlutils.MakeSQLRunner(db)
153+
154+
// Create a table with enough data to span multiple ranges.
155+
runner.Exec(t, `CREATE TABLE r (i PRIMARY KEY, j) AS SELECT i, i::string FROM generate_series(1, 1000) i`)
156+
runner.Exec(t, `ALTER TABLE r SPLIT AT SELECT i*10 FROM generate_series(1, 100) i`)
157+
var tableID uint32
158+
runner.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'r'`).Scan(&tableID)
159+
160+
// Use actual key encoding for the table.
161+
tableSpan := roachpb.Span{
162+
Key: s.Codec().TablePrefix(tableID),
163+
EndKey: s.Codec().TablePrefix(tableID).PrefixEnd(),
164+
}
165+
166+
execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
167+
jobExecCtx, cleanup := sql.MakeJobExecContext(ctx, "test", username.RootUserName(), &sql.MemoryMetrics{}, &execCfg)
168+
defer cleanup()
169+
170+
parts := partitionSpans{JobExecContext: jobExecCtx}
171+
172+
g := gatherer{
173+
spans: []roachpb.Span{tableSpan},
174+
fn: kvFingerprinter{
175+
sender: s.DB().NonTransactionalSender(),
176+
asOf: s.Clock().Now(),
177+
stripped: true,
178+
}.fingerprintSpan,
179+
partitioner: parts,
180+
chkptFreq: func() time.Duration { return time.Hour },
181+
persister: &testPersister{},
182+
}
183+
184+
r1, err := g.run(ctx)
185+
require.NoError(t, err)
186+
187+
spCount := func() int {
188+
total := 0
189+
for _, p := range g.mu.prog {
190+
total += p.total
191+
}
192+
return total
193+
}
194+
195+
// Reset g and run again to verify determinism.
196+
g.persister, g.mu.done, g.mu.fingerprint, g.mu.prog = &testPersister{}, nil, 0, nil
197+
r2, err := g.run(ctx)
198+
require.NoError(t, err)
199+
require.Equal(t, r1, r2)
200+
// Single-node => partitionSpans just returned one span, so total spans = 1.
201+
require.Equal(t, 1, spCount())
202+
203+
// Now reset g and run again, but this time with range-sized spans.
204+
g.persister, g.mu.done, g.mu.fingerprint, g.mu.prog = &testPersister{}, nil, 0, nil
205+
parts.rangeSized = true
206+
g.partitioner = parts
207+
r3, err := g.run(ctx)
208+
require.NoError(t, err)
209+
require.Equal(t, r2, r3)
210+
211+
// Verify that using range-sized spans resulted in an extra span per split.
212+
require.Equal(t, 101, spCount())
213+
}
214+
144215
// TestJobE2E tests that a fingerprint job can be created as adoptable and
145216
// successfully runs through the job system.
146217
func TestJobE2E(t *testing.T) {
@@ -183,21 +254,8 @@ func TestJobE2E(t *testing.T) {
183254
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
184255
require.NoError(t, err)
185256

186-
// Wait for the job to be adopted and succeed.
187-
testutils.SucceedsSoon(t, func() error {
188-
// Nudge the adoption queue to pick up the job immediately.
189-
registry.TestingNudgeAdoptionQueue()
190-
191-
var status string
192-
runner.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, job.ID()).Scan(&status)
193-
if status == string(jobs.StateSucceeded) {
194-
return nil
195-
}
196-
if status == string(jobs.StateFailed) || status == string(jobs.StateCanceled) {
197-
return errors.Errorf("job failed with status %s", status)
198-
}
199-
return errors.Errorf("job not yet succeeded, current status: %s", status)
200-
})
257+
registry.TestingNudgeAdoptionQueue()
258+
jobutils.WaitForJobToSucceed(t, runner, job.ID())
201259

202260
// Verify that the job completed and has a fingerprint stored.
203261
execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)

0 commit comments

Comments
 (0)