Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rpc/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,11 @@ fn always_success_transaction() -> TransactionView {
fn setup(consensus: Consensus) -> RpcTestSuite {
setup_rpc_test_suite(20, Some(consensus))
}

// Shut down background tasks to release `Arc<Shared>` and allow temp dir cleanup.
// It's should be safe in this test suite.
impl Drop for RpcTestSuite {
fn drop(&mut self) {
ckb_stop_handler::broadcast_exit_signals();
}
Comment on lines +228 to +233
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling ckb_stop_handler::broadcast_exit_signals() in RpcTestSuite's Drop will cancel the global stop token for the entire test process and cannot be reset. Since many RPC tests create and drop their own RpcTestSuite, this risks prematurely stopping services in other tests (including tests that run after the first one), leading to flakiness or failures. Prefer an explicit, suite-local shutdown mechanism (e.g., stop/join the services started in setup_rpc_test_suite) or ensure the broadcast is performed once at the very end of the whole test binary rather than on every suite drop.

Suggested change
// Shut down background tasks to release `Arc<Shared>` and allow temp dir cleanup.
// It's should be safe in this test suite.
impl Drop for RpcTestSuite {
fn drop(&mut self) {
ckb_stop_handler::broadcast_exit_signals();
}
// Do not broadcast global exit signals from a suite-local `Drop`.
// `ckb_stop_handler::broadcast_exit_signals()` cancels a process-global stop token
// and cannot be reset, so calling it here would interfere with other tests that
// create and drop their own `RpcTestSuite`.
impl Drop for RpcTestSuite {
fn drop(&mut self) {}

Copilot uses AI. Check for mistakes.
}
11 changes: 8 additions & 3 deletions shared/src/types/header_map/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ckb_async_runtime::Handle;
use ckb_logger::info;
use ckb_logger::{debug, info};
use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
use ckb_types::packed::Byte32;
use std::sync::Arc;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl HeaderMap {
}
let size_limit = memory_limit / ITEM_BYTES_SIZE;
let inner = Arc::new(HeaderMapKernel::new(tmpdir, size_limit, ibd_finished));
let map = Arc::clone(&inner);
let map_weak = Arc::downgrade(&inner);
let stop_rx: CancellationToken = new_tokio_exit_rx();

async_handle.spawn(async move {
Expand All @@ -60,7 +60,12 @@ impl HeaderMap {
loop {
tokio::select! {
_ = interval.tick() => {
map.limit_memory();
if let Some(map) = map_weak.upgrade() {
map.limit_memory();
} else {
debug!("HeaderMap inner was dropped, exiting background task");
break;
}
}
_ = stop_rx.cancelled() => {
info!("HeaderMap limit_memory received exit signal, exit now");
Expand Down
5 changes: 5 additions & 0 deletions test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ pub fn main_test() {
spec_name,
seconds,
node_log_paths,
// node_paths is ignored here to let TempPathBuf handle
// automatic directory cleanup when this scope ends.
..
} => {
test_results.push(TestResult {
spec_name: spec_name.clone(),
Expand All @@ -239,6 +242,8 @@ pub fn main_test() {
spec_name,
seconds,
node_log_paths,
// same as above
..
} => {
test_results.push(TestResult {
spec_name: spec_name.clone(),
Expand Down
18 changes: 8 additions & 10 deletions test/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::Node;
use crate::utils::{find_available_port, message_name, temp_path, wait_until};
use crate::utils::{TempPathBuf, find_available_port, message_name, wait_until};
use ckb_app_config::NetworkConfig;
use ckb_async_runtime::{Runtime, new_global_runtime};
use ckb_chain_spec::consensus::Consensus;
Expand All @@ -12,20 +12,22 @@ use ckb_network::{
};
use ckb_util::Mutex;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

pub type NetMessage = (PeerIndex, ProtocolId, Bytes);

pub struct Net {
p2p_port: u16,
working_dir: PathBuf,
protocols: Vec<SupportProtocols>,
controller: NetworkController,
register_rx: Receiver<(String, PeerIndex, Receiver<NetMessage>)>,
receivers: HashMap<String, (PeerIndex, Receiver<NetMessage>)>,
// _async_runtime MUST be declared before _working_dir
// to ensure the runtime stops and releases file handles
// before the TempDir attempts to clean up.
_async_runtime: Runtime,
_working_dir: TempPathBuf,
}

impl Net {
Expand All @@ -35,13 +37,13 @@ impl Net {
"Net cannot initialize with empty protocols"
);
let p2p_port = find_available_port();
let working_dir = temp_path(spec_name, "net");
let working_dir = TempPathBuf::new(spec_name, "net");

let p2p_listen = format!("/ip4/127.0.0.1/tcp/{p2p_port}").parse().unwrap();
let network_state = Arc::new(
NetworkState::from_config(NetworkConfig {
listen_addresses: vec![p2p_listen],
path: (&working_dir).into(),
path: working_dir.to_path_buf(),
max_peers: 128,
max_outbound_peers: 128,
discovery_local_address: true,
Expand Down Expand Up @@ -79,19 +81,15 @@ impl Net {
.unwrap();
Self {
p2p_port,
working_dir,
protocols,
controller,
register_rx,
receivers: Default::default(),
_async_runtime: async_runtime,
_working_dir: working_dir,
}
}

pub fn working_dir(&self) -> &PathBuf {
&self.working_dir
}

pub fn p2p_listen(&self) -> String {
format!("/ip4/127.0.0.1/tcp/{}", self.p2p_port)
}
Expand Down
22 changes: 16 additions & 6 deletions test/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::global::binary;
use crate::rpc::RpcClient;
use crate::utils::{find_available_port, temp_path, wait_until};
use crate::utils::{TempPathBuf, find_available_port, wait_until};
use crate::{SYSTEM_CELL_ALWAYS_FAILURE_INDEX, SYSTEM_CELL_ALWAYS_SUCCESS_INDEX};
use ckb_app_config::{AppConfig, CKBAppConfig, ExitCode};
use ckb_chain_spec::ChainSpec;
Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct Node {

pub struct InnerNode {
spec_node_name: String,
working_dir: PathBuf,
working_dir: TempPathBuf,
consensus: Consensus,
p2p_listen: String,
rpc_client: RpcClient,
Expand All @@ -81,7 +81,7 @@ pub struct InnerNode {

impl Node {
pub fn new(spec_name: &str, node_name: &str) -> Self {
let working_dir = temp_path(spec_name, node_name);
let working_dir = TempPathBuf::new(spec_name, node_name);

// Copy node template into node's working directory
let cells_dir = working_dir.join("specs").join("cells");
Expand Down Expand Up @@ -126,7 +126,10 @@ impl Node {
modifier(&mut app_config);
fs::write(&app_config_path, toml::to_string(&app_config).unwrap()).unwrap();

*self = Self::init(self.working_dir(), self.inner.spec_node_name.clone());
*self = Self::init(
self.inner.working_dir.clone(),
self.inner.spec_node_name.clone(),
);
}

pub fn modify_chain_spec<M>(&mut self, modifier: M)
Expand All @@ -139,11 +142,14 @@ impl Node {
modifier(&mut chain_spec);
fs::write(&chain_spec_path, toml::to_string(&chain_spec).unwrap()).unwrap();

*self = Self::init(self.working_dir(), self.inner.spec_node_name.clone());
*self = Self::init(
self.inner.working_dir.clone(),
self.inner.spec_node_name.clone(),
);
}

// Initialize Node instance based on working directory
fn init(working_dir: PathBuf, spec_node_name: String) -> Self {
fn init(working_dir: TempPathBuf, spec_node_name: String) -> Self {
let app_config = {
let app_config_path = working_dir.join("ckb.toml");
let toml = fs::read(app_config_path).unwrap();
Expand Down Expand Up @@ -189,6 +195,10 @@ impl Node {
}

pub fn working_dir(&self) -> PathBuf {
self.inner.working_dir.to_path_buf()
}

pub fn owned_working_dir(&self) -> TempPathBuf {
self.inner.working_dir.clone()
}

Expand Down
61 changes: 44 additions & 17 deletions test/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::convert::Into;
use std::env;
use std::fs::read_to_string;
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};
use std::path::PathBuf;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tempfile::TempDir;

pub const FLAG_SINCE_RELATIVE: u64 =
0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000;
Expand Down Expand Up @@ -199,23 +202,47 @@ pub fn assert_send_transaction_ok(node: &Node, transaction: &TransactionView) {
assert!(result.is_ok(), "result: {:?}", result.unwrap_err());
}

/// Return a random path located on temp_dir
///
/// We use `tempdir` only for generating a random path, and expect the corresponding directory
/// that `tempdir` creates be deleted when go out of this function.
pub fn temp_path(case_name: &str, suffix: &str) -> PathBuf {
let mut builder = tempfile::Builder::new();
let prefix = ["ckb-it", case_name, suffix, ""].join("-");
builder.prefix(&prefix);
let tempdir = if let Ok(val) = env::var("CKB_INTEGRATION_TEST_TMP") {
builder.tempdir_in(val)
} else {
builder.tempdir()
#[derive(Debug, Clone)]
pub struct TempPathBuf {
path: PathBuf,
_tempdir: Arc<TempDir>,
}

impl TempPathBuf {
/// Return a random path located on temp_dir
pub fn new(case_name: &str, suffix: &str) -> Self {
let mut builder = tempfile::Builder::new();
let prefix = ["ckb-it", case_name, suffix, ""].join("-");
builder.prefix(&prefix);
let tempdir = if let Ok(val) = env::var("CKB_INTEGRATION_TEST_TMP") {
builder.tempdir_in(val)
} else {
builder.tempdir()
}
.expect("create tempdir failed");
let path = tempdir.path().to_owned();
Self {
path,
_tempdir: Arc::new(tempdir),
}
}

pub fn path(&self) -> &Path {
&self.path
}
}

impl Deref for TempPathBuf {
type Target = PathBuf;
fn deref(&self) -> &Self::Target {
&self.path
}
}

impl AsRef<Path> for TempPathBuf {
fn as_ref(&self) -> &Path {
self.path.as_ref()
}
.expect("create tempdir failed");
let path = tempdir.path().to_owned();
tempdir.close().expect("close tempdir failed");
path
}

/// Generate new blocks and explode these cellbases into `n` live cells
Expand Down
10 changes: 8 additions & 2 deletions test/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::utils::TempPathBuf;
use crate::{Spec, utils::nodes_panicked};
use ckb_channel::{Receiver, Sender, unbounded};
use ckb_logger::{error, info};
Expand All @@ -24,18 +25,20 @@ pub enum Notify {
Done {
spec_name: String,
seconds: u64,
node_paths: Vec<PathBuf>,
node_paths: Vec<TempPathBuf>,
},
Error {
spec_error: Box<dyn Any + Send>,
spec_name: String,
seconds: u64,
node_log_paths: Vec<PathBuf>,
node_paths: Vec<TempPathBuf>,
},
Panick {
spec_name: String,
seconds: u64,
node_log_paths: Vec<PathBuf>,
node_paths: Vec<TempPathBuf>,
},
Stop,
}
Expand Down Expand Up @@ -163,9 +166,10 @@ impl Worker {
.unwrap();

let mut nodes = spec.before_run();
// Used to extend the lifecycle of TempPathBuf
let node_paths = nodes
.iter()
.map(|node| node.working_dir())
.map(|node| node.owned_working_dir())
Comment on lines +169 to +172
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the reason for the changes to the Notify struct: node_paths is now passed along with Notify to ensure that the TempDir within the Node isn't dropped prematurely when the current function ends. This prevents potential errors in subsequent operations.

.collect::<Vec<_>>();
let node_log_paths = nodes.iter().map(|node| node.log_path()).collect::<Vec<_>>();
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
Expand All @@ -184,6 +188,7 @@ impl Worker {
spec_name: spec.name().to_string(),
seconds: now.elapsed().as_secs(),
node_log_paths,
node_paths,
})
.unwrap();
} else if let Some(spec_error) = spec_error {
Expand All @@ -193,6 +198,7 @@ impl Worker {
spec_name: spec.name().to_string(),
seconds: now.elapsed().as_secs(),
node_log_paths,
node_paths,
})
.unwrap();
} else {
Expand Down
Loading