Skip to content
This repository was archived by the owner on Apr 2, 2026. It is now read-only.

Commit d084a32

Browse files
authored
Merge pull request #12 from reddit/cherry-pick-s3-change
enhance: Always retry writing binlogs (milvus-io#46850)
2 parents 6969bb4 + a23851b commit d084a32

5 files changed

Lines changed: 24 additions & 3 deletions

File tree

internal/datanode/importv2/util.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import (
4141
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
4242
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
4343
"github.com/milvus-io/milvus/pkg/v2/util/merr"
44+
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
45+
"github.com/milvus-io/milvus/pkg/v2/util/retry"
4446
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
4547
)
4648

@@ -94,12 +96,18 @@ func NewSyncTask(ctx context.Context,
9496
syncPack.WithBM25Stats(bm25Stats)
9597
}
9698

99+
writeRetryAttempts := paramtable.Get().DataNodeCfg.ImportMaxWriteRetryAttempts.GetAsUint()
100+
retryOpts := []retry.Option{
101+
retry.Attempts(writeRetryAttempts), // default retry always
102+
retry.MaxSleepTime(10 * time.Second),
103+
}
97104
task := syncmgr.NewSyncTask().
98105
WithAllocator(allocator).
99106
WithMetaCache(metaCache).
100107
WithSchema(metaCache.GetSchema(0)). // TODO specify import schema if needed
101108
WithSyncPack(syncPack).
102-
WithStorageConfig(storageConfig)
109+
WithStorageConfig(storageConfig).
110+
WithWriteRetryOptions(retryOpts...)
103111
return task, nil
104112
}
105113

internal/flushcommon/writebuffer/write_buffer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"sync"
7+
"time"
78

89
"github.com/cockroachdb/errors"
910
"github.com/samber/lo"
@@ -25,6 +26,7 @@ import (
2526
"github.com/milvus-io/milvus/pkg/v2/util/conc"
2627
"github.com/milvus-io/milvus/pkg/v2/util/merr"
2728
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
29+
"github.com/milvus-io/milvus/pkg/v2/util/retry"
2830
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
2931
)
3032

@@ -597,7 +599,8 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
597599
WithMetaWriter(wb.metaWriter).
598600
WithMetaCache(wb.metaCache).
599601
WithSchema(schema).
600-
WithSyncPack(pack)
602+
WithSyncPack(pack).
603+
WithWriteRetryOptions(retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))
601604
return task, nil
602605
}
603606

pkg/util/paramtable/component_param.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5681,6 +5681,7 @@ type dataNodeConfig struct {
56815681
ImportBaseBufferSize ParamItem `refreshable:"true"`
56825682
ImportDeleteBufferSize ParamItem `refreshable:"true"`
56835683
ImportMemoryLimitPercentage ParamItem `refreshable:"true"`
5684+
ImportMaxWriteRetryAttempts ParamItem `refreshable:"true"`
56845685

56855686
// Compaction
56865687
L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -6033,6 +6034,14 @@ if this parameter <= 0, will set it as 10`,
60336034
}
60346035
p.ImportMemoryLimitPercentage.Init(base.mgr)
60356036

6037+
p.ImportMaxWriteRetryAttempts = ParamItem{
6038+
Key: "dataNode.import.maxWriteRetryAttempts",
6039+
Version: "2.6.9",
6040+
Doc: "The maximum number of write retry attempts. 0 means unlimited.",
6041+
DefaultValue: "0",
6042+
}
6043+
p.ImportMaxWriteRetryAttempts.Init(base.mgr)
6044+
60366045
p.L0BatchMemoryRatio = ParamItem{
60376046
Key: "dataNode.compaction.levelZeroBatchMemoryRatio",
60386047
Version: "2.4.0",

pkg/util/paramtable/component_param_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,7 @@ func TestComponentParam(t *testing.T) {
634634
assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt())
635635
assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
636636
assert.Equal(t, 10.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
637+
assert.Equal(t, 0, Params.ImportMaxWriteRetryAttempts.GetAsInt())
637638
params.Save("datanode.gracefulStopTimeout", "100")
638639
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
639640
assert.Equal(t, 16, Params.SlotCap.GetAsInt())

pkg/util/retry/retry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func Handle(ctx context.Context, fn func() (bool, error), opts ...Option) error
141141
}
142142

143143
var lastErr error
144-
for i := uint(0); i < c.attempts; i++ {
144+
for i := uint(0); c.attempts == 0 || i < c.attempts; i++ {
145145
if shouldRetry, err := fn(); err != nil {
146146
if i%4 == 0 {
147147
log.Warn("retry func failed",

0 commit comments

Comments
 (0)