Skip to content

Commit bca3ebd

Browse files
committed
double up the sample types
1 parent ca5d1ba commit bca3ebd

File tree

2 files changed

+103
-30
lines changed

2 files changed

+103
-30
lines changed

libdd-profiling/src/internal/otel_style_observation.rs

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,23 @@ use crate::{
55
api::SampleType,
66
internal::{Sample, Timestamp},
77
};
8-
use std::collections::{HashMap, HashSet};
8+
use std::collections::HashMap;
99

1010
#[derive(Default)]
1111
pub struct Observations {
1212
sample_types: Box<[SampleType]>,
1313
pub aggregated: HashMap<SampleType, HashMap<Sample, i64>>,
14+
pub aggregated2: HashMap<(SampleType, SampleType), HashMap<Sample, (i64, i64)>>,
1415
pub timestamped: HashMap<SampleType, HashMap<Sample, Vec<(i64, Timestamp)>>>,
16+
pub timestamped2:
17+
HashMap<(SampleType, SampleType), HashMap<Sample, Vec<(i64, i64, Timestamp)>>>,
1518
}
1619

1720
impl Observations {
1821
pub fn new(sample_types: Box<[SampleType]>) -> Self {
19-
let len = sample_types.len();
2022
Self {
2123
sample_types,
22-
aggregated: HashMap::with_capacity(len),
23-
timestamped: HashMap::with_capacity(len),
24+
..Default::default()
2425
}
2526
}
2627

@@ -30,27 +31,64 @@ impl Observations {
3031
timestamp: Option<Timestamp>,
3132
values: &[i64],
3233
) -> anyhow::Result<()> {
34+
let mut first = None;
35+
let mut second = None;
36+
3337
for (idx, v) in values.iter().enumerate() {
3438
if *v != 0 {
3539
let sample_type = self.sample_types[idx];
40+
if first.is_none() {
41+
first = Some((sample_type, *v));
42+
} else if second.is_none() {
43+
second = Some((sample_type, *v));
44+
} else {
45+
anyhow::bail!("too many values");
46+
}
47+
}
48+
}
49+
50+
match (first, second) {
51+
(None, None) => {}
52+
(Some((st, val)), None) => {
3653
if let Some(ts) = timestamp {
3754
self.timestamped
38-
.entry(sample_type)
55+
.entry(st)
3956
.or_default()
4057
.entry(sample)
4158
.or_default()
42-
.push((*v, ts));
59+
.push((val, ts));
4360
} else {
44-
let val = self
61+
let entry = self
4562
.aggregated
46-
.entry(sample_type)
63+
.entry(st)
4764
.or_default()
4865
.entry(sample)
4966
.or_insert(0);
50-
*val = val.saturating_add(*v);
67+
*entry = entry.saturating_add(val);
5168
}
5269
}
70+
(Some((st1, val1)), Some((st2, val2))) => {
71+
if let Some(ts) = timestamp {
72+
self.timestamped2
73+
.entry((st1, st2))
74+
.or_default()
75+
.entry(sample)
76+
.or_default()
77+
.push((val1, val2, ts));
78+
} else {
79+
let entry = self
80+
.aggregated2
81+
.entry((st1, st2))
82+
.or_default()
83+
.entry(sample)
84+
.or_insert((0, 0));
85+
entry.0 = entry.0.saturating_add(val1);
86+
entry.1 = entry.1.saturating_add(val2);
87+
}
88+
}
89+
(None, Some(_)) => unreachable!("second set implies first set"),
5390
}
91+
5492
Ok(())
5593
}
5694

@@ -59,21 +97,27 @@ impl Observations {
5997
}
6098

6199
pub fn aggregated_samples_count(&self) -> usize {
62-
// TODO: this is the actual sample count, but doesn't reflect aggregated samples being
63-
// overlapping self.aggregated.iter().map(|(_, v)| v.len()).sum()
64-
let samples: HashSet<&Sample> = self
65-
.aggregated
66-
.iter()
67-
.flat_map(|(_, samples)| samples.keys())
68-
.collect();
69-
samples.len()
100+
self.aggregated
101+
.values()
102+
.map(|samples| samples.len())
103+
.sum::<usize>()
104+
+ self
105+
.aggregated2
106+
.values()
107+
.map(|samples| samples.len())
108+
.sum::<usize>()
70109
}
71110

72111
pub fn timestamped_samples_count(&self) -> usize {
73112
self.timestamped
74113
.iter()
75114
.flat_map(|(_, v)| v.values().map(|v| v.len()))
76-
.sum()
115+
.sum::<usize>()
116+
+ self
117+
.timestamped2
118+
.iter()
119+
.flat_map(|(_, v)| v.values().map(|v| v.len()))
120+
.sum::<usize>()
77121
}
78122

79123
pub fn try_into_iter(
@@ -94,6 +138,8 @@ impl Observations {
94138

95139
let len: usize = self.sample_types.len();
96140
let index_map_ts = index_map.clone();
141+
let index_map_ts2 = index_map.clone();
142+
let index_map_accum2 = index_map.clone();
97143
let accum_iter = self
98144
.aggregated
99145
.into_iter()
@@ -107,6 +153,22 @@ impl Observations {
107153
})
108154
});
109155

156+
let accum2_iter = self
157+
.aggregated2
158+
.into_iter()
159+
.flat_map(move |((st1, st2), inner)| {
160+
#[allow(clippy::unwrap_used)]
161+
let index1 = *index_map_accum2.get(&st1).unwrap();
162+
#[allow(clippy::unwrap_used)]
163+
let index2 = *index_map_accum2.get(&st2).unwrap();
164+
inner.into_iter().map(move |(sample, (val1, val2))| {
165+
let mut vals = vec![0; len];
166+
vals[index1] = val1;
167+
vals[index2] = val2;
168+
(sample, None, vals)
169+
})
170+
});
171+
110172
// let mut accum: HashMap<Sample, Vec<i64>> = HashMap::new();
111173
// for (sample_type, samples) in self.aggregated.into_iter() {
112174
// let Some(idx) = index_map.get(&sample_type) else {
@@ -133,6 +195,25 @@ impl Observations {
133195
})
134196
})
135197
});
136-
accum_iter.chain(ts_iter)
198+
199+
let ts2_iter = self
200+
.timestamped2
201+
.into_iter()
202+
.flat_map(move |((st1, st2), inner)| {
203+
#[allow(clippy::unwrap_used)]
204+
let index1 = *index_map_ts2.get(&st1).unwrap();
205+
#[allow(clippy::unwrap_used)]
206+
let index2 = *index_map_ts2.get(&st2).unwrap();
207+
inner.into_iter().flat_map(move |(sample, ts_vals)| {
208+
ts_vals.into_iter().map(move |(val1, val2, ts)| {
209+
let mut vals = vec![0; len];
210+
vals[index1] = val1;
211+
vals[index2] = val2;
212+
(sample, Some(ts), vals)
213+
})
214+
})
215+
});
216+
217+
accum_iter.chain(accum2_iter).chain(ts_iter).chain(ts2_iter)
137218
}
138219
}

libdd-profiling/src/internal/profile/mod.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ impl Profile {
462462
/// duration based on the end time minus the start time, but under anomalous conditions this
463463
/// may fail as system clocks can be adjusted. The programmer may also accidentally pass an
464464
/// earlier time. The duration will be set to zero in these cases.
465+
#[cfg(feature = "otel")]
465466
pub fn serialize_into_compressed_otel(
466467
self,
467468
end_time: Option<SystemTime>,
@@ -783,7 +784,7 @@ impl Profile {
783784
end,
784785
buffer,
785786
endpoints_stats,
786-
})
787+
})
787788
}
788789

789790
/// Encodes the profile. Note that the buffer will be empty. The caller
@@ -1694,7 +1695,6 @@ mod api_tests {
16941695
}
16951696

16961697
#[test]
1697-
#[cfg_attr(feature = "otel", ignore)]
16981698
fn lazy_endpoints() -> anyhow::Result<()> {
16991699
let sample_types = [api::SampleType::CpuSamples, api::SampleType::WallTime];
17001700

@@ -1858,8 +1858,6 @@ mod api_tests {
18581858
}
18591859

18601860
#[test]
1861-
// This works if we merge accumulated samples
1862-
#[cfg_attr(feature = "otel", ignore)]
18631861
fn test_no_upscaling_if_no_rules() {
18641862
let sample_types = vec![api::SampleType::CpuSamples, api::SampleType::WallTime];
18651863

@@ -1909,8 +1907,6 @@ mod api_tests {
19091907
}
19101908

19111909
#[test]
1912-
// This works if we accumulate
1913-
#[cfg_attr(feature = "otel", ignore)]
19141910
fn test_upscaling_by_value_a_zero_value() {
19151911
let sample_types = create_samples_types();
19161912

@@ -2048,9 +2044,6 @@ mod api_tests {
20482044
}
20492045

20502046
#[test]
2051-
// This works if we accumulate
2052-
#[cfg_attr(feature = "otel", ignore)]
2053-
20542047
fn test_upscaling_by_value_on_zero_value_with_poisson() {
20552048
let sample_types = create_samples_types();
20562049

@@ -2839,6 +2832,7 @@ mod api_tests {
28392832
}
28402833

28412834
#[test]
2835+
#[cfg_attr(feature = "otel", ignore)] // too many non-zero values
28422836
fn test_fails_when_adding_byvalue_rule_colliding_on_offset_with_existing_bylabel_rule() {
28432837
let sample_types = create_samples_types();
28442838

@@ -2878,8 +2872,6 @@ mod api_tests {
28782872
}
28792873

28802874
#[test]
2881-
// This works if we merge accumulated samples
2882-
#[cfg_attr(feature = "otel", ignore)]
28832875
fn local_root_span_id_label_as_i64() -> anyhow::Result<()> {
28842876
let sample_types = vec![api::SampleType::CpuSamples, api::SampleType::WallTime];
28852877

0 commit comments

Comments
 (0)