diff --git a/Cargo.lock b/Cargo.lock index bd64277..390ba3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,23 +2,54 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstyle" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + [[package]] name = "bitflags" -version = "2.9.4" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" + +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + +[[package]] +name = "cc" +version = "1.2.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97463e1064cb1b1c1384ad0a0b9c8abd0988e2a91f52606c80ef14aadb63e36" +dependencies = [ + "find-msvc-tools", + "shlex", +] [[package]] name = "cfg-if" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] name = "cfg_aliases" @@ -26,11 +57,24 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chrono" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "clap" -version = "4.5.48" +version = "4.5.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" +checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8" dependencies = [ "clap_builder", "clap_derive", @@ -38,9 +82,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.48" +version = "4.5.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" +checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00" dependencies = [ "anstyle", "clap_lex", @@ -48,9 +92,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.47" +version = "4.5.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" +checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" dependencies = [ "heck", "proc-macro2", @@ -60,21 +104,29 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "conmon" version = "3.0.0-dev" dependencies = [ + "chrono", "clap", + "log", "mockall", "nix", "serde_json", "tempfile", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "downcast" version = "0.11.0" @@ -97,6 +149,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "find-msvc-tools" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" + [[package]] name = "fragile" version = "2.0.1" @@ -105,14 +163,14 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" [[package]] name = "getrandom" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", "r-efi", - "wasi", + "wasip2", ] [[package]] @@ -121,17 +179,51 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "iana-time-zone" +version = "0.1.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "itoa" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "js-sys" +version = "0.3.82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" -version = "0.2.176" +version = "0.2.177" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "linux-raw-sys" @@ -139,12 +231,27 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +[[package]] +name = "log" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" + [[package]] name = "memchr" version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mockall" version = "0.13.1" @@ -181,6 +288,16 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", + "memoffset", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", ] [[package]] @@ -217,18 +334,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.101" +version = "1.0.103" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.41" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -252,6 +369,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "ryu" version = "1.0.20" @@ -300,11 +423,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "syn" -version = "2.0.106" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -332,26 +461,97 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" [[package]] name = "unicode-ident" -version = "1.0.19" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" [[package]] -name = "wasi" -version = "0.14.7+wasi-0.2.4" +name = "wasip2" +version = "1.0.1+wasi-0.2.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ - "wasip2", + "wit-bindgen", ] [[package]] -name = "wasip2" -version = "1.0.1+wasi-0.2.4" +name = "wasm-bindgen" +version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60" dependencies = [ - "wit-bindgen", + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -360,6 +560,24 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.61.2" diff --git a/Cargo.toml b/Cargo.toml index c0d0228..0b49742 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,10 @@ path = "src/main.rs" [dependencies] clap = { version = "4.5", default-features = false, features = ["std", "derive"] } -nix = { version = "0.30.1", features = ["process", "signal", "resource", "fs", "poll"] } +nix = { version = "0.30.1", features = ["process", "signal", "resource", "fs", "poll", "socket"] } serde_json = "1" +log = { version = "0.4", features = ["std"] } +chrono = "0.4" [dev-dependencies] tempfile = "3" diff --git a/Makefile b/Makefile index 82f94ba..e778e79 100644 --- a/Makefile +++ b/Makefile @@ -108,7 +108,7 @@ conmon-v2: ## Fetch the conmon-v2 into "conmon-v2" directory. 
echo "Adding worktree at $(CONMON_V2_DIR) -> $(CONMON_V2_REMOTE)/$(CONMON_V2_BRANCH)"; \ git worktree add --force "$(CONMON_V2_DIR)" "$(CONMON_V2_REMOTE)/$(CONMON_V2_BRANCH)"; \ fi; \ - # Re-create the CONMONV_V2_DIR in case it has been removed. + # Re-create the CONMON_V2_DIR in case it has been removed. if [ ! -d "$(CONMON_V2_DIR)" ]; then \ git worktree add --force "$(CONMON_V2_DIR)" "$(CONMON_V2_REMOTE)/$(CONMON_V2_BRANCH)"; \ fi; \ diff --git a/src/cli.rs b/src/cli.rs index 6add355..02f5106 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -13,7 +13,7 @@ use clap::{ArgAction, Parser}; override_usage = "conmon [OPTIONS] -c --runtime ", disable_version_flag = true )] -#[derive(Default)] +#[derive(Default, Debug)] pub struct Opts { /// Conmon API version to use #[arg(long = "api-version", value_parser = clap::value_parser!(i32))] @@ -226,15 +226,19 @@ pub struct CommonCfg { pub runtime_opts: Vec, pub no_pivot: bool, pub no_new_keyring: bool, + pub conmon_pidfile: Option, + pub container_pidfile: PathBuf, + pub bundle: PathBuf, + pub full_attach: bool, + pub socket_dir_path: Option, + pub stdin: bool, + pub leave_stdin_open: bool, } #[derive(Debug, Default)] pub struct CreateCfg { pub common: CommonCfg, - pub bundle: PathBuf, - pub container_pidfile: PathBuf, pub systemd_cgroup: bool, - pub conmon_pidfile: Option, } #[derive(Debug, Default)] @@ -242,7 +246,6 @@ pub struct ExecCfg { pub common: CommonCfg, pub exec_process_spec: PathBuf, pub attach: bool, - pub container_pidfile: PathBuf, } #[derive(Debug, Default)] @@ -250,8 +253,6 @@ pub struct RestoreCfg { pub common: CommonCfg, pub restore_path: PathBuf, pub systemd_cgroup: bool, - pub container_pidfile: PathBuf, - pub bundle: PathBuf, } /// Try to detect "executable" bit. 
@@ -316,6 +317,18 @@ pub fn determine_cmd(mut opts: Opts) -> ConmonResult { )); } + let cwd = std::env::current_dir() + .map_err(|e| ConmonError::new(format!("Failed to get working directory: {e}"), 1))?; + + // container-pidfile defaults to "$cwd/pidfile-$cid" if none provided + let container_pidfile = opts + .container_pidfile + .take() + .unwrap_or_else(|| cwd.join(format!("pidfile-{}", cid))); + + // bundle defaults to "$cwd" if none provided + let bundle = opts.bundle.take().unwrap_or_else(|| cwd.clone()); + let common = CommonCfg { api_version, cid, @@ -325,28 +338,21 @@ pub fn determine_cmd(mut opts: Opts) -> ConmonResult { runtime_opts: opts.runtime_opts, no_pivot: opts.no_pivot, no_new_keyring: opts.no_new_keyring, + conmon_pidfile: opts.conmon_pidfile, + container_pidfile, + bundle, + full_attach: opts.full_attach, + socket_dir_path: opts.socket_dir_path, + stdin: opts.stdin, + leave_stdin_open: opts.leave_stdin_open, }; - let cwd = std::env::current_dir() - .map_err(|e| ConmonError::new(format!("Failed to get working directory: {e}"), 1))?; - - // bundle defaults to "$cwd" if none provided - let bundle = opts.bundle.take().unwrap_or_else(|| cwd.clone()); - - // container-pidfile defaults to "$cwd/pidfile-$cid" if none provided - let container_pidfile = opts - .container_pidfile - .take() - .unwrap_or_else(|| cwd.join(format!("pidfile-{}", common.cid))); - // decide which subcommand this flag combination means if let Some(restore_path) = opts.restore.take() { Ok(Cmd::Restore(RestoreCfg { common, restore_path, systemd_cgroup: opts.systemd_cgroup, - container_pidfile, - bundle, })) } else if opts.exec { let exec_process_spec = opts.exec_process_spec.take().ok_or_else(|| { @@ -359,15 +365,11 @@ pub fn determine_cmd(mut opts: Opts) -> ConmonResult { common, exec_process_spec, attach: opts.attach, - container_pidfile, })) } else { Ok(Cmd::Create(CreateCfg { common, - bundle, - container_pidfile, systemd_cgroup: opts.systemd_cgroup, - conmon_pidfile: 
opts.conmon_pidfile, })) } } @@ -616,9 +618,9 @@ mod tests { match cmd { Cmd::Create(cfg) => { // bundle defaults to cwd - assert_eq!(cfg.bundle, cwd); + assert_eq!(cfg.common.bundle, cwd); // container-pidfile defaults to "$cwd/pidfile-$cid" - assert_eq!(cfg.container_pidfile, cwd.join("pidfile-abc")); + assert_eq!(cfg.common.container_pidfile, cwd.join("pidfile-abc")); } _ => panic!("expected Run"), } diff --git a/src/commands/create.rs b/src/commands/create.rs index 95f12a6..89dffcd 100644 --- a/src/commands/create.rs +++ b/src/commands/create.rs @@ -1,12 +1,9 @@ +use std::process::ExitCode; + use crate::cli::CreateCfg; -use crate::error::{ConmonError, ConmonResult}; +use crate::error::ConmonResult; use crate::logging::plugin::LogPlugin; -use crate::parent_pipe::{get_pipe_fd_from_env, write_or_close_sync_fd}; -use crate::runtime::args::{RuntimeArgsGenerator, generate_runtime_args}; -use crate::runtime::run::{run_runtime, wait_for_runtime}; -use crate::runtime::stdio::{create_pipe, handle_stdio, read_pipe}; -use std::fs; -use std::process::Stdio; +use crate::runtime::args::RuntimeArgsGenerator; pub struct Create { cfg: CreateCfg, @@ -17,81 +14,30 @@ impl Create { Self { cfg } } - // Helper function to read and return the container pid. - fn read_container_pid(&self) -> ConmonResult { - let contents = fs::read_to_string(self.cfg.container_pidfile.as_path())?; - let pid = contents.trim().parse::().map_err(|e| { - ConmonError::new( - format!( - "Invalid PID contents in {}: {} ({})", - self.cfg.container_pidfile.display(), - contents.trim(), - e - ), - 1, - ) - })?; - Ok(pid) - } - - pub fn exec(&self, log_plugin: &mut dyn LogPlugin) -> ConmonResult<()> { - // Get the sync_pipe FD. It is used by the conmon caller to obtain the container_pid - // or the runtime error message later. - let sync_pipe_fd = get_pipe_fd_from_env("_OCI_SYNCPIPE")?; - - // Generate the list of arguments for runtime. 
- let runtime_args = generate_runtime_args(&self.cfg.common, self)?; - - // Generate pipes to handle stdio. - let (mainfd_stdout, workerfd_stdout) = create_pipe()?; - let (mainfd_stderr, workerfd_stderr) = create_pipe()?; - - // Run the `runtime create` and store our PID after first fork to `conmon_pidfile. - let runtime_pid = run_runtime( - &runtime_args, - Stdio::null(), // TODO - Stdio::from(workerfd_stdout), - Stdio::from(workerfd_stderr), - )?; - if let Some(pidfile) = &self.cfg.conmon_pidfile { - std::fs::write(pidfile, runtime_pid.to_string())?; - } + pub fn exec(&self, log_plugin: &mut dyn LogPlugin) -> ConmonResult { + // Start the `runtime create` session. + let mut runtime_session = crate::runtime::session::RuntimeSession::new(); + runtime_session.launch(&self.cfg.common, self, false)?; // === - // Now, after the `run_runtime`, we are in the child process of our original process - // (See `run_runtime` code and description for more information). + // Now, after the `launch()`, we are in the child process of our original process, + // because we double-fork in the RuntimeProcess::spawn. + // (See `RuntimeProcess::spawn` code and description for more information). // === - // Wait until the `runtime create` finishes. - let runtime_status = wait_for_runtime(runtime_pid)?; - if runtime_status != 0 { - // Pass the error to sync_pipe if there is one. - if let Some(fd) = sync_pipe_fd { - let err_bytes = read_pipe(&mainfd_stderr)?; - let err_str = String::from_utf8(err_bytes)?; - write_or_close_sync_fd(fd, -1, Some(&err_str), self.cfg.common.api_version, false)?; - } - return Err(ConmonError::new( - format!("Runtime exited with status: {runtime_status}"), - 1, - )); - } - - // Pass the container_pid to sync_pipe if there is one. 
- if let Some(fd) = sync_pipe_fd { - let container_pid = self.read_container_pid()?; - write_or_close_sync_fd(fd, container_pid, None, self.cfg.common.api_version, false)?; - } + // Wait until the `runtime create` finishes and return an error in case it fails. + runtime_session.wait_for_success(self.cfg.common.api_version)?; + runtime_session.write_container_pid_file(&self.cfg.common)?; // === // Now we wait for an external application like podman to really start the container. // and handle the containers stdio or its termination. // === - // Handle the stdio. - handle_stdio(log_plugin, mainfd_stdout, mainfd_stderr)?; + // Run the eventloop to forward log messages to log plugin. + runtime_session.run_event_loop(log_plugin, self.cfg.common.leave_stdin_open)?; - Ok(()) + Ok(ExitCode::SUCCESS) } } @@ -107,9 +53,13 @@ impl RuntimeArgsGenerator for Create { argv.extend([ "create".to_string(), "--bundle".to_string(), - self.cfg.bundle.to_string_lossy().into_owned(), + self.cfg.common.bundle.to_string_lossy().into_owned(), "--pid-file".to_string(), - self.cfg.container_pidfile.to_string_lossy().into_owned(), + self.cfg + .common + .container_pidfile + .to_string_lossy() + .into_owned(), ]); Ok(()) } @@ -127,6 +77,8 @@ mod tests { runtime_opts: Vec<&str>, no_pivot: bool, no_new_keyring: bool, + pidfile: &str, + bundle: &str, ) -> CommonCfg { CommonCfg { runtime: PathBuf::from("./runtime"), @@ -135,22 +87,16 @@ mod tests { runtime_opts: runtime_opts.into_iter().map(|s| s.to_string()).collect(), no_pivot, no_new_keyring, + container_pidfile: PathBuf::from(pidfile), + bundle: PathBuf::from(bundle), ..Default::default() } } - fn mk_create_cfg( - systemd_cgroup: bool, - bundle: &str, - pidfile: &str, - common: CommonCfg, - ) -> CreateCfg { + fn mk_create_cfg(systemd_cgroup: bool, common: CommonCfg) -> CreateCfg { CreateCfg { systemd_cgroup, - bundle: PathBuf::from(bundle), - container_pidfile: PathBuf::from(pidfile), common, - ..Default::default() } } @@ -162,8 +108,10 @@ mod 
tests { vec!["--optA", "X"], false, false, + "/tmp/pid-A", + "/tmp/bundle-A", ); - let cfg = mk_create_cfg(true, "/tmp/bundle-A", "/tmp/pid-A", common); + let cfg = mk_create_cfg(true, common); let create = Create::new(cfg); let argv = @@ -188,8 +136,16 @@ mod tests { #[test] fn generate_args_create_without_systemd_cgroup_with_generic_flags() { - let common = mk_common("cid456", vec![], vec!["--optB"], true, true); - let cfg = mk_create_cfg(false, "/tmp/bundle-B", "/tmp/pid-B", common); + let common = mk_common( + "cid456", + vec![], + vec!["--optB"], + true, + true, + "/tmp/pid-B", + "/tmp/bundle-B", + ); + let cfg = mk_create_cfg(false, common); let create = Create::new(cfg); let argv = diff --git a/src/commands/exec.rs b/src/commands/exec.rs index 83f6323..7455984 100644 --- a/src/commands/exec.rs +++ b/src/commands/exec.rs @@ -1,6 +1,9 @@ +use std::process::ExitCode; + use crate::cli::ExecCfg; use crate::error::ConmonResult; -use crate::runtime::args::{RuntimeArgsGenerator, generate_runtime_args}; +use crate::logging::plugin::LogPlugin; +use crate::runtime::args::RuntimeArgsGenerator; pub struct Exec { cfg: ExecCfg, @@ -11,10 +14,24 @@ impl Exec { Self { cfg } } - pub fn exec(&self) -> ConmonResult<()> { - let _runtime_args = generate_runtime_args(&self.cfg.common, self); + pub fn exec(&self, log_plugin: &mut dyn LogPlugin) -> ConmonResult { + let mut runtime_session = crate::runtime::session::RuntimeSession::new(); + runtime_session.launch(&self.cfg.common, self, self.cfg.attach)?; - Ok(()) + // === + // Now, after the `launch`, we are in the child process of our original process + // (See `RuntimeProcess::spawn` code and description for more information). + // === + + // Run the eventloop to forward log messages to log plugin. + runtime_session.run_event_loop(log_plugin, self.cfg.common.leave_stdin_open)?; + + // Wait for the `runtime exec` to finish and write its exit code. 
+ runtime_session.wait()?; + runtime_session.write_container_pid_file(&self.cfg.common)?; + runtime_session.write_exit_code(self.cfg.common.api_version)?; + + Ok(ExitCode::from(runtime_session.exit_code() as u8)) } } @@ -27,7 +44,11 @@ impl RuntimeArgsGenerator for Exec { argv.extend([ "exec".to_string(), "--pid-file".to_string(), - self.cfg.container_pidfile.to_string_lossy().into_owned(), + self.cfg + .common + .container_pidfile + .to_string_lossy() + .into_owned(), "--process".to_string(), self.cfg.exec_process_spec.to_string_lossy().into_owned(), "--detach".to_string(), @@ -48,6 +69,7 @@ mod tests { runtime_opts: Vec<&str>, no_pivot: bool, no_new_keyring: bool, + pidfile: &str, ) -> CommonCfg { CommonCfg { runtime: PathBuf::from("./runtime"), @@ -56,13 +78,13 @@ mod tests { runtime_opts: runtime_opts.into_iter().map(|s| s.to_string()).collect(), no_pivot, no_new_keyring, + container_pidfile: PathBuf::from(pidfile), ..Default::default() } } - fn mk_exec_cfg(pidfile: &str, proc_spec: &str, common: CommonCfg) -> ExecCfg { + fn mk_exec_cfg(proc_spec: &str, common: CommonCfg) -> ExecCfg { ExecCfg { - container_pidfile: PathBuf::from(pidfile), exec_process_spec: PathBuf::from(proc_spec), common, ..Default::default() @@ -77,8 +99,9 @@ mod tests { vec!["--optA", "X"], false, false, + "/tmp/pidfile", ); - let cfg = mk_exec_cfg("/tmp/pidfile", "/tmp/process.json", common); + let cfg = mk_exec_cfg("/tmp/process.json", common); let exec = Exec::new(cfg); let argv = @@ -103,8 +126,8 @@ mod tests { #[test] fn generate_args_exec_with_generic_flags() { - let common = mk_common("cid456", vec![], vec!["--optB"], true, true); - let cfg = mk_exec_cfg("/run/pid", "/cfg/proc.json", common); + let common = mk_common("cid456", vec![], vec!["--optB"], true, true, "/run/pid"); + let cfg = mk_exec_cfg("/cfg/proc.json", common); let exec = Exec::new(cfg); let argv = diff --git a/src/commands/restore.rs b/src/commands/restore.rs index 7aa08c6..876899f 100644 --- a/src/commands/restore.rs 
+++ b/src/commands/restore.rs @@ -1,3 +1,5 @@ +use std::process::ExitCode; + use crate::cli::RestoreCfg; use crate::error::ConmonResult; use crate::runtime::args::{RuntimeArgsGenerator, generate_runtime_args}; @@ -11,10 +13,10 @@ impl Restore { Self { cfg } } - pub fn exec(&self) -> ConmonResult<()> { + pub fn exec(&self) -> ConmonResult { let _runtime_args = generate_runtime_args(&self.cfg.common, self); - Ok(()) + Ok(ExitCode::SUCCESS) } } @@ -30,9 +32,13 @@ impl RuntimeArgsGenerator for Restore { argv.extend([ "restore".to_string(), "--bundle".to_string(), - self.cfg.bundle.to_string_lossy().into_owned(), + self.cfg.common.bundle.to_string_lossy().into_owned(), "--pid-file".to_string(), - self.cfg.container_pidfile.to_string_lossy().into_owned(), + self.cfg + .common + .container_pidfile + .to_string_lossy() + .into_owned(), ]); Ok(()) } @@ -51,6 +57,8 @@ mod tests { runtime_opts: Vec<&str>, no_pivot: bool, no_new_keyring: bool, + pidfile: &str, + bundle: &str, ) -> CommonCfg { CommonCfg { runtime: PathBuf::from("./runtime"), @@ -59,20 +67,15 @@ mod tests { runtime_opts: runtime_opts.into_iter().map(|s| s.to_string()).collect(), no_pivot, no_new_keyring, + container_pidfile: PathBuf::from(pidfile), + bundle: PathBuf::from(bundle), ..Default::default() } } - fn mk_restore_cfg( - systemd_cgroup: bool, - bundle: &str, - pidfile: &str, - common: CommonCfg, - ) -> RestoreCfg { + fn mk_restore_cfg(systemd_cgroup: bool, common: CommonCfg) -> RestoreCfg { RestoreCfg { systemd_cgroup, - bundle: PathBuf::from(bundle), - container_pidfile: PathBuf::from(pidfile), common, ..Default::default() } @@ -86,8 +89,10 @@ mod tests { vec!["--optA", "X"], false, false, + "/tmp/pid-A", + "/tmp/bundle-A", ); - let cfg = mk_restore_cfg(true, "/tmp/bundle-A", "/tmp/pid-A", common); + let cfg = mk_restore_cfg(true, common); let restore = Restore::new(cfg); let argv = generate_runtime_args(&restore.cfg.common, &restore).expect("ok"); @@ -111,8 +116,16 @@ mod tests { #[test] fn 
generate_args_without_systemd_cgroup() { - let common = mk_common("cid456", vec![], vec!["--optB"], true, true); - let cfg = mk_restore_cfg(false, "/tmp/bundle-B", "/tmp/pid-B", common); + let common = mk_common( + "cid456", + vec![], + vec!["--optB"], + true, + true, + "/tmp/pid-B", + "/tmp/bundle-B", + ); + let cfg = mk_restore_cfg(false, common); let restore = Restore::new(cfg); let argv = generate_runtime_args(&restore.cfg.common, &restore).expect("ok"); diff --git a/src/commands/version.rs b/src/commands/version.rs index 444a1dc..2d483ad 100644 --- a/src/commands/version.rs +++ b/src/commands/version.rs @@ -1,13 +1,15 @@ +use std::process::ExitCode; + use crate::error::ConmonResult; pub struct Version {} impl Version { - pub fn exec(&self) -> ConmonResult<()> { + pub fn exec(&self) -> ConmonResult { let version = env!("CARGO_PKG_VERSION"); let git_commit = option_env!("GIT_COMMIT").unwrap_or("unknown"); println!("conmon version {version}\ncommit: {git_commit}"); - Ok(()) + Ok(ExitCode::SUCCESS) } } diff --git a/src/error/mod.rs b/src/error/mod.rs index 1ac239b..00b3c1b 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -1,4 +1,4 @@ -use std::{fmt, string::FromUtf8Error}; +use std::{ffi::FromBytesUntilNulError, fmt, str::Utf8Error, string::FromUtf8Error}; use nix::errno::Errno; @@ -54,3 +54,15 @@ impl From for ConmonError { ConmonError::new(format!("UTF-8 error: {}", err), 1) } } + +impl From for ConmonError { + fn from(err: Utf8Error) -> Self { + ConmonError::new(format!("UTF-8 error: {}", err), 1) + } +} + +impl From for ConmonError { + fn from(err: FromBytesUntilNulError) -> Self { + ConmonError::new(format!("UTF-8 error: {}", err), 1) + } +} diff --git a/src/lib.rs b/src/lib.rs index 2f9cbc6..9c14d6a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,9 @@ +#![allow(clippy::collapsible_if)] pub mod cli; pub mod commands; pub mod error; +pub mod log; pub mod logging; pub mod parent_pipe; pub mod runtime; +pub mod unix_socket; diff --git a/src/log.rs 
b/src/log.rs new file mode 100644 index 0000000..ea411e2 --- /dev/null +++ b/src/log.rs @@ -0,0 +1,81 @@ +use chrono::Utc; +use log::{LevelFilter, Log, Metadata, Record}; +use std::fs::OpenOptions; +use std::{fs::File, io::Write, path::PathBuf, sync::Mutex}; + +use crate::error::{ConmonError, ConmonResult}; + +pub struct FileLogger { + level: LevelFilter, + file: Mutex, +} + +impl FileLogger { + pub fn new(file: File, level: LevelFilter) -> Self { + Self { + level, + file: Mutex::new(file), + } + } +} + +impl Log for FileLogger { + fn enabled(&self, metadata: &Metadata) -> bool { + metadata.level() <= self.level + } + + fn log(&self, record: &Record) { + if !self.enabled(record.metadata()) { + return; + } + + let now = Utc::now(); + let mut file = match self.file.lock() { + Ok(guard) => guard, + Err(poisoned) => poisoned.into_inner(), + }; + + let _ = writeln!( + &mut *file, + "[{}][{:>5}] {}: {}", + now.to_rfc3339(), + record.level(), + record.target(), + record.args() + ); + } + + fn flush(&self) { + if let Ok(mut file) = self.file.lock() { + let _ = file.flush(); + } + } +} + +pub fn init_logging( + path_env_var: &str, + default_path: PathBuf, + level_env_var: &str, + default_level: LevelFilter, +) -> ConmonResult<()> { + let level = std::env::var(level_env_var) + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(default_level); + + let path = std::env::var(path_env_var) + .ok() + .unwrap_or(default_path.to_string_lossy().to_string()); + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .map_err(|e| ConmonError::new(format!("Failed to open log file: {e}"), 1))?; + + let logger = FileLogger::new(file, level); + + log::set_max_level(level); + log::set_boxed_logger(Box::new(logger)) + .map_err(|e| ConmonError::new(format!("Failed to create logger: {e}"), 1)) +} diff --git a/src/logging/plugin.rs b/src/logging/plugin.rs index faa3bea..d536e6c 100644 --- a/src/logging/plugin.rs +++ b/src/logging/plugin.rs @@ -9,7 +9,7 @@ pub trait 
LogPlugin { fn write(&mut self, is_stdout: bool, data: &[u8]) -> ConmonResult<()>; } -#[derive(Default)] +#[derive(Default, Debug)] pub struct LogPluginCfg { pub path: PathBuf, } diff --git a/src/main.rs b/src/main.rs index dcdccba..2d4ec3a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,7 @@ +#![allow(clippy::collapsible_if)] +use ::log::LevelFilter; +use ::log::debug; +use ::log::info; use clap::Parser; use conmon::cli::{Cmd, Opts, determine_cmd, determine_log_plugin}; use conmon::commands::create::Create; @@ -5,21 +9,36 @@ use conmon::commands::exec::Exec; use conmon::commands::restore::Restore; use conmon::commands::version::Version; use conmon::error::ConmonResult; +use conmon::log; use conmon::logging::plugin::initialize_log_plugin; use std::process::ExitCode; -fn run_conmon() -> ConmonResult<()> { +fn run_conmon() -> ConmonResult { let opts = Opts::parse(); + if let Some(ref bundle) = opts.bundle { + log::init_logging( + "CONMON_LOG_PATH", + bundle.join("conmon-debug.log"), + "CONMON_LOG_LEVEL", + LevelFilter::Debug, + )?; + } + + let git_commit = option_env!("GIT_COMMIT").unwrap_or("unknown"); + info!("Starting conmon version {git_commit}"); + debug!("Command line options: {opts:?}"); + let (plugin_name, plugin_cfg) = determine_log_plugin(&opts)?; + info!("Using log plugin: {plugin_name:?} {plugin_cfg:?}"); let mut log_plugin = initialize_log_plugin(&plugin_name, &plugin_cfg)?; - match determine_cmd(opts)? { + let exit_code = match determine_cmd(opts)? 
{ Cmd::Create(cfg) => Create::new(cfg).exec(log_plugin.as_mut())?, - Cmd::Exec(cfg) => Exec::new(cfg).exec()?, + Cmd::Exec(cfg) => Exec::new(cfg).exec(log_plugin.as_mut())?, Cmd::Restore(cfg) => Restore::new(cfg).exec()?, Cmd::Version => Version {}.exec()?, - } - Ok(()) + }; + Ok(exit_code) } fn main() -> ExitCode { diff --git a/src/parent_pipe.rs b/src/parent_pipe.rs index 3120e05..c6fe8e3 100644 --- a/src/parent_pipe.rs +++ b/src/parent_pipe.rs @@ -77,7 +77,7 @@ pub fn write_or_close_sync_fd( str_data: Option<&str>, opt_api_version: i32, opt_exec: bool, -) -> ConmonResult<()> { +) -> ConmonResult> { let data_key = if opt_api_version >= 1 { "data" } else if opt_exec { @@ -100,10 +100,10 @@ pub fn write_or_close_sync_fd( let mut json = Value::Object(obj).to_string(); json.push('\n'); - // Write all; on EPIPE just return Ok(()). OwnedFd will close on drop. + // Write all; on EPIPE just return Ok(()), OwnedFd will close on drop. match write_all_fd(&fd, json.as_bytes()) { - Ok(_) => Ok(()), - Err(Errno::EPIPE) => Ok(()), + Ok(_) => Ok(Some(fd)), + Err(Errno::EPIPE) => Ok(None), Err(_) => Err(ConmonError::new( "Unable to send container stderr message to parent", 1, @@ -181,11 +181,11 @@ mod tests { fn write_writes_exit_code_and_message() -> ConmonResult<()> { let (r, w) = create_pipe()?; write_or_close_sync_fd(w, 7, Some("ok"), 0, true)?; - let s = read_pipe(&r)?; + let mut buf = [0u8; 8192]; + let n = read_pipe(&r, &mut buf)?; drop(r); - - let output = String::from_utf8(s)?; - let v: Value = serde_json::from_str(&output)?; + let output = std::str::from_utf8(&buf[..n])?; + let v: Value = serde_json::from_str(output)?; assert_eq!(v.get("exit_code").unwrap(), 7); assert_eq!(v.get("message").unwrap(), "ok"); Ok(()) @@ -195,11 +195,11 @@ mod tests { fn write_writes_pid_without_message() -> ConmonResult<()> { let (r, w) = create_pipe()?; write_or_close_sync_fd(w, 123, None, 0, false)?; - let s = read_pipe(&r)?; + let mut buf = [0u8; 8192]; + let n = read_pipe(&r, &mut 
buf)?; drop(r); - - let output = String::from_utf8(s)?; - let v: Value = serde_json::from_str(&output)?; + let output = std::str::from_utf8(&buf[..n])?; + let v: Value = serde_json::from_str(output)?; assert_eq!(v.get("pid").unwrap(), 123); assert!(v.get("message").is_none()); Ok(()) @@ -209,11 +209,11 @@ mod tests { fn write_writes_data_for_api_v1() -> ConmonResult<()> { let (r, w) = create_pipe()?; write_or_close_sync_fd(w, 42, Some("hi"), 1, false)?; - let s = read_pipe(&r)?; + let mut buf = [0u8; 8192]; + let n = read_pipe(&r, &mut buf)?; drop(r); - - let output = String::from_utf8(s)?; - let v: Value = serde_json::from_str(&output)?; + let output = std::str::from_utf8(&buf[..n])?; + let v: Value = serde_json::from_str(output)?; assert_eq!(v.get("data").unwrap(), 42); assert_eq!(v.get("message").unwrap(), "hi"); Ok(()) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 2e8243a..f932108 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,3 +1,4 @@ pub mod args; -pub mod run; +pub mod process; +pub mod session; pub mod stdio; diff --git a/src/runtime/process.rs b/src/runtime/process.rs new file mode 100644 index 0000000..fd54dfa --- /dev/null +++ b/src/runtime/process.rs @@ -0,0 +1,181 @@ +use crate::error::{ConmonError, ConmonResult}; +use crate::runtime::stdio::read_pipe; + +use nix::fcntl::{OFlag, open}; +use nix::sys::signal::{SigSet, SigmaskHow, Signal, kill, pthread_sigmask}; +use nix::sys::stat::Mode; +use nix::sys::wait::{WaitStatus, waitpid}; +use nix::unistd::{ForkResult, Pid, dup2_stderr, dup2_stdin, dup2_stdout, fork, setsid}; + +use std::io::{Error, Result as IoResult}; +use std::os::fd::{AsFd, OwnedFd}; +use std::os::unix::process::CommandExt; // for pre_exec +use std::process::{Command, Stdio, exit}; + +/// Convert a nix::Error into std::io::Error (for use inside pre_exec closure). +fn io_err(e: nix::Error) -> Error { + Error::from_raw_os_error(e as i32) +} + +/// Block signals in the parent before we spawn. 
+/// Returns the old mask, which the child will restore in `pre_exec`. +fn block_signals() -> ConmonResult { + let mut mask = SigSet::empty(); + mask.add(Signal::SIGTERM); + mask.add(Signal::SIGQUIT); + mask.add(Signal::SIGINT); + mask.add(Signal::SIGHUP); + + let mut oldmask = SigSet::empty(); + pthread_sigmask(SigmaskHow::SIG_BLOCK, Some(&mask), Some(&mut oldmask)) + .map_err(|e| ConmonError::new(format!("Failed to block signals: {e}"), 1))?; + Ok(oldmask) +} + +/// Helper function to redirect stdio to /dev/null. +fn redirect_self_to_devnull() -> ConmonResult<()> { + // stdin -> /dev/null (read side) + let fd_in = open("/dev/null", OFlag::O_RDONLY, Mode::empty())?; + dup2_stdin(fd_in.as_fd())?; + + // stdout/stderr -> /dev/null (write side) + let fd_out = open("/dev/null", OFlag::O_WRONLY, Mode::empty())?; + dup2_stdout(fd_out.as_fd())?; + dup2_stderr(fd_out.as_fd())?; + + Ok(()) +} + +/// Represents a single RuntimeProcess. +/// This is a low-level implementation. Use RuntimeSession for a more convenient +/// way to work with Runtime. +#[derive(Default)] +pub struct RuntimeProcess { + pid: i32, +} + +impl RuntimeProcess { + pub fn new() -> Self { + Self { pid: -1 } + } + + /// Spawn the runtime binary defined by `args`. + /// The stdio is redirected to `workerfd_stdin`, `workerfd_stdout` and `workerfd_stderr`. + /// Returns the PID. + pub fn spawn( + &mut self, + args: &[String], + workerfd_stdin: Stdio, + workerfd_stdout: Stdio, + workerfd_stderr: Stdio, + mut start_pipe_fd: Option, + ) -> ConmonResult { + if args.is_empty() { + return Err(ConmonError::new( + "Failed to execute runtime binary: empty args", + 1, + )); + } + + redirect_self_to_devnull()?; + + unsafe { + match fork() { + // In the parent: exit immediately so the child won't be a process group leader. + Ok(ForkResult::Parent { .. }) => { + exit(0); + } + // In the child: continue execution.
+ Ok(ForkResult::Child) => {} + Err(e) => { + return Err(ConmonError::new(format!("Failed to fork: {e}"), 1)); + } + } + } + + // Wait with the `spawn()` until parent tells us to start the runtime + // using the start_pipe_fd (if defined). + if let Some(fd) = start_pipe_fd.take() { + // It is OK to read just once from the pipe here. The pipe is used as a sync + // mechanism. We do not care about the read data at all. + let mut buf = [0u8; 8192]; + read_pipe(&fd, &mut buf)?; + } + + // Block signals in the parent so none are delivered between fork and exec. + let oldmask = block_signals()?; + + // Child setup performed between fork and exec. + fn child_setup(oldmask: &SigSet) -> IoResult<()> { + // Detach from controlling terminal: new session. + setsid().map_err(io_err)?; + + // Restore (unblock) the parent's original signal mask. + pthread_sigmask(SigmaskHow::SIG_SETMASK, Some(oldmask), None).map_err(io_err)?; + + // Set conservative umask. + nix::sys::stat::umask(nix::sys::stat::Mode::from_bits_truncate(0o022)); + Ok(()) + } + + // Build and spawn the child. + let program = &args[0]; + let argv = &args[1..]; + let mut cmd = Command::new(program); + cmd.args(argv) + .stdin(workerfd_stdin) + .stdout(workerfd_stdout) + .stderr(workerfd_stderr); + + unsafe { + cmd.pre_exec(move || child_setup(&oldmask)); + } + + let child = cmd + .spawn() + .map_err(|e| ConmonError::new(format!("Failed to spawn: {e}"), 1))?; + + self.pid = child.id() as i32; + Ok(self.pid) + } + + /// Returns the runtime process pid or -1 if it's not running. + pub fn pid(&self) -> i32 { + self.pid + } + + /// Block until the runtime process exits. Returns the exit code.
+ pub fn wait(&self) -> ConmonResult { + let pid = Pid::from_raw(self.pid); + + loop { + match waitpid(pid, None) { + Ok(WaitStatus::Exited(_, code)) => return Ok(code), + Ok(WaitStatus::Signaled(_, sig, _core_dumped)) => { + return Err(ConmonError::new( + format!("Runtime process exited due to signal: {sig:?}"), + 1, + )); + } + // These shouldn’t occur with no flags, but if they do, keep waiting. + Ok(WaitStatus::StillAlive) + | Ok(WaitStatus::Stopped(_, _)) + | Ok(WaitStatus::Continued(_)) + | Ok(WaitStatus::PtraceEvent(_, _, _)) + | Ok(WaitStatus::PtraceSyscall(_)) => continue, + + // Interrupted - continue. + Err(nix::Error::EINTR) => continue, + + Err(e) => { + // Try to kill the child, then surface the error. + let _ = kill(pid, Signal::SIGKILL); + return Err(ConmonError::new( + format!("Failed to wait for runtime process to exit: {e}"), + 1, + )); + } + } + } + } +} diff --git a/src/runtime/run.rs b/src/runtime/run.rs deleted file mode 100644 index 6490159..0000000 --- a/src/runtime/run.rs +++ /dev/null @@ -1,148 +0,0 @@ -use crate::error::{ConmonError, ConmonResult}; - -use nix::fcntl::{OFlag, open}; -use nix::sys::signal::{SigSet, SigmaskHow, Signal, kill, pthread_sigmask}; -use nix::sys::stat::Mode; -use nix::sys::wait::{WaitStatus, waitpid}; -use nix::unistd::{ForkResult, Pid, dup2_stderr, dup2_stdin, dup2_stdout, fork, setsid}; - -use std::io::{Error, Result as IoResult}; -use std::os::fd::AsFd; -use std::os::unix::process::CommandExt; // for pre_exec -use std::process::{Command, Stdio, exit}; - -/// Convert a nix::Error into std::io::Error (for use inside pre_exec closure). -fn io_err(e: nix::Error) -> Error { - Error::from_raw_os_error(e as i32) -} - -/// Block signals in the parent before we spawn. -/// Returns the old mask, which the child will restore in `pre_exec`. 
-fn block_signals() -> ConmonResult { - let mut mask = SigSet::empty(); - mask.add(Signal::SIGTERM); - mask.add(Signal::SIGQUIT); - mask.add(Signal::SIGINT); - mask.add(Signal::SIGHUP); - - let mut oldmask = SigSet::empty(); - pthread_sigmask(SigmaskHow::SIG_BLOCK, Some(&mask), Some(&mut oldmask)) - .map_err(|e| ConmonError::new(format!("Failed to block signals: {e}"), 1))?; - Ok(oldmask) -} - -/// Helper function to redirect stdio to /dev/null. -fn redirect_self_to_devnull() -> ConmonResult<()> { - // stdin -> /dev/null (read side) - let fd_in = open("/dev/null", OFlag::O_RDONLY, Mode::empty())?; - dup2_stdin(fd_in.as_fd())?; - - // stdout/stderr -> /dev/null (write side) - let fd_out = open("/dev/null", OFlag::O_WRONLY, Mode::empty())?; - dup2_stdout(fd_out.as_fd())?; - dup2_stderr(fd_out.as_fd())?; - - Ok(()) -} - -/// Run the runtime binary defined by `args`. -pub fn run_runtime( - args: &[String], - workerfd_stdin: Stdio, - workerfd_stdout: Stdio, - workerfd_stderr: Stdio, -) -> ConmonResult { - if args.is_empty() { - return Err(ConmonError::new( - "Failed to execute runtime binary: empty args", - 1, - )); - } - - redirect_self_to_devnull()?; - - unsafe { - match fork() { - // In the parent: exit immediately so the child won't be a process group leader. - Ok(ForkResult::Parent { .. }) => { - exit(0); - } - // In the child: continue execution. - Ok(ForkResult::Child) => {} - Err(e) => { - return Err(ConmonError::new(format!("Failed to fork: {e}"), 1)); - } - } - } - - // Block signals in the parent so none are delivered between fork and exec. - let oldmask = block_signals()?; - - // Child setup performed between fork and exec. - fn child_setup(oldmask: &SigSet) -> IoResult<()> { - // Detach from controlling terminal: new session. - setsid().map_err(io_err)?; - - // Restore (unblock) the parent's original signal mask. - pthread_sigmask(SigmaskHow::SIG_SETMASK, Some(oldmask), None).map_err(io_err)?; - - // Set conservative umask. 
- nix::sys::stat::umask(nix::sys::stat::Mode::from_bits_truncate(0o022)); - Ok(()) - } - - // Build and spawn the child. - let program = &args[0]; - let argv = &args[1..]; - let mut cmd = Command::new(program); - cmd.args(argv) - .stdin(workerfd_stdin) - .stdout(workerfd_stdout) - .stderr(workerfd_stderr); - - unsafe { - cmd.pre_exec(move || child_setup(&oldmask)); - } - - let child = cmd - .spawn() - .map_err(|e| ConmonError::new(format!("Failed to spawn: {e}"), 1))?; - - Ok(child.id() as i32) -} - -/// Block until the runtime process defined by `runtime_pid` exits. -/// Returns the runtime exit code. -pub fn wait_for_runtime(runtime_pid: i32) -> ConmonResult { - let pid = Pid::from_raw(runtime_pid); - - loop { - match waitpid(pid, None) { - Ok(WaitStatus::Exited(_, code)) => return Ok(code), - Ok(WaitStatus::Signaled(_, sig, _core_dumped)) => { - return Err(ConmonError::new( - format!("Runtime process exited due to signal: {sig:?}"), - 1, - )); - } - // These shouldn’t occur with no flags, but if they do, keep waiting. - Ok(WaitStatus::StillAlive) - | Ok(WaitStatus::Stopped(_, _)) - | Ok(WaitStatus::Continued(_)) - | Ok(WaitStatus::PtraceEvent(_, _, _)) - | Ok(WaitStatus::PtraceSyscall(_)) => continue, - - // Interrupted - continue. - Err(nix::Error::EINTR) => continue, - - Err(e) => { - // Try to kill the child, then surface the error. 
- let _ = kill(pid, Signal::SIGKILL); - return Err(ConmonError::new( - format!("Failed to wait for runtime process to exit: {e}"), - 1, - )); - } - } - } -} diff --git a/src/runtime/session.rs b/src/runtime/session.rs new file mode 100644 index 0000000..24e9a67 --- /dev/null +++ b/src/runtime/session.rs @@ -0,0 +1,337 @@ +use std::{fs, os::fd::OwnedFd, path::PathBuf, process::Stdio}; + +use nix::sys::{ + socket::{SockFlag, SockType}, + stat::Mode, +}; + +use crate::{ + cli::CommonCfg, + error::{ConmonError, ConmonResult}, + logging::plugin::LogPlugin, + parent_pipe::{get_pipe_fd_from_env, write_or_close_sync_fd}, + runtime::{ + args::{RuntimeArgsGenerator, generate_runtime_args}, + process::RuntimeProcess, + stdio::{create_pipe, handle_stdio, read_pipe}, + }, + unix_socket::{SocketType, UnixSocket}, +}; + +/// Represents Runtime session. +/// Handles spawning of runtime process, reading its stdio, writing its +/// pid and error code as well as the event loop to forward its log messages +/// to log plugins. +#[derive(Default)] +pub struct RuntimeSession { + process: RuntimeProcess, + sync_pipe_fd: Option, + workerfd_stdin: Option, + mainfd_stdout: Option, + mainfd_stderr: Option, + exit_code: i32, + attach_socket: Option, +} + +impl RuntimeSession { + pub fn new() -> Self { + Self { + process: RuntimeProcess::new(), + exit_code: -1, + ..Default::default() + } + } + + /// Returns the exit_code. + pub fn exit_code(&self) -> i32 { + self.exit_code + } + + // Helper function to read and return the container pid. 
+ fn read_container_pid(&self, common: &CommonCfg) -> ConmonResult { + let contents = fs::read_to_string(common.container_pidfile.as_path())?; + let pid = contents.trim().parse::().map_err(|e| { + ConmonError::new( + format!( + "Invalid PID contents in {}: {} ({})", + common.container_pidfile.display(), + contents.trim(), + e + ), + 1, + ) + })?; + Ok(pid) + } + + /// Launches the Runtime binary and ensures the stdio pipes are created and + /// pid written to locations according to configuration. + pub fn launch( + &mut self, + common: &CommonCfg, + args_gen: &impl RuntimeArgsGenerator, + attach: bool, + ) -> ConmonResult<()> { + // Get the sync_pipe FD. It is used by the conmon caller to obtain the container_pid + // or the runtime error message later. + self.sync_pipe_fd = get_pipe_fd_from_env("_OCI_SYNCPIPE")?; + + // Get the attach pipe FD. We use it later to inform parent that attach + // socket is ready. + let mut attach_pipe_fd: Option = None; + if attach { + attach_pipe_fd = get_pipe_fd_from_env("_OCI_ATTACHPIPE")?; + if attach_pipe_fd.is_none() { + return Err(ConmonError::new( + "--attach specified but _OCI_ATTACHPIPE was not set", + 1, + )); + } + } + + // Create the `attach` socket which is used to send data to container's stdin. + let mut attach_socket = UnixSocket::new( + SocketType::Console, + common.full_attach, + common.bundle.clone(), + common.socket_dir_path.clone(), + common.cuuid.clone(), + ); + attach_socket.listen( + Some(PathBuf::from("attach")), + SockType::SeqPacket, + SockFlag::SOCK_NONBLOCK | SockFlag::SOCK_CLOEXEC, + Mode::from_bits_truncate(0o700), + )?; + self.attach_socket = Some(attach_socket); + + // Inform the parent that the attach socket is ready. + if let Some(fd) = attach_pipe_fd.take() { + write_or_close_sync_fd(fd, 0, None, common.api_version, true)?; + } + + // Get the start pipe FD. We wait for the parent to write some data into it + // before continuing with the runtime process execution. 
This is a simple + // sync mechanism between parent and us. + let mut start_pipe_fd = get_pipe_fd_from_env("_OCI_STARTPIPE")?; + if let Some(fd) = start_pipe_fd.take() { + // It is OK to read just once from the pipe here. The pipe is used as a sync + // mechanism. We do not care about the read data at all. + let mut buf = [0u8; 8192]; + read_pipe(&fd, &mut buf)?; + // If we are using attach, we want to keep the start_pipe_fd valid, + // so it can be passed to `process.spawn()` and block the runtime + // execution for the second time. The parent uses that to inform us that + // it is attached to the container. + if attach { + start_pipe_fd = Some(fd); + } + } + + // Generate the list of arguments for runtime. + let runtime_args = generate_runtime_args(common, args_gen)?; + + // Generate pipes to handle stdio. + let (workerfd_stdin, mainfd_stdin_stdio) = if common.stdin { + let (fd_out, fd_in) = create_pipe()?; + (Some(fd_in), Stdio::from(fd_out)) + } else { + (None, Stdio::null()) + }; + let (mainfd_stdout, workerfd_stdout) = create_pipe()?; + let (mainfd_stderr, workerfd_stderr) = create_pipe()?; + self.workerfd_stdin = workerfd_stdin; + self.mainfd_stdout = Some(mainfd_stdout); + self.mainfd_stderr = Some(mainfd_stderr); + + // Run the `runtime create` and store our PID after first fork to `conmon_pidfile`. + self.process.spawn( + &runtime_args, + mainfd_stdin_stdio, + Stdio::from(workerfd_stdout), + Stdio::from(workerfd_stderr), + start_pipe_fd, + )?; + + // Store the RuntimeProcess::pid in the `conmon_pidfile`. + if let Some(pidfile) = &common.conmon_pidfile { + std::fs::write(pidfile, self.process.pid().to_string())?; + } + + Ok(()) + } + + /// Write the container pid file to all the configured locations. + pub fn write_container_pid_file(&mut self, common: &CommonCfg) -> ConmonResult<()> { + // Pass the container_pid to sync_pipe if there is one.
+ if let Some(fd) = self.sync_pipe_fd.take() { + let container_pid = self.read_container_pid(common)?; + self.sync_pipe_fd = + write_or_close_sync_fd(fd, container_pid, None, common.api_version, false)?; + } + Ok(()) + } + + /// Writes the Runtime exit code to all the configured locations. + pub fn write_exit_code(&mut self, api_version: i32) -> ConmonResult<()> { + #[allow(clippy::collapsible_if)] + if let Some(fd) = self.sync_pipe_fd.take() { + if let Some(mainfd_stderr) = &self.mainfd_stderr { + // TODO: We are reading just once here and if the container prints more than + // a buffer size to stderr, we ignore whatever does not fit into the buffer. + // This might be a problem, but the original conmon-v2 code behaves the same way. + let mut err_bytes = [0u8; 8192]; + let n = read_pipe(mainfd_stderr, &mut err_bytes)?; + let err_str = std::str::from_utf8(&err_bytes[..n])?; + self.sync_pipe_fd = + write_or_close_sync_fd(fd, self.exit_code, Some(err_str), api_version, true)?; + } + } + Ok(()) + } + + /// Waits for the Runtime process to exit. Returns the exit code. + pub fn wait(&mut self) -> ConmonResult { + self.exit_code = self.process.wait()?; + Ok(self.exit_code) + } + + /// Waits for the Runtime process to exit with zero exit code. + pub fn wait_for_success(&mut self, api_version: i32) -> ConmonResult<()> { + // Wait until the `runtime create` finishes. + self.wait()?; + if self.exit_code != 0 { + self.write_exit_code(api_version)?; + return Err(ConmonError::new( + format!("Runtime exited with status: {}", self.exit_code), + 1, + )); + } + Ok(()) + } + + /// Runs the event loop handling the container Runtime stdio.
+ pub fn run_event_loop( + &mut self, + log_plugin: &mut dyn LogPlugin, + leave_stdin_open: bool, + ) -> ConmonResult<()> { + #[allow(clippy::collapsible_if)] + if let Some(mainfd_out) = &self.mainfd_stdout { + if let Some(mainfd_err) = &self.mainfd_stderr { + handle_stdio( + log_plugin, + mainfd_out, + mainfd_err, + self.workerfd_stdin.take(), + self.attach_socket.as_ref(), + leave_stdin_open, + )?; + return Ok(()); + } + } + + Err(ConmonError::new("RuntimeSession called without stdio", 1)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use nix::unistd::write; + use tempfile::tempdir; + + #[test] + fn exit_code_defaults_and_accessor_work() { + let s = RuntimeSession::new(); + assert_eq!(s.exit_code(), -1); + } + + #[test] + fn read_container_pid_returns_pid_on_valid_file() -> ConmonResult<()> { + let tmp = tempdir()?; + let pid_path = tmp.path().join("pidfile.txt"); + std::fs::write(&pid_path, b"12345\n")?; + + let cfg = CommonCfg { + container_pidfile: pid_path, + conmon_pidfile: None, + api_version: 1, + ..Default::default() + }; + + let sess = RuntimeSession::new(); + let pid = sess.read_container_pid(&cfg)?; + assert_eq!(pid, 12345); + Ok(()) + } + + #[test] + fn read_container_pid_errors_on_invalid_contents() -> ConmonResult<()> { + let tmp = tempdir()?; + let pid_path = tmp.path().join("pidfile.txt"); + std::fs::write(&pid_path, b"not-a-number")?; + + let cfg = CommonCfg { + container_pidfile: pid_path.clone(), + conmon_pidfile: None, + api_version: 1, + ..Default::default() + }; + + let sess = RuntimeSession::new(); + let err = sess.read_container_pid(&cfg).unwrap_err(); + let msg = format!("{err}"); + assert!( + msg.contains("Invalid PID contents"), + "unexpected error: {msg}" + ); + Ok(()) + } + + #[test] + fn run_event_loop_errors_without_stdio() -> ConmonResult<()> { + struct NoopLog; + impl crate::logging::plugin::LogPlugin for NoopLog { + fn write(&mut self, _is_stdout: bool, _data: &[u8]) -> ConmonResult<()> { + Ok(()) + } + } + + let mut sess 
= RuntimeSession::new(); + let err = sess.run_event_loop(&mut NoopLog, false).unwrap_err(); + let msg = format!("{err}"); + assert!( + msg.contains("RuntimeSession called without stdio"), + "unexpected error message: {msg}" + ); + Ok(()) + } + + #[test] + fn write_exit_code_is_noop_without_sync_fd() -> ConmonResult<()> { + // If no syncpipe FD was captured in launch(), write_exit_code should simply succeed + // and not attempt to read from stderr (so it must not block). + let mut sess = RuntimeSession::new(); + let (r, w) = create_pipe()?; + // Write something into the write end so that if the code ever tried to read it, + // there would be data instead of blocking — but the code MUST NOT read + // because sync_pipe_fd is None. + write(w, "err\n".as_bytes())?; + // Prevent double-close of w after turning into File + // (we wrote and let File drop; raw FD consumed). + + sess.mainfd_stderr = Some(r); + + // Also set an exit code to make sure the function path is covered. + sess.exit_code = 42; + + // No sync_pipe_fd set -> should be a no-op success. + let res = sess.write_exit_code(1); + assert!( + res.is_ok(), + "write_exit_code should be a no-op when sync fd is None" + ); + Ok(()) + } +} diff --git a/src/runtime/stdio.rs b/src/runtime/stdio.rs index 0369364..2aa96d7 100644 --- a/src/runtime/stdio.rs +++ b/src/runtime/stdio.rs @@ -1,18 +1,20 @@ use crate::{ error::{ConmonError, ConmonResult}, logging::plugin::LogPlugin, + unix_socket::{RemoteSocket, SocketType, UnixSocket}, }; use nix::{ errno::Errno, fcntl::OFlag, poll::{PollFd, PollFlags, PollTimeout, poll}, - unistd::{pipe2, read}, + sys::socket::{SockaddrStorage, recvfrom}, + unistd::{pipe2, read, write}, }; use std::{ io, - os::fd::{AsFd, AsRawFd, OwnedFd}, + os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd}, }; // Creates new pipe and return read/write fds. @@ -31,39 +33,60 @@ pub fn create_pipe() -> ConmonResult<(OwnedFd, OwnedFd)> { Ok((rfd, wfd)) } -// Reads data from fd and returns it. 
-pub fn read_pipe(fd: &OwnedFd) -> ConmonResult> { - let mut buf = [0u8; 8192]; - - let n = loop { - match read(fd, &mut buf) { - Ok(0) => return Ok(Vec::new()), // EOF - Ok(n) => break n, +/// Reads data from fd and stores it in the buffer. +/// Returns the number of bytes read. +pub fn read_pipe(fd: &OwnedFd, buf: &mut [u8]) -> ConmonResult { + loop { + match read(fd, buf) { + Ok(n) => return Ok(n), Err(Errno::EINTR) | Err(Errno::EAGAIN) => continue, - Err(_) => { - return Err(ConmonError::new("read() failed while reading pipe", 1)); + Err(e) => { + return Err(ConmonError::new( + format!("read() failed while reading pipe: {e}"), + 1, + )); } } - }; - - Ok(buf[..n].to_vec()) + } } -// Handles the writes to `mainfd_stdout` and `mainfd_stderr` by reading the data -// and forwarding it to log plugin. +/// Handles incoming data on fds and forwards it to the right destination. +/// This function blocks until EOF or hang-up is detected on the polled fds. pub fn handle_stdio( log_plugin: &mut dyn LogPlugin, - mainfd_stdout: OwnedFd, - mainfd_stderr: OwnedFd, + mainfd_stdout: &OwnedFd, + mainfd_stderr: &OwnedFd, + mut workerfd_stdin: Option, + attach_socket: Option<&UnixSocket>, + leave_stdin_open: bool, ) -> ConmonResult<()> { - let mut fds = [ - PollFd::new(mainfd_stdout.as_fd(), PollFlags::POLLIN), - PollFd::new(mainfd_stderr.as_fd(), PollFlags::POLLIN), + let mut buf = [0u8; 8192]; + let mut remote_sockets: Vec = Vec::new(); + + // Container's stdout and stderr. + let mut fds: Vec = vec![ + PollFd::new(mainfd_stdout.as_fd(), PollFlags::POLLIN), // index 0 + PollFd::new(mainfd_stderr.as_fd(), PollFlags::POLLIN), // index 1 ]; + let stdout_fd = mainfd_stdout.as_raw_fd(); + let stderr_fd = mainfd_stderr.as_raw_fd(); - let mut buf = [0u8; 8192]; + // Optional attach socket.
+ let mut attach_index: Option = None; + #[allow(clippy::collapsible_if)] + if let Some(attach) = attach_socket.as_ref() { + if let Some(fd) = attach.fd() { + let idx = fds.len(); + fds.push(PollFd::new(fd.as_fd(), PollFlags::POLLIN)); + attach_index = Some(idx); + } + } + + // All RemoteSockets live at indices >= remote_base in `fds` + let remote_base = fds.len(); // constant for the lifetime of this function loop { + // Run poll to get informed about new fd events. let n = poll(&mut fds, PollTimeout::NONE).map_err(|e| { ConmonError::new( format!( @@ -75,39 +98,123 @@ pub fn handle_stdio( })?; if n == 0 { - // Timeout. It should not happen, but be defensive. + // This should not happen, since we use PollTimeout::NONE, but + // be defensive. continue; } - for pfd in &fds { + // We will mutate fds/remote_sockets, so iterate by index. + let mut i = 0; + while i < fds.len() { + let pfd = &fds[i]; + if let Some(revents) = pfd.revents() { + let fd = pfd.as_fd().as_raw_fd(); if revents.contains(PollFlags::POLLIN) { - let n = match read(pfd.as_fd(), &mut buf) { - Ok(n) => n, - - Err(Errno::EINTR) | Err(Errno::EAGAIN) => break, - - Err(e) => { - return Err(ConmonError::new( - format!( - "handle_stdio read() failed: {}", - io::Error::from_raw_os_error(e as i32) - ), - 1, - )); + // 1) attach socket ready: accept new RemoteSocket, extend vectors + if Some(i) == attach_index { + #[allow(clippy::collapsible_if)] + if let Some(attach) = attach_socket.as_ref() { + if let Some(remote) = attach.accept()? { + // add RemoteSocket to `remote_sockets` and its fd to `fds`. + let raw = remote.fd.as_raw_fd(); + remote_sockets.push(remote); + let borrowed = unsafe { BorrowedFd::borrow_raw(raw) }; + fds.push(PollFd::new(borrowed, PollFlags::POLLIN)); + } } - }; - if n > 0 { - let is_stdout = pfd.as_fd().as_raw_fd() == mainfd_stdout.as_raw_fd(); - let _ = log_plugin.write(is_stdout, &buf); - } else { - // EOF - return Ok(()); + // Go to next fd. 
+ i += 1; + continue; + } + + // 2) stdout / stderr ready: read the data and forward to logs. + if fd == stdout_fd || fd == stderr_fd { + match read(pfd.as_fd(), &mut buf) { + Ok(n) => { + if n > 0 { + let is_stdout = fd == stdout_fd; + let _ = log_plugin.write(is_stdout, &buf[..n]); + } else { + // EOF + return Ok(()); + } + } + Err(err) => { + if err == Errno::EWOULDBLOCK || err == Errno::EAGAIN { + // nothing more to read right now, jump to next fd + i += 1; + continue; + } + return Err(ConmonError::new( + format!( + "handle_stdio read() failed: {}", + io::Error::from_raw_os_error(err as i32) + ), + 1, + )); + } + } + i += 1; + continue; + } + + // 3) RemoteSocket ready: read it and forward to the right destination. + if i >= remote_base { + let remote_idx = i - remote_base; // index in remote_sockets + match recvfrom::(pfd.as_fd().as_raw_fd(), &mut buf) { + Ok((n, _sockaddr)) => { + if n > 0 { + match remote_sockets[remote_idx].socket_type { + SocketType::Console => { + // Console socket: forward data to container's stdin. + if let Some(workerfd_stdin) = workerfd_stdin.as_ref() { + write(workerfd_stdin, &buf[..n])?; + } + } + SocketType::Notify => { + // handle data coming from this remote + } + } + // Go to next fd. + i += 1; + } else { + // EOF: drop this remote socket + let remote = remote_sockets.swap_remove(remote_idx); + fds.swap_remove(i); + if remote.socket_type == SocketType::Console + && !leave_stdin_open + { + // This closes the socket, since moves out of scope. + workerfd_stdin.take(); + } + // do NOT increment i: we need to process the fd that got swapped in + } + } + Err(err) => { + if err == Errno::EWOULDBLOCK || err == Errno::EAGAIN { + i += 1; + } else { + // treat as fatal for this remote: remove it + let remote = remote_sockets.swap_remove(remote_idx); + fds.swap_remove(i); + if remote.socket_type == SocketType::Console + && !leave_stdin_open + { + // This closes the socket, since moves out of scope. 
+ workerfd_stdin.take(); + } + } + } + } + continue; } } else if revents.contains(PollFlags::POLLHUP) { return Ok(()); } } + + i += 1; } } } @@ -121,6 +228,17 @@ mod tests { }; use nix::unistd::write as nix_write; + use std::os::unix::net::UnixStream; + use std::time::Duration; + use std::{io::Write as _, path::PathBuf}; + + use nix::sys::{ + socket::{SockFlag, SockType}, + stat::Mode, + }; + + use tempfile::TempDir; + mock! { pub LogPlugin {} impl crate::logging::plugin::LogPlugin for LogPlugin { @@ -152,7 +270,7 @@ mod tests { drop(w_err); // Read from the other side of pipes. - handle_stdio(&mut mock, r_out, r_err)?; + handle_stdio(&mut mock, &r_out, &r_err, None, None, false)?; Ok(()) } @@ -187,7 +305,138 @@ mod tests { drop(w_err); // Read from the other side of pipes. - handle_stdio(&mut mock, r_out, r_err)?; + handle_stdio(&mut mock, &r_out, &r_err, None, None, false)?; + Ok(()) + } + + struct TestLog; + + impl crate::logging::plugin::LogPlugin for TestLog { + fn write(&mut self, _is_stdout: bool, _data: &[u8]) -> ConmonResult<()> { + Ok(()) + } + } + + /// Helper that drives handle_stdio with an attach socket and two console clients. + /// + /// If `leave_open` is true, data from both clients should reach container stdin. + /// If false, only data from the first client should be forwarded (stdin closed on first EOF). + fn run_leave_stdin_open_scenario(leave_open: bool) -> ConmonResult> { + // Container stdout/stderr pipes. + let (r_out, w_out) = create_pipe()?; + let (r_err, w_err) = create_pipe()?; + + // Container stdin pipe: w_in is what handle_stdio writes to. + let (r_in, w_in) = create_pipe()?; + + // Prepare attach Unix socket. 
+ let tmpdir = TempDir::new()?; + let socket_path = tmpdir.path().join("attach.sock"); + + let mut attach = UnixSocket::new( + SocketType::Console, + true, + PathBuf::from(tmpdir.path()), + None, + None, + ); + attach + .listen( + Some(socket_path.clone()), + SockType::Stream, + SockFlag::SOCK_NONBLOCK, + Mode::from_bits_truncate(0o600), + ) + .expect("listen failed"); + + // Spawn a thread to simulate attach clients and send stdout/stderr data. + let socket_path_for_thread = socket_path.clone(); + let client_thread = std::thread::spawn(move || { + // First console client – should always be forwarded. + let mut c1 = + UnixStream::connect(&socket_path_for_thread).expect("failed to connect client1"); + c1.write_all(b"CLIENT1\n").expect("write client1 failed"); + drop(c1); // EOF + + // Allow time for server to process EOF and, if !leave_open, close stdin. + std::thread::sleep(Duration::from_millis(100)); + + // Second console client – only forwarded if leave_stdin_open == true. + let mut c2 = + UnixStream::connect(&socket_path_for_thread).expect("failed to connect client2"); + c2.write_all(b"CLIENT2\n").expect("write client2 failed"); + drop(c2); // EOF + + // Allow time for server to process the second EOF. + std::thread::sleep(Duration::from_millis(100)); + + // Also send something to stdout/stderr so handle_stdio can eventually exit. + nix_write(w_out.as_fd(), b"OUT\n").ok(); + nix_write(w_err.as_fd(), b"ERR\n").ok(); + + // Close writers to produce EOF on stdout/stderr. + drop(w_out); + drop(w_err); + }); + + // Run handle_stdio in the main thread. + let mut logger = TestLog; + handle_stdio( + &mut logger, + &r_out, + &r_err, + Some(w_in), + Some(&attach), + leave_open, + )?; + + // Wait for client thread to finish. + client_thread.join().expect("client thread panicked"); + + // Read everything that reached "container stdin" (r_in). 
+ let mut collected = Vec::new(); + let mut buf = [0u8; 1024]; + loop { + match read(r_in.as_fd(), &mut buf) { + Ok(0) => break, + Ok(n) => collected.extend_from_slice(&buf[..n]), + Err(Errno::EINTR) | Err(Errno::EAGAIN) => continue, + Err(e) => panic!("read from container stdin failed: {e}"), + } + } + + Ok(collected) + } + + #[test] + fn handle_stdio_leave_stdin_open_true() -> ConmonResult<()> { + let data = run_leave_stdin_open_scenario(true)?; + + let s = String::from_utf8_lossy(&data); + assert!( + s.contains("CLIENT1"), + "stdin data did not contain CLIENT1: {s:?}" + ); + assert!( + s.contains("CLIENT2"), + "stdin data did not contain CLIENT2 even though leave_stdin_open=true: {s:?}" + ); + Ok(()) + } + + #[test] + fn handle_stdio_leave_stdin_open_false() -> ConmonResult<()> { + let data = run_leave_stdin_open_scenario(false)?; + + let s = String::from_utf8_lossy(&data); + assert!( + s.contains("CLIENT1"), + "stdin data did not contain CLIENT1: {s:?}" + ); + assert!( + !s.contains("CLIENT2"), + "stdin data unexpectedly contained CLIENT2 even though leave_stdin_open=false: {s:?}" + ); Ok(()) } } diff --git a/src/unix_socket.rs b/src/unix_socket.rs new file mode 100644 index 0000000..7089c6a --- /dev/null +++ b/src/unix_socket.rs @@ -0,0 +1,298 @@ +use std::{ + os::fd::OwnedFd, + path::{Path, PathBuf}, +}; + +use crate::error::{ConmonError, ConmonResult}; +use std::{ + ffi::OsStr, + io, + os::fd::{AsRawFd, FromRawFd}, + os::unix::ffi::OsStrExt, +}; + +use nix::{ + NixPath, + errno::Errno, + fcntl::{AT_FDCWD, OFlag, open}, + sys::{ + socket::{ + AddressFamily, Backlog, SockFlag, SockType, UnixAddr, accept, bind, listen, socket, + }, + stat::{Mode, fchmod}, + }, + unistd::{mkstemp, symlinkat, unlink}, +}; + +use ::log::info; + +// Type of the UnixSocket and RemoteSocket. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)] +pub enum SocketType { + #[default] + Console, // Socket for container's stdin ("console"). + Notify, // Socket for sd-notify. 
+} + +/// Remote side (attach client or sd-notify FD inside container). +#[derive(Debug)] +pub struct RemoteSocket { + pub socket_type: SocketType, + pub fd: OwnedFd, + pub buf: [u8; 8192], +} + +impl RemoteSocket { + pub fn new(socket_type: SocketType, fd: OwnedFd) -> Self { + Self { + socket_type, + fd, + buf: [0u8; 8192], + } + } +} + +impl Drop for RemoteSocket { + fn drop(&mut self) { + info!("Dropping RemoteSocket {:?}", self.fd) + } +} + +/// Represents single UnixSocket. +#[derive(Default)] +pub struct UnixSocket { + use_full_attach_path: bool, + bundle_path: PathBuf, + socket_path: Option, + cuuid: Option, + path: Option, + fd: Option, + socket_type: SocketType, +} + +impl UnixSocket { + pub fn new( + socket_type: SocketType, + use_full_attach_path: bool, + bundle_path: PathBuf, + socket_path: Option, + cuuid: Option, + ) -> Self { + let mut s = Self::default(); + s.socket_type = socket_type; + s.use_full_attach_path = use_full_attach_path; + s.bundle_path = bundle_path; + s.socket_path = socket_path; + s.cuuid = cuuid; + s + } + + pub fn fd(&self) -> Option<&OwnedFd> { + self.fd.as_ref() + } + + /// Generates the socket path and starts listening for new client (remote) connections. + pub fn listen( + &mut self, + path: Option, + sock_type: SockType, + sock_flags: SockFlag, + perms: Mode, + ) -> ConmonResult<()> { + let mut full_path: PathBuf; + let mut dir_fd: Option = None; + + if let Some(path) = path { + // We have some path, but we need a full-path. + // If the path is a full-path, use it. + // If it's not, generate the full-path using socket_parent_dir() and + // prefix the path with it. 
+ full_path = path.to_owned(); + let fallback; + let dir = if let Some(parent) = path.parent() { + if !parent.is_empty() { + parent + } else { + fallback = self.socket_parent_dir()?; + let fallback_path = fallback.as_path(); + full_path = fallback_path.join(path); + fallback_path + } + } else { + fallback = self.socket_parent_dir()?; + let fallback_path = fallback.as_path(); + full_path = fallback_path.join(path); + fallback_path + }; + + // Create the parent-directory of full-path. + let flags = OFlag::O_CREAT | OFlag::O_CLOEXEC | OFlag::O_PATH; + let dfd = open(dir, flags, Mode::from_bits_truncate(0o600))?; + + // Store the dir_fd, because we will be creating the socket in this dir. + dir_fd = Some(dfd); + } else { + // We do not have a path, so create temporary one. + let tmpdir = std::env::temp_dir(); + full_path = tmpdir.join("conmon-term.XXXXXX"); + let (fd_tmp, x) = mkstemp(&full_path)?; + full_path = x; + drop(fd_tmp); + } + + // Remove old socket if present. + unlink(&full_path).or_else(|e| { + if e == nix::Error::ENOENT { + Ok(()) + } else { + Err(ConmonError::new( + format!("Failed to remove old socket {full_path:?}: {e}"), + 1, + )) + } + })?; + + // Now bind & listen on the console socket path. + let fd = socket(AddressFamily::Unix, sock_type, sock_flags, None)?; + + self.bind_relative_to_dir(&fd, dir_fd.as_ref(), &full_path, perms)?; + listen(&fd, Backlog::MAXCONN)?; + info!("Listening on {full_path:?}"); + self.fd = Some(fd); + self.path = Some(full_path); + + Ok(()) + } + + /// Bind the fd socket to relative path in dir_fd if defined. + /// If not defined, the path is considered as full-path. + fn bind_relative_to_dir( + &mut self, + fd: &OwnedFd, + dir_fd: Option<&OwnedFd>, + path: &PathBuf, + perms: Mode, + ) -> ConmonResult<()> { + let addr = if let Some(dfd) = dir_fd { + // Get the base_name - the directory is defined by dir_fd. 
+ let base_name = path + .file_name() + .map(PathBuf::from) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no basename"))?; + // /proc/self/fd// + let name = format!( + "/proc/self/fd/{}/{}", + dfd.as_raw_fd(), + base_name.to_string_lossy() + ); + let path = Path::new(&name); + UnixAddr::new(path).map_err(|e| { + ConmonError::new(format!("Failed to create UnixAddr from {path:?}: {e:?}"), 1) + })? + } else { + UnixAddr::new(path).map_err(|e| { + ConmonError::new(format!("Failed to create UnixAddr from {path:?}: {e:?}"), 1) + })? + }; + + fchmod(fd, perms)?; + bind(fd.as_raw_fd(), &addr)?; + Ok(()) + } + + /// Returns the max socket path length. + fn max_socket_path_len(&mut self) -> usize { + let addr: nix::sys::socket::sockaddr_un = unsafe { std::mem::zeroed() }; + addr.sun_path.len() + } + + /// Generates the socket parent directory based on the UnixSocket options. + fn socket_parent_dir(&mut self) -> ConmonResult { + let base_path = if self.use_full_attach_path { + self.bundle_path.to_owned() + } else if let Some(cuuid) = &self.cuuid { + if let Some(socket_path) = &self.socket_path { + socket_path.join(cuuid) + } else { + "".into() + } + } else { + "".into() + }; + + if base_path.is_empty() { + return Err(ConmonError::new( + "Base path for socket cannot be determined", + 1, + )); + } + + if self.use_full_attach_path { + // nothing else to do + return Ok(base_path); + } + + let desired_len = self.max_socket_path_len(); + let mut base_path_bytes = base_path.as_os_str().as_bytes().to_vec(); + if base_path_bytes.len() >= desired_len - 1 { + // chop last char + if let Some(last) = base_path_bytes.last_mut() { + *last = b'\0'; + } + } + let new_base = PathBuf::from(OsStr::from_bytes( + base_path_bytes + .iter() + .take_while(|b| **b != 0) + .copied() + .collect::>() + .as_slice(), + )); + + // Remove old symlink if present + unlink(&new_base).or_else(|e| { + if e == nix::Error::ENOENT { + Ok(()) + } else { + Err(e) + } + })?; + + // symlink(bundle_path, 
base_path) + symlinkat(&self.bundle_path, AT_FDCWD, &new_base)?; + + Ok(new_base) + } + + /// Accept new UnixSocket client (remote) connection. + pub fn accept(&self) -> ConmonResult> { + if self.fd.is_none() { + return Ok(None); + } + + match accept(self.fd.as_ref().unwrap().as_raw_fd()) { + Ok(new_fd) => { + info!( + "Accepted new remote connection on socket {:?}: {}", + self.path, new_fd + ); + let remote = + RemoteSocket::new(self.socket_type, unsafe { OwnedFd::from_raw_fd(new_fd) }); + Ok(Some(remote)) + } + Err(Errno::EWOULDBLOCK) => Ok(None), + Err(e) => { + eprintln!("warn: Failed to accept client connection on attach socket: {e}"); + Ok(None) + } + } + } +} + +impl Drop for UnixSocket { + fn drop(&mut self) { + if let Some(path) = self.path.take() { + let _ = unlink(&path); + } + } +}