Skip to content
This repository was archived by the owner on Apr 2, 2026. It is now read-only.

Commit 52a41d0

Browse files
authored
Merge pull request #14 from reddit/bug-fix-47072
Bug fix Segfault on ChannelLevelScoreBalancer use
2 parents 9eb8146 + 06d68c8 commit 52a41d0

4 files changed

Lines changed: 21 additions & 9 deletions

File tree

internal/querycoordv2/observers/replica_observer.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ type ReplicaObserver struct {
4747
stopOnce sync.Once
4848
}
4949

50-
func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *ReplicaObserver {
50+
func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager, targetMgr meta.TargetManagerInterface) *ReplicaObserver {
5151
return &ReplicaObserver{
52-
meta: meta,
53-
distMgr: distMgr,
52+
meta: meta,
53+
distMgr: distMgr,
54+
targetMgr: targetMgr,
5455
}
5556
}
5657

@@ -179,6 +180,10 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
179180
replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID)
180181
for _, replica := range replicas {
181182
if enableChannelExclusiveMode && !replica.IsChannelExclusiveModeEnabled() {
183+
if ob.targetMgr == nil {
184+
log.Warn("targetMgr is nil, cannot enable channel exclusive mode")
185+
continue
186+
}
182187
// register channel for enable exclusive mode
183188
mutableReplica := replica.CopyForWrite()
184189
channels := ob.targetMgr.GetDmChannelsByCollection(ctx, collectionID, meta.CurrentTargetFirst)

internal/querycoordv2/observers/replica_observer_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
3131
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
3232
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
33-
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
3433
"github.com/milvus-io/milvus/internal/querycoordv2/session"
3534
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
3635
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
@@ -43,15 +42,19 @@ import (
4342
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
4443
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
4544
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
45+
46+
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
4647
)
4748

4849
type ReplicaObserverSuite struct {
4950
suite.Suite
5051

5152
kv kv.MetaKv
5253
// dependency
53-
meta *meta.Meta
54-
distMgr *meta.DistributionManager
54+
meta *meta.Meta
55+
distMgr *meta.DistributionManager
56+
targetMgr meta.TargetManagerInterface
57+
broker *meta.MockBroker
5558

5659
nodeMgr *session.NodeManager
5760
observer *ReplicaObserver
@@ -89,9 +92,11 @@ func (suite *ReplicaObserverSuite) SetupTest() {
8992
idAllocator := RandomIncrementIDAllocator()
9093
suite.nodeMgr = session.NewNodeManager()
9194
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
95+
suite.broker = meta.NewMockBroker(suite.T())
96+
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
9297

9398
suite.distMgr = meta.NewDistributionManager(suite.nodeMgr)
94-
suite.observer = NewReplicaObserver(suite.meta, suite.distMgr)
99+
suite.observer = NewReplicaObserver(suite.meta, suite.distMgr, suite.targetMgr)
95100
suite.observer.Start()
96101
suite.collectionID = int64(1000)
97102
suite.partitionID = int64(100)
@@ -244,7 +249,7 @@ func (suite *ReplicaObserverSuite) TestCheckSQnodesInReplica() {
244249
}
245250
})
246251
balance.Register(b)
247-
suite.observer = NewReplicaObserver(suite.meta, suite.distMgr)
252+
suite.observer = NewReplicaObserver(suite.meta, suite.distMgr, suite.targetMgr)
248253
suite.observer.Start()
249254

250255
ctx := context.Background()

internal/querycoordv2/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ func (s *Server) initObserver() {
453453
s.replicaObserver = observers.NewReplicaObserver(
454454
s.meta,
455455
s.dist,
456+
s.targetMgr,
456457
)
457458

458459
s.resourceObserver = observers.NewResourceObserver(s.meta)

internal/querycoordv2/services_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ func (suite *ServiceSuite) TestTransferNode() {
707707

708708
server.resourceObserver = observers.NewResourceObserver(server.meta)
709709
server.resourceObserver.Start()
710-
server.replicaObserver = observers.NewReplicaObserver(server.meta, server.dist)
710+
server.replicaObserver = observers.NewReplicaObserver(server.meta, server.dist, server.targetMgr)
711711
server.replicaObserver.Start()
712712
defer server.resourceObserver.Stop()
713713
defer server.replicaObserver.Stop()
@@ -1917,6 +1917,7 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
19171917
suite.server.replicaObserver = observers.NewReplicaObserver(
19181918
suite.server.meta,
19191919
suite.server.dist,
1920+
suite.server.targetMgr,
19201921
)
19211922
suite.server.resourceObserver = observers.NewResourceObserver(
19221923
suite.server.meta,

0 commit comments

Comments
 (0)