Skip to content

Commit ca2c9a1

Browse files
authored
Merge pull request #8491 from logzio/logzio-capnp-endless-replication-loop
Receive: fix capnproto replication in endless loop
2 parents 7ccee2f + 61df47f commit ca2c9a1

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1414

1515
- [#8378](https://github.com/thanos-io/thanos/pull/8378): Store: fix the reuse of dirty posting slices
1616
- [#8558](https://github.com/thanos-io/thanos/pull/8558): Query-Frontend: Fix not logging requests when external-prefix is set in query
17+
- [#8254](https://github.com/thanos-io/thanos/issues/8254) Receive: Endless loop of retried replication with capnproto and distributors
1718

1819
### Added
1920

pkg/receive/writecapnp/client.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (r *RemoteWriteClient) RemoteWrite(ctx context.Context, in *storepb.WriteRe
6868

6969
func (r *RemoteWriteClient) writeWithReconnect(ctx context.Context, numReconnects int, in *storepb.WriteRequest) (*storepb.WriteResponse, error) {
7070
if err := r.connect(ctx); err != nil {
71-
return nil, err
71+
return nil, status.Error(codes.Unavailable, err.Error())
7272
}
7373

7474
result, release := r.writer.Write(ctx, func(params Writer_write_Params) error {
@@ -137,17 +137,28 @@ func (r *RemoteWriteClient) connect(ctx context.Context) error {
137137
return errors.Wrap(err, "failed to dial peer")
138138
}
139139
r.conn = rpc.NewConn(rpc.NewPackedStreamTransport(conn), nil)
140-
r.writer = Writer(r.conn.Bootstrap(ctx))
140+
writer := Writer(r.conn.Bootstrap(ctx))
141+
if err := writer.Resolve(ctx); err != nil {
142+
level.Warn(r.logger).Log("msg", "failed to bootstrap capnp writer, closing connection", "err", err)
143+
r.closeUnlocked()
144+
return errors.Wrap(err, "failed to bootstrap capnp writer")
145+
}
146+
147+
r.writer = writer
141148
return nil
142149
}
143150

144151
func (r *RemoteWriteClient) Close() error {
145152
r.mu.Lock()
153+
r.closeUnlocked()
154+
r.mu.Unlock()
155+
return nil
156+
}
157+
158+
func (r *RemoteWriteClient) closeUnlocked() {
146159
if r.conn != nil {
147160
conn := r.conn
148161
r.conn = nil
149162
go conn.Close()
150163
}
151-
r.mu.Unlock()
152-
return nil
153164
}

0 commit comments

Comments
 (0)