Skip to content

Commit d88bd5c

Browse files
Merge pull request #52 from excoffierleonard/feat/internal-benchmark
Feat/internal benchmark
2 parents a60d8e2 + 37ff293 commit d88bd5c

6 files changed

Lines changed: 303 additions & 81 deletions

File tree

Cargo.lock

Lines changed: 18 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/lib.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use clap::Parser;
2-
use parser_core::parse;
2+
use parser_core::{parse, ParserError};
33
use rayon::prelude::*;
4-
use std::{fs::read, path::PathBuf};
4+
use std::{fs::read, io::Error, path::PathBuf};
55

66
/// CLI arguments parser
77
#[derive(Parser)]
@@ -13,17 +13,16 @@ pub struct Cli {
1313
}
1414

1515
/// Parses files in parallel and returns a Result containing either all parsed texts or the first error
16-
pub fn parse_files(files: &[PathBuf]) -> Result<Vec<String>, parser_core::ParserError> {
17-
let results: Vec<_> = files
18-
.par_iter()
19-
.filter_map(|path| read(path).ok().map(|data| parse(&data)))
20-
.collect();
16+
pub fn parse_files(paths: &[PathBuf]) -> Result<Vec<String>, ParserError> {
17+
// Read files into memory
18+
let files = paths
19+
.iter()
20+
.map(read)
21+
.collect::<Result<Vec<Vec<u8>>, Error>>()?;
2122

22-
// Check if all results are Ok
23-
if results.iter().all(|r| r.is_ok()) {
24-
Ok(results.into_iter().filter_map(Result::ok).collect())
25-
} else {
26-
// Return the first error
27-
Err(results.into_iter().find_map(|r| r.err()).unwrap())
28-
}
23+
// Process files in parallel
24+
files
25+
.par_iter()
26+
.map(|data| parse(data))
27+
.collect::<Result<Vec<String>, ParserError>>()
2928
}

crates/core/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ categories = ["text-processing", "parsing", "encoding"]
1616
pkg-config = "0.3.29"
1717

1818
[dependencies]
19-
rayon = { workspace = true }
2019
calamine = "0.26.1"
2120
docx-rs = "0.4.17"
2221
infer = "0.16.0"
@@ -32,7 +31,8 @@ zip = "2.3.0"
3231
criterion = "0.5"
3332
parser-test-utils = { workspace = true }
3433
rayon = { workspace = true }
34+
num_cpus = "1.16.0"
3535

3636
[[bench]]
37-
name = "parsing_benchmark"
37+
name = "function_parse"
3838
harness = false
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
use std::time::{Duration, Instant};
2+
3+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
4+
use rayon::prelude::*;
5+
6+
use parser_core::{parse, ParserError};
7+
use parser_test_utils::read_test_file;
8+
9+
// NOTE(review): "FILESNAMES" is a typo for "FILENAMES"; kept as-is because the
// benchmark functions below reference these constants by name.

/// One fixture file per supported input format (extension drives the parser
/// dispatch — CSV, DOCX, JPG, JSON, PDF, PNG, PPTX, TXT, WEBP, XLSX).
const TEST_FILESNAMES_BASE: &[&str] = &[
    "test_csv_1.csv",
    "test_docx_1.docx",
    "test_jpg_1.jpg",
    "test_json_1.json",
    "test_pdf_1.pdf",
    "test_png_1.png",
    "test_pptx_1.pptx",
    "test_txt_1.txt",
    "test_webp_1.webp",
    "test_xlsx_1.xlsx",
];

/// The image fixtures — benchmarked with a reduced sample size in
/// `benchmark_individual_files`, presumably because image parsing is much
/// slower than the text formats (TODO confirm).
const TEST_FILESNAMES_IMAGES: &[&str] = &["test_jpg_1.jpg", "test_png_1.png", "test_webp_1.webp"];

/// NOTE(review): currently unused (underscore-prefixed) and byte-for-byte
/// identical to `TEST_FILESNAMES_BASE` — consider removing it or making the
/// two lists actually differ.
const _TEST_FILESNAMES_FULL: &[&str] = &[
    "test_csv_1.csv",
    "test_docx_1.docx",
    "test_jpg_1.jpg",
    "test_json_1.json",
    "test_pdf_1.pdf",
    "test_png_1.png",
    "test_pptx_1.pptx",
    "test_txt_1.txt",
    "test_webp_1.webp",
    "test_xlsx_1.xlsx",
];
36+
37+
fn benchmark_sequential_vs_parallel(c: &mut Criterion) {
38+
// Create a vector of file data the size of the number of CPUs
39+
let file_data = read_test_file("test_pdf_1.pdf");
40+
let files: Vec<&[u8]> = vec![&file_data; num_cpus::get()];
41+
42+
let mut group = c.benchmark_group("Sequential vs Parallel Parsing");
43+
44+
group.throughput(Throughput::Elements(files.len() as u64));
45+
46+
// Benchmark parallel parsing
47+
group.bench_function("parallel", |b| {
48+
b.iter(|| {
49+
files
50+
.par_iter()
51+
.map(|d| parse(black_box(d)))
52+
.collect::<Result<Vec<String>, ParserError>>()
53+
})
54+
});
55+
56+
// Benchmark sequential parsing
57+
group.bench_function("sequential", |b| {
58+
b.iter(|| {
59+
files
60+
.iter()
61+
.map(|d| parse(black_box(d)))
62+
.collect::<Result<Vec<String>, ParserError>>()
63+
})
64+
});
65+
66+
group.finish();
67+
}
68+
69+
fn benchmark_parallel_efficiency(c: &mut Criterion) {
70+
let file_data = read_test_file("test_pdf_1.pdf");
71+
72+
let cpu_count = num_cpus::get();
73+
let mut counts = vec![
74+
cpu_count / 4,
75+
cpu_count / 2,
76+
cpu_count,
77+
cpu_count * 2,
78+
cpu_count * 4,
79+
];
80+
counts.dedup();
81+
82+
let mut group = c.benchmark_group("Parallel Efficiency");
83+
84+
group.throughput(Throughput::Elements(1));
85+
86+
for &count in &counts {
87+
let files: Vec<&[u8]> = vec![&file_data; count];
88+
89+
group.bench_function(BenchmarkId::new("files", count), |b| {
90+
b.iter(|| {
91+
files
92+
.par_iter()
93+
.map(|d| parse(black_box(d)))
94+
.collect::<Result<Vec<String>, ParserError>>()
95+
})
96+
});
97+
}
98+
99+
group.finish();
100+
}
101+
102+
fn benchmark_individual_files(c: &mut Criterion) {
103+
let cpus = num_cpus::get();
104+
let mut group = c.benchmark_group("Individual File Parsing");
105+
106+
// Set throughput to the number of CPUs
107+
group.throughput(Throughput::Elements(cpus as u64));
108+
109+
for &filename in TEST_FILESNAMES_BASE {
110+
let file = read_test_file(filename);
111+
let files: Vec<&[u8]> = vec![&file; cpus];
112+
113+
group.bench_function(filename, |b| {
114+
b.iter(|| {
115+
files
116+
.par_iter()
117+
.map(|d| parse(black_box(d)))
118+
.collect::<Result<Vec<String>, ParserError>>()
119+
})
120+
});
121+
}
122+
123+
for &filename in TEST_FILESNAMES_IMAGES {
124+
let file = read_test_file(filename);
125+
let files: Vec<&[u8]> = vec![&file; cpus];
126+
127+
group.sample_size(10);
128+
129+
group.bench_function(filename, |b| {
130+
b.iter(|| {
131+
files
132+
.par_iter()
133+
.map(|d| parse(black_box(d)))
134+
.collect::<Result<Vec<String>, ParserError>>()
135+
})
136+
});
137+
}
138+
139+
group.finish();
140+
}
141+
142+
/// For each file type, finds the largest number of files that can be parsed
/// in parallel within a 16 ms budget (exponential search for an upper bound,
/// then binary search), and benchmarks workload sizes around that threshold.
fn benchmark_parallel_threshold(c: &mut Criterion) {
    // 16 ms ≈ one frame at 60 fps; the latency budget being probed.
    let max_time_threshold = Duration::from_millis(16);

    // One benchmark group per file type.
    for &filename in TEST_FILESNAMES_BASE {
        let file_extension = filename.split('.').last().unwrap_or("unknown");
        let group_name = format!("Parallel {} Processing", file_extension.to_uppercase());
        let mut group = c.benchmark_group(&group_name);

        // Read the fixture once; every workload below borrows this buffer.
        let file_data = read_test_file(filename);

        // Measures the wall-clock time to parse `count` copies of the file in
        // parallel. Used by the searches below, outside Criterion's own timing.
        let measure_time = |count: usize| -> Duration {
            // Build the vector of borrowed slices before timing starts.
            let files: Vec<&[u8]> = vec![&file_data; count];

            // Warm-up runs to stabilize caches, the rayon pool, and any lazy
            // initialization inside the parsers.
            for _ in 0..3 {
                black_box(
                    files
                        .par_iter()
                        .map(|d| parse(black_box(d)))
                        .collect::<Result<Vec<String>, ParserError>>()
                        .unwrap(),
                );
            }

            // Take several measurements and use the median for robustness.
            const SAMPLE_COUNT: usize = 5;
            let mut durations = Vec::with_capacity(SAMPLE_COUNT);

            for _ in 0..SAMPLE_COUNT {
                // NOTE(review): this is a no-op — `black_box(())` does not
                // clear any caches; the original comment claiming so was wrong.
                black_box(());

                let start = Instant::now();
                black_box(
                    files
                        .par_iter()
                        .map(|d| parse(black_box(d)))
                        .collect::<Result<Vec<String>, ParserError>>()
                        .unwrap(),
                );
                durations.push(start.elapsed());
            }

            // Median is more robust against scheduler/outlier noise than mean.
            durations.sort();
            durations[SAMPLE_COUNT / 2]
        };

        // Invariant maintained below: `low` files fit in the budget (or is the
        // initial 1), `high` does not.
        let mut low = 1;
        let mut high = 1;

        // Phase 1: exponential search for an upper bound. NOTE(review): if
        // even a single file exceeds the budget this loop never runs and the
        // reported threshold is still 1.
        while measure_time(high) <= max_time_threshold {
            low = high;
            high *= 2;
        }

        // Phase 2: binary search between the bounds.
        while high - low > 1 {
            let mid = low + (high - low) / 2;
            if measure_time(mid) <= max_time_threshold {
                low = mid;
            } else {
                high = mid;
            }
        }

        // Largest count that stayed within the budget.
        let threshold_count = low;

        // Workload sizes to benchmark, as percentages of the threshold —
        // clustered tightly around 100% to expose the knee.
        let percentages = [90.0, 99.0, 99.9, 100.0, 100.1, 101.0, 110.0];

        // Convert percentages to file counts (at least 1 file each).
        let mut test_points: Vec<usize> = percentages
            .iter()
            .map(|&p| ((threshold_count as f64 * p / 100.0).ceil() as usize).max(1))
            .collect();

        // `percentages` is ascending, so the counts are non-decreasing and
        // `dedup` removes all duplicates (e.g. when the threshold is small).
        test_points.dedup();

        for &count in &test_points {
            // Throughput per benchmark so Criterion reports files/second.
            group.throughput(Throughput::Elements(count as u64));

            // Borrowed slices: the workload shares the single file buffer.
            let files: Vec<&[u8]> = vec![&file_data; count];
            group.bench_with_input(BenchmarkId::new("files", count), &count, |b, &_| {
                b.iter(|| {
                    files
                        .par_iter()
                        .map(|d| parse(black_box(d)))
                        .collect::<Result<Vec<String>, ParserError>>()
                })
            });
        }

        // Human-readable threshold summary alongside Criterion's output.
        println!(
            "Threshold for {}: {} files within {}ms",
            file_extension,
            threshold_count,
            max_time_threshold.as_millis()
        );

        group.finish();
    }
}
257+
258+
// Register all benchmark groups and generate the `main` entry point for this
// bench target (Cargo.toml sets `harness = false`, so Criterion supplies the
// harness instead of libtest).
criterion_group!(
    benches,
    benchmark_sequential_vs_parallel,
    benchmark_parallel_efficiency,
    benchmark_individual_files,
    benchmark_parallel_threshold
);
criterion_main!(benches);

0 commit comments

Comments
 (0)