Skip to content

Commit bb6784c

Browse files
bboreham and pracucci authored
Re-use memory for chunks in ingester QueryStream (#3177)
* Refactor: extract function to set up ingester benchmark data

  Signed-off-by: Bryan Boreham <[email protected]>

* Add benchmark for ingester QueryStream()

  Signed-off-by: Bryan Boreham <[email protected]>

* Re-use memory for chunks in ingester QueryStream

  This improves performance of queries and reduces garbage-collection.

  Signed-off-by: Bryan Boreham <[email protected]>

* Add CHANGELOG entry

  Signed-off-by: Bryan Boreham <[email protected]>

Co-authored-by: Marco Pracucci <[email protected]>
1 parent fa62742 commit bb6784c

File tree

3 files changed

+67
-14
lines changed

3 files changed

+67
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
* [ENHANCEMENT] When a tenant accesses the Alertmanager UI or its API, if we have valid `-alertmanager.configs.fallback` we'll use that to start the manager and avoid failing the request. #3073
5454
* [ENHANCEMENT] Add `DELETE api/v1/rules/{namespace}` to the Ruler. It allows all the rule groups of a namespace to be deleted. #3120
5555
* [ENHANCEMENT] Experimental Delete Series: Retry processing of Delete requests during failures. #2926
56+
* [ENHANCEMENT] Improve performance of QueryStream() in ingesters. #3177
5657
* [ENHANCEMENT] Modules included in "All" target are now visible in output of `-modules` CLI flag. #3155
5758
* [BUGFIX] Ruler: when loading rules from "local" storage, check for directory after resolving symlink. #3137
5859
* [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990

pkg/ingester/ingester.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
723723
}
724724

725725
numSeries, numChunks := 0, 0
726+
reuseWireChunks := [queryStreamBatchSize][]client.Chunk{}
726727
batch := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
727728
// We'd really like to have series in label order, not FP order, so we
728729
// can iteratively merge them with entries coming from the chunk store. But
@@ -741,10 +742,12 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
741742
}
742743

743744
numSeries++
744-
wireChunks, err := toWireChunks(chunks, nil)
745+
reusePos := len(batch)
746+
wireChunks, err := toWireChunks(chunks, reuseWireChunks[reusePos])
745747
if err != nil {
746748
return err
747749
}
750+
reuseWireChunks[reusePos] = wireChunks
748751

749752
numChunks += len(wireChunks)
750753
batch = append(batch, client.TimeSeriesChunk{

pkg/ingester/ingester_test.go

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io/ioutil"
77
"math"
8+
"math/rand"
89
"net/http"
910
"os"
1011
"path/filepath"
@@ -843,19 +844,9 @@ func BenchmarkIngesterPushErrors(b *testing.B) {
843844
benchmarkIngesterPush(b, limits, true)
844845
}
845846

846-
func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
847-
cfg := defaultIngesterTestConfig()
848-
clientCfg := defaultClientTestConfig()
849-
850-
const (
851-
series = 100
852-
samples = 100
853-
)
854-
855-
// Construct a set of realistic-looking samples, all with slightly different label sets
856-
var allLabels []labels.Labels
857-
var allSamples []client.Sample
858-
for j := 0; j < series; j++ {
847+
// Construct a set of realistic-looking samples, all with slightly different label sets
848+
func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []client.Sample) {
849+
for j := 0; j < nSeries; j++ {
859850
labels := chunk.BenchmarkLabels.Copy()
860851
for i := range labels {
861852
if labels[i].Name == "cpu" {
@@ -865,6 +856,19 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte
865856
allLabels = append(allLabels, labels)
866857
allSamples = append(allSamples, client.Sample{TimestampMs: 0, Value: float64(j)})
867858
}
859+
return
860+
}
861+
862+
func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
863+
cfg := defaultIngesterTestConfig()
864+
clientCfg := defaultClientTestConfig()
865+
866+
const (
867+
series = 100
868+
samples = 100
869+
)
870+
871+
allLabels, allSamples := benchmarkData(series)
868872
ctx := user.InjectOrgID(context.Background(), "1")
869873

870874
encodings := []struct {
@@ -897,3 +901,48 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte
897901
}
898902

899903
}
904+
905+
func BenchmarkIngester_QueryStream(b *testing.B) {
906+
cfg := defaultIngesterTestConfig()
907+
clientCfg := defaultClientTestConfig()
908+
limits := defaultLimitsTestConfig()
909+
_, ing := newTestStore(b, cfg, clientCfg, limits, nil)
910+
ctx := user.InjectOrgID(context.Background(), "1")
911+
912+
const (
913+
series = 2000
914+
samples = 1000
915+
)
916+
917+
allLabels, allSamples := benchmarkData(series)
918+
919+
// Bump the timestamp and set a random value on each of our test samples each time round the loop
920+
for j := 0; j < samples; j++ {
921+
for i := range allSamples {
922+
allSamples[i].TimestampMs = int64(j + 1)
923+
allSamples[i].Value = rand.Float64()
924+
}
925+
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, nil, client.API))
926+
require.NoError(b, err)
927+
}
928+
929+
req := &client.QueryRequest{
930+
StartTimestampMs: 0,
931+
EndTimestampMs: samples + 1,
932+
933+
Matchers: []*client.LabelMatcher{{
934+
Type: client.EQUAL,
935+
Name: model.MetricNameLabel,
936+
Value: "container_cpu_usage_seconds_total",
937+
}},
938+
}
939+
940+
mockStream := &mockQueryStreamServer{ctx: ctx}
941+
942+
b.ResetTimer()
943+
944+
for ix := 0; ix < b.N; ix++ {
945+
err := ing.QueryStream(req, mockStream)
946+
require.NoError(b, err)
947+
}
948+
}

0 commit comments

Comments (0)