Skip to content

Commit 5139f43

Browse files
committed
uucore: splice fast-path for head -c & preliminary for dd, tail, split
1 parent 4e68e67 commit 5139f43

File tree

3 files changed

+111
-1
lines changed

3 files changed

+111
-1
lines changed

src/uu/head/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ memchr = { workspace = true }
2424
thiserror = { workspace = true }
2525
uucore = { workspace = true, features = [
2626
"parser-size",
27+
"pipes",
2728
"ringbuffer",
2829
"lines",
2930
"fs",

src/uu/head/src/head.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ fn wrap_in_stdout_error(err: io::Error) -> io::Error {
166166
)
167167
}
168168

169+
// zero-copy fast-path
170+
#[cfg(any(target_os = "linux", target_os = "android"))]
171+
fn read_n_bytes(input: impl Read + AsFd, n: u64) -> io::Result<u64> {
172+
let out = io::stdout();
173+
uucore::pipes::send_n_bytes(input, out, n).map_err(wrap_in_stdout_error)
174+
}
175+
176+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
169177
fn read_n_bytes(input: impl Read, n: u64) -> io::Result<u64> {
170178
// Read the first `n` bytes from the `input` reader.
171179
let mut reader = input.take(n);
@@ -608,6 +616,7 @@ mod tests {
608616
}
609617

610618
#[test]
619+
#[cfg(not(any(target_os = "linux", target_os = "android")))] // missing trait for AsFd
611620
fn read_early_exit() {
612621
let mut empty = io::BufReader::new(Cursor::new(Vec::new()));
613622
assert!(read_n_bytes(&mut empty, 0).is_ok());

src/uucore/src/lib/features/pipes.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
1010
#[cfg(any(target_os = "linux", target_os = "android", test))]
1111
use std::fs::File;
1212
#[cfg(any(target_os = "linux", target_os = "android"))]
13-
use std::os::fd::AsFd;
13+
use std::{io::Read, os::fd::AsFd, sync::OnceLock};
1414
#[cfg(any(target_os = "linux", target_os = "android"))]
1515
pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
16+
#[cfg(any(target_os = "linux", target_os = "android"))]
17+
const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;
1618

1719
/// A wrapper around [`rustix::pipe::pipe`] that ensures the pipe is cleaned up.
1820
///
@@ -30,6 +32,20 @@ pub fn pipe() -> std::io::Result<(File, File)> {
3032
Ok((File::from(read), File::from(write)))
3133
}
3234

35+
/// return pipe larger than given size and kernel's default size
36+
///
37+
/// useful to save RAM usage
38+
#[inline]
39+
#[cfg(any(target_os = "linux", target_os = "android"))]
40+
fn pipe_with_size(s: usize) -> std::io::Result<(File, File)> {
41+
let (read, write) = rustix::pipe::pipe()?;
42+
if s > KERNEL_DEFAULT_PIPE_SIZE {
43+
let _ = fcntl_setpipe_size(&read, s);
44+
}
45+
46+
Ok((File::from(read), File::from(write)))
47+
}
48+
3349
/// Less noisy wrapper around [`rustix::pipe::splice`].
3450
///
3551
/// Up to `len` bytes are moved from `source` to `target`. Returns the number
@@ -72,6 +88,90 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
7288
.unwrap_or(true)
7389
}
7490

91+
/// splice `n` bytes with safe read/write fallback
92+
/// return actually sent bytes
93+
#[inline]
94+
#[cfg(any(target_os = "linux", target_os = "android"))]
95+
pub fn send_n_bytes(
96+
input: impl Read + AsFd,
97+
mut target: impl std::io::Write + AsFd,
98+
n: u64,
99+
) -> std::io::Result<u64> {
100+
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
101+
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
102+
let mut n = n;
103+
let mut bytes_written: u64 = 0;
104+
// do not always fallback to write as it needs 2 Ctrl+D to exit process on tty
105+
let fallback = if let Ok(b) = splice(&input, &target, n as usize) {
106+
bytes_written = b as u64;
107+
n -= bytes_written;
108+
if n > 0 {
109+
// improve throughput or save RAM usage
110+
// expected that input is already extended if it is coming from splice
111+
// we can use pipe_size * N with some case e.g. head -c N inputs, but we need N splice call anyway
112+
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
113+
let _ = fcntl_setpipe_size(&target, pipe_size);
114+
}
115+
} else {
116+
// avoid unnecessary syscalls
117+
return Ok(bytes_written);
118+
}
119+
120+
loop {
121+
match splice(&input, &target, n as usize) {
122+
Ok(0) => break might_fuse(&input),
123+
Ok(s @ 1..) => {
124+
n -= s as u64;
125+
bytes_written += s as u64;
126+
}
127+
_ => break true,
128+
}
129+
}
130+
} else if let Some((broker_r, broker_w)) = PIPE_CACHE
131+
.get_or_init(|| pipe_with_size(pipe_size).ok())
132+
.as_ref()
133+
{
134+
loop {
135+
match splice(&input, &broker_w, n as usize) {
136+
Ok(0) => break might_fuse(&input),
137+
Ok(s @ 1..) => {
138+
if splice_exact(&broker_r, &target, s).is_ok() {
139+
n -= s as u64;
140+
bytes_written += s as u64;
141+
if n == 0 {
142+
// avoid unnecessary splice for small input
143+
break false;
144+
}
145+
} else {
146+
let mut drain = Vec::with_capacity(s); // bounded by pipe size
147+
broker_r.take(s as u64).read_to_end(&mut drain)?;
148+
target.write_all(&drain)?;
149+
break true;
150+
}
151+
}
152+
_ => break true,
153+
}
154+
}
155+
} else {
156+
true
157+
};
158+
159+
if !fallback {
160+
return Ok(bytes_written);
161+
}
162+
let mut reader = input.take(n);
163+
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
164+
loop {
165+
match reader.read(&mut buf)? {
166+
0 => return Ok(bytes_written),
167+
n => {
168+
target.write_all(&buf[..n])?;
169+
bytes_written += n as u64;
170+
}
171+
}
172+
}
173+
}
174+
75175
/// Return verified /dev/null
76176
///
77177
/// `splice` to /dev/null is faster than `read` when we skip or count the non-seekable input

0 commit comments

Comments
 (0)