Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/uu/head/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ memchr = { workspace = true }
thiserror = { workspace = true }
uucore = { workspace = true, features = [
"parser-size",
"pipes",
"ringbuffer",
"lines",
"fs",
Expand Down
9 changes: 9 additions & 0 deletions src/uu/head/src/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ fn wrap_in_stdout_error(err: io::Error) -> io::Error {
)
}

// Zero-copy fast path: the transfer of the first `n` bytes of `input` to
// stdout is delegated to `uucore::pipes::send_n_bytes`.
// Any error it reports is passed through `wrap_in_stdout_error`.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn read_n_bytes(input: impl Read + AsFd, n: u64) -> io::Result<u64> {
    let stdout = io::stdout();
    let sent = uucore::pipes::send_n_bytes(input, stdout, n);
    sent.map_err(wrap_in_stdout_error)
}

#[cfg(not(any(target_os = "linux", target_os = "android")))]
fn read_n_bytes(input: impl Read, n: u64) -> io::Result<u64> {
// Read the first `n` bytes from the `input` reader.
let mut reader = input.take(n);
Expand Down Expand Up @@ -608,6 +616,7 @@ mod tests {
}

#[test]
#[cfg(not(any(target_os = "linux", target_os = "android")))] // missing trait for AsFd
fn read_early_exit() {
let mut empty = io::BufReader::new(Cursor::new(Vec::new()));
assert!(read_n_bytes(&mut empty, 0).is_ok());
Expand Down
102 changes: 101 additions & 1 deletion src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
#[cfg(any(target_os = "linux", target_os = "android", test))]
use std::fs::File;
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::os::fd::AsFd;
use std::{io::Read, os::fd::AsFd, sync::OnceLock};
#[cfg(any(target_os = "linux", target_os = "android"))]
pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
#[cfg(any(target_os = "linux", target_os = "android"))]
const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;

/// A wrapper around [`rustix::pipe::pipe`] that ensures the pipe is cleaned up.
///
Expand All @@ -30,6 +32,20 @@ pub fn pipe() -> std::io::Result<(File, File)> {
Ok((File::from(read), File::from(write)))
}

/// Create a pipe and, when `s` exceeds the kernel's default pipe size, try to
/// grow it to `s` bytes.
///
/// Requests at or below `KERNEL_DEFAULT_PIPE_SIZE` leave the pipe untouched,
/// which skips the resize call and keeps the buffer at its default size.
/// The resize itself is best-effort: if it fails, the pipe is still returned
/// at whatever size it currently has.
///
/// Returns the `(read, write)` ends as [`File`]s; an error is returned only
/// when the pipe itself cannot be created.
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
fn pipe_with_size(s: usize) -> std::io::Result<(File, File)> {
    let (reader, writer) = rustix::pipe::pipe()?;

    let wants_larger_pipe = s > KERNEL_DEFAULT_PIPE_SIZE;
    if wants_larger_pipe {
        // Deliberately ignore the outcome: a pipe of the default size still works.
        let _ = fcntl_setpipe_size(&reader, s);
    }

    let ends = (File::from(reader), File::from(writer));
    Ok(ends)
}

/// Less noisy wrapper around [`rustix::pipe::splice`].
///
/// Up to `len` bytes are moved from `source` to `target`. Returns the number
Expand Down Expand Up @@ -72,6 +88,90 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
.unwrap_or(true)
}

/// Move up to `n` bytes from `input` to `target`, using `splice` where
/// possible and a plain read/write loop as the safe fallback.
///
/// Strategy, in order:
/// 1. Splice directly from `input` to `target`.
/// 2. If the direct splice is refused, splice through a process-wide cached
///    broker pipe (`input` -> broker -> `target`).
/// 3. If splicing cannot finish the job, copy what is left with
///    `read`/`write_all` through a heap buffer of at most 32 KiB.
///
/// Returns the number of bytes actually delivered to `target`; this is less
/// than `n` when `input` runs out first.
///
/// # Errors
///
/// Splice failures are not reported; they only select the fallback. Errors
/// from draining the broker pipe and from the fallback `read`/`write_all`
/// calls are returned to the caller.
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn send_n_bytes(
    input: impl Read + AsFd,
    mut target: impl std::io::Write + AsFd,
    n: u64,
) -> std::io::Result<u64> {
    // Created on first use and kept for the lifetime of the process. Its size is
    // chosen from the `n` of the call that initialises it.
    static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
    // Saturating u64 -> usize conversion. A plain `as` cast truncates on 32-bit
    // targets, where e.g. `n == 1 << 32` would become a zero-length request and
    // silently copy nothing.
    let saturate = |remaining: u64| remaining.min(usize::MAX as u64) as usize;
    let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(saturate(n));
    let mut n = n;
    let mut bytes_written: u64 = 0;
    // Do not always fall back to read/write: on a tty that path needs two
    // Ctrl+D presses before the process exits.
    let fallback = if let Ok(b) = splice(&input, &target, saturate(n)) {
        bytes_written = b as u64;
        n -= bytes_written;
        if n == 0 {
            // Everything was sent in one call; avoid unnecessary syscalls.
            return Ok(bytes_written);
        }
        // More data is expected: enlarge the target pipe to improve throughput.
        // It is expected that `input` is already enlarged if it comes from a
        // splice-based producer. Failure to resize is harmless and ignored.
        if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
            let _ = fcntl_setpipe_size(&target, pipe_size);
        }

        loop {
            match splice(&input, &target, saturate(n)) {
                // Zero bytes moved: stop, and let `might_fuse` decide whether the
                // read/write fallback should double-check the input.
                Ok(0) => break might_fuse(&input),
                Ok(s @ 1..) => {
                    n -= s as u64;
                    bytes_written += s as u64;
                    if n == 0 {
                        // Limit reached; skip the extra zero-length splice.
                        break false;
                    }
                }
                _ => break true,
            }
        }
    } else if let Some((broker_r, broker_w)) = PIPE_CACHE
        .get_or_init(|| pipe_with_size(pipe_size).ok())
        .as_ref()
    {
        loop {
            match splice(&input, &broker_w, saturate(n)) {
                Ok(0) => break might_fuse(&input),
                Ok(s @ 1..) => {
                    if splice_exact(&broker_r, &target, s).is_ok() {
                        n -= s as u64;
                        bytes_written += s as u64;
                        if n == 0 {
                            // Avoid an unnecessary splice for small input.
                            break false;
                        }
                    } else {
                        // The chunk has already left `input`, so it must still
                        // reach `target`: drain it from the broker by hand.
                        // NOTE(review): if `splice_exact` can fail after moving
                        // part of the chunk, fewer than `s` bytes remain in the
                        // broker and this read may block, because the cached
                        // write end stays open - confirm against `splice_exact`.
                        let mut drain = Vec::with_capacity(s); // bounded by pipe size
                        broker_r.take(s as u64).read_to_end(&mut drain)?;
                        target.write_all(&drain)?;
                        // Account for the drained chunk. Without this the
                        // fallback below would copy `s` bytes beyond the limit
                        // and the returned count would be `s` too small.
                        n -= s as u64;
                        bytes_written += s as u64;
                        break true;
                    }
                }
                _ => break true,
            }
        }
    } else {
        // No broker pipe available: go straight to read/write.
        true
    };

    if !fallback {
        return Ok(bytes_written);
    }
    // Plain read/write copy of whatever is still owed.
    let mut reader = input.take(n);
    // Heap-allocated here so the fast paths above never pay for the buffer.
    let mut buf = vec![0u8; saturate(n).min(32 * 1024)];
    loop {
        match reader.read(&mut buf)? {
            0 => return Ok(bytes_written),
            len => {
                target.write_all(&buf[..len])?;
                bytes_written += len as u64;
            }
        }
    }
}

/// Return verified /dev/null
///
/// `splice` to /dev/null is faster than `read` when we skip or count the non-seekable input
Expand Down
Loading