Skip to content

Commit 9e8575c

Browse files
codesome and pracucci authored
Removed deprecated untyped record from chunks WAL (#3115)
Signed-off-by: Ganesh Vernekar <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent c76689d commit 9e8575c

File tree

5 files changed

+74
-1171
lines changed

5 files changed

+74
-1171
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- `-experimental.store-gateway.*` flags renamed to `-store-gateway.*`
99
- `-experimental.querier.store-gateway-client.*` flags renamed to `-querier.store-gateway-client.*`
1010
- `-experimental.querier.store-gateway-addresses` flag renamed to `-querier.store-gateway-addresses`
11+
* [CHANGE] Ingester: Removed deprecated untyped record from chunks WAL. If you are running `v1.0` or below, it is recommended to first upgrade to `v1.1`/`v1.2`/`v1.3` and run it for a day before upgrading to `v1.4`, in order to avoid data loss. #3115
1112
* [CHANGE] Distributor API endpoints are no longer served unless target is set to `distributor` or `all`. #3112
1213
* [CHANGE] Increase the default Cassandra client replication factor to 3. #3007
1314
* [CHANGE] Blocks storage: removed the support to transfer blocks between ingesters on shutdown. When running the Cortex blocks storage, ingesters are expected to run with a persistent disk. The following metrics have been removed: #2996

pkg/ingester/wal.go

Lines changed: 26 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,6 @@ type WAL interface {
6565
type RecordType byte
6666

6767
const (
68-
// Currently we also support the old records without a type header.
69-
// For that, we assume the record type does not cross 7 as the proto unmarshalling
70-
// will produce an error if the first byte is less than 7 (thus we know its not the old record).
71-
// The old record will be removed in the future releases, hence the record type should not cross
72-
// '7' till then.
73-
7468
// WALRecordSeries is the type for the WAL record on Prometheus TSDB record for series.
7569
WALRecordSeries RecordType = 1
7670
// WALRecordSamples is the type for the WAL record based on Prometheus TSDB record for samples.
@@ -843,7 +837,6 @@ func processWAL(startSegment int, userStates *userStates, params walRecoveryPara
843837

844838
var (
845839
capturedErr error
846-
record = &Record{}
847840
walRecord = &WALRecord{}
848841
lp labelPairs
849842
)
@@ -857,51 +850,30 @@ Loop:
857850
default:
858851
}
859852

860-
record.Samples = record.Samples[:0]
861-
record.Labels = record.Labels[:0]
862-
// Only one of 'record' or 'walRecord' will have the data.
863-
if err := decodeWALRecord(reader.Record(), record, walRecord); err != nil {
853+
if err := decodeWALRecord(reader.Record(), walRecord); err != nil {
864854
// We don't return here in order to close/drain all the channels and
865855
// make sure all goroutines exit.
866856
capturedErr = err
867857
break Loop
868858
}
869859

870-
if len(record.Labels) > 0 || len(walRecord.Series) > 0 {
871-
872-
var userID string
873-
if len(walRecord.Series) > 0 {
874-
userID = walRecord.UserID
875-
} else {
876-
userID = record.UserId
877-
}
860+
if len(walRecord.Series) > 0 {
861+
userID := walRecord.UserID
878862

879863
state := userStates.getOrCreate(userID)
880864

881-
createSeries := func(fingerprint model.Fingerprint, lbls labelPairs) error {
882-
_, ok := state.fpToSeries.get(fingerprint)
865+
for _, s := range walRecord.Series {
866+
fp := model.Fingerprint(s.Ref)
867+
_, ok := state.fpToSeries.get(fp)
883868
if ok {
884-
return nil
885-
}
886-
_, err := state.createSeriesWithFingerprint(fingerprint, lbls, nil, true)
887-
return err
888-
}
889-
890-
for _, labels := range record.Labels {
891-
if err := createSeries(model.Fingerprint(labels.Fingerprint), labels.Labels); err != nil {
892-
// We don't return here in order to close/drain all the channels and
893-
// make sure all goroutines exit.
894-
capturedErr = err
895-
break Loop
869+
continue
896870
}
897-
}
898871

899-
for _, s := range walRecord.Series {
900872
lp = lp[:0]
901873
for _, l := range s.Labels {
902874
lp = append(lp, client.LabelAdapter(l))
903875
}
904-
if err := createSeries(model.Fingerprint(s.Ref), lp); err != nil {
876+
if _, err := state.createSeriesWithFingerprint(fp, lp, nil, true); err != nil {
905877
// We don't return here in order to close/drain all the channels and
906878
// make sure all goroutines exit.
907879
capturedErr = err
@@ -914,20 +886,12 @@ Loop:
914886
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
915887
// cause thousands of very large in flight buffers occupying large amounts
916888
// of unused memory.
917-
for len(record.Samples) > 0 || len(walRecord.Samples) > 0 {
889+
walRecordSamples := walRecord.Samples
890+
for len(walRecordSamples) > 0 {
918891
m := 5000
919-
var userID string
920-
if len(record.Samples) > 0 {
921-
userID = record.UserId
922-
if len(record.Samples) < m {
923-
m = len(record.Samples)
924-
}
925-
}
926-
if len(walRecord.Samples) > 0 {
927-
userID = walRecord.UserID
928-
if len(walRecord.Samples) < m {
929-
m = len(walRecord.Samples)
930-
}
892+
userID := walRecord.UserID
893+
if len(walRecordSamples) < m {
894+
m = len(walRecordSamples)
931895
}
932896

933897
for i := 0; i < params.numWorkers; i++ {
@@ -949,21 +913,9 @@ Loop:
949913
}
950914
}
951915

952-
if len(record.Samples) > 0 {
953-
for _, sam := range record.Samples[:m] {
954-
mod := sam.Fingerprint % uint64(params.numWorkers)
955-
shards[mod].samples = append(shards[mod].samples, tsdb_record.RefSample{
956-
Ref: sam.Fingerprint,
957-
T: int64(sam.Timestamp),
958-
V: sam.Value,
959-
})
960-
}
961-
}
962-
if len(walRecord.Samples) > 0 {
963-
for _, sam := range walRecord.Samples[:m] {
964-
mod := sam.Ref % uint64(params.numWorkers)
965-
shards[mod].samples = append(shards[mod].samples, sam)
966-
}
916+
for _, sam := range walRecordSamples[:m] {
917+
mod := sam.Ref % uint64(params.numWorkers)
918+
shards[mod].samples = append(shards[mod].samples, sam)
967919
}
968920

969921
for i := 0; i < params.numWorkers; i++ {
@@ -972,12 +924,7 @@ Loop:
972924
}
973925
}
974926

975-
if len(record.Samples) > 0 {
976-
record.Samples = record.Samples[m:]
977-
}
978-
if len(walRecord.Samples) > 0 {
979-
walRecord.Samples = walRecord.Samples[m:]
980-
}
927+
walRecordSamples = walRecordSamples[m:]
981928
}
982929
}
983930

@@ -1171,7 +1118,7 @@ func (record *WALRecord) encodeSamples(b []byte) []byte {
11711118
return encoded
11721119
}
11731120

1174-
func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) {
1121+
func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
11751122
var (
11761123
userID string
11771124
dec tsdb_record.Decoder
@@ -1192,23 +1139,21 @@ func decodeWALRecord(b []byte, rec *Record, walRec *WALRecord) (err error) {
11921139
userID = decbuf.UvarintStr()
11931140
rseries, err = dec.Series(decbuf.B, walRec.Series)
11941141
default:
1195-
// The legacy proto record will have it's first byte >7.
1196-
// Hence it does not match any of the existing record types.
1197-
err = proto.Unmarshal(b, rec)
1198-
return err
1142+
return errors.New("unknown record type")
11991143
}
12001144

12011145
// We reach here only if it's a record with a type header.
12021146
if decbuf.Err() != nil {
12031147
return decbuf.Err()
12041148
}
12051149

1206-
if err == nil {
1207-
// There was no error decoding the records with type headers.
1208-
walRec.UserID = userID
1209-
walRec.Samples = rsamples
1210-
walRec.Series = rseries
1150+
if err != nil {
1151+
return err
12111152
}
12121153

1213-
return err
1154+
walRec.UserID = userID
1155+
walRec.Samples = rsamples
1156+
walRec.Series = rseries
1157+
1158+
return nil
12141159
}

0 commit comments

Comments
 (0)