Skip to content

Commit 0c81333

Browse files
authored
Merge pull request #739 from tnull/2026-01-parallelize-payment-reads
Parallelize `read_payments`
2 parents 690d1f4 + c724a89 commit 0c81333

File tree

15 files changed

+357
-248
lines changed

15 files changed

+357
-248
lines changed

src/builder.rs

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ use bitcoin::bip32::{ChildNumber, Xpriv};
1919
use bitcoin::key::Secp256k1;
2020
use bitcoin::secp256k1::PublicKey;
2121
use bitcoin::{BlockHash, Network};
22-
2322
use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver;
24-
2523
use lightning::chain::{chainmonitor, BestBlock, Watch};
26-
use lightning::io::Cursor;
2724
use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs};
2825
use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress};
2926
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
@@ -57,7 +54,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
5754
use crate::gossip::GossipSource;
5855
use crate::io::sqlite_store::SqliteStore;
5956
use crate::io::utils::{
60-
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
57+
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
58+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
59+
write_node_metrics,
6160
};
6261
use crate::io::vss_store::VssStoreBuilder;
6362
use crate::io::{
@@ -1053,7 +1052,9 @@ fn build_with_store_internal(
10531052
}
10541053

10551054
// Initialize the status fields.
1056-
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
1055+
let node_metrics = match runtime
1056+
.block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await })
1057+
{
10571058
Ok(metrics) => Arc::new(RwLock::new(metrics)),
10581059
Err(e) => {
10591060
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1067,19 +1068,20 @@ fn build_with_store_internal(
10671068
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
10681069
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
10691070

1070-
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
1071-
Ok(payments) => Arc::new(PaymentStore::new(
1072-
payments,
1073-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1074-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1075-
Arc::clone(&kv_store),
1076-
Arc::clone(&logger),
1077-
)),
1078-
Err(e) => {
1079-
log_error!(logger, "Failed to read payment data from store: {}", e);
1080-
return Err(BuildError::ReadFailed);
1081-
},
1082-
};
1071+
let payment_store =
1072+
match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) {
1073+
Ok(payments) => Arc::new(PaymentStore::new(
1074+
payments,
1075+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1076+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1077+
Arc::clone(&kv_store),
1078+
Arc::clone(&logger),
1079+
)),
1080+
Err(e) => {
1081+
log_error!(logger, "Failed to read payment data from store: {}", e);
1082+
return Err(BuildError::ReadFailed);
1083+
},
1084+
};
10831085

10841086
let (chain_source, chain_tip_opt) = match chain_data_source_config {
10851087
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
@@ -1294,24 +1296,23 @@ fn build_with_store_internal(
12941296
));
12951297

12961298
// Initialize the network graph, scorer, and router
1297-
let network_graph =
1298-
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
1299-
Ok(graph) => Arc::new(graph),
1300-
Err(e) => {
1301-
if e.kind() == std::io::ErrorKind::NotFound {
1302-
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1303-
} else {
1304-
log_error!(logger, "Failed to read network graph from store: {}", e);
1305-
return Err(BuildError::ReadFailed);
1306-
}
1307-
},
1308-
};
1299+
let network_graph = match runtime
1300+
.block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await })
1301+
{
1302+
Ok(graph) => Arc::new(graph),
1303+
Err(e) => {
1304+
if e.kind() == std::io::ErrorKind::NotFound {
1305+
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1306+
} else {
1307+
log_error!(logger, "Failed to read network graph from store: {}", e);
1308+
return Err(BuildError::ReadFailed);
1309+
}
1310+
},
1311+
};
13091312

1310-
let local_scorer = match io::utils::read_scorer(
1311-
Arc::clone(&kv_store),
1312-
Arc::clone(&network_graph),
1313-
Arc::clone(&logger),
1314-
) {
1313+
let local_scorer = match runtime.block_on(async {
1314+
read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await
1315+
}) {
13151316
Ok(scorer) => scorer,
13161317
Err(e) => {
13171318
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1327,7 +1328,9 @@ fn build_with_store_internal(
13271328
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
13281329

13291330
// Restore external pathfinding scores from cache if possible.
1330-
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
1331+
match runtime.block_on(async {
1332+
read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await
1333+
}) {
13311334
Ok(external_scores) => {
13321335
scorer.lock().unwrap().merge(external_scores, cur_time);
13331336
log_trace!(logger, "External scores from cache merged successfully");
@@ -1380,13 +1383,12 @@ fn build_with_store_internal(
13801383

13811384
// Initialize the ChannelManager
13821385
let channel_manager = {
1383-
if let Ok(res) = KVStoreSync::read(
1386+
if let Ok(reader) = KVStoreSync::read(
13841387
&*kv_store,
13851388
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
13861389
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
13871390
CHANNEL_MANAGER_PERSISTENCE_KEY,
13881391
) {
1389-
let mut reader = Cursor::new(res);
13901392
let channel_monitor_references =
13911393
channel_monitors.iter().map(|(_, chanmon)| chanmon).collect();
13921394
let read_args = ChannelManagerReadArgs::new(
@@ -1403,7 +1405,7 @@ fn build_with_store_internal(
14031405
channel_monitor_references,
14041406
);
14051407
let (_hash, channel_manager) =
1406-
<(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| {
1408+
<(BlockHash, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| {
14071409
log_error!(logger, "Failed to read channel manager from store: {}", e);
14081410
BuildError::ReadFailed
14091411
})?;
@@ -1486,15 +1488,11 @@ fn build_with_store_internal(
14861488
{
14871489
let mut locked_node_metrics = node_metrics.write().unwrap();
14881490
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
1489-
write_node_metrics(
1490-
&*locked_node_metrics,
1491-
Arc::clone(&kv_store),
1492-
Arc::clone(&logger),
1493-
)
1494-
.map_err(|e| {
1495-
log_error!(logger, "Failed writing to store: {}", e);
1496-
BuildError::WriteFailed
1497-
})?;
1491+
write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger))
1492+
.map_err(|e| {
1493+
log_error!(logger, "Failed writing to store: {}", e);
1494+
BuildError::WriteFailed
1495+
})?;
14981496
}
14991497
p2p_source
15001498
},
@@ -1616,14 +1614,17 @@ fn build_with_store_internal(
16161614
let connection_manager =
16171615
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
16181616

1619-
let output_sweeper = match io::utils::read_output_sweeper(
1620-
Arc::clone(&tx_broadcaster),
1621-
Arc::clone(&fee_estimator),
1622-
Arc::clone(&chain_source),
1623-
Arc::clone(&keys_manager),
1624-
Arc::clone(&kv_store),
1625-
Arc::clone(&logger),
1626-
) {
1617+
let output_sweeper = match runtime.block_on(async {
1618+
read_output_sweeper(
1619+
Arc::clone(&tx_broadcaster),
1620+
Arc::clone(&fee_estimator),
1621+
Arc::clone(&chain_source),
1622+
Arc::clone(&keys_manager),
1623+
Arc::clone(&kv_store),
1624+
Arc::clone(&logger),
1625+
)
1626+
.await
1627+
}) {
16271628
Ok(output_sweeper) => Arc::new(output_sweeper),
16281629
Err(e) => {
16291630
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1644,7 +1645,8 @@ fn build_with_store_internal(
16441645
},
16451646
};
16461647

1647-
let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger))
1648+
let event_queue = match runtime
1649+
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
16481650
{
16491651
Ok(event_queue) => Arc::new(event_queue),
16501652
Err(e) => {
@@ -1657,7 +1659,9 @@ fn build_with_store_internal(
16571659
},
16581660
};
16591661

1660-
let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) {
1662+
let peer_store = match runtime
1663+
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1664+
{
16611665
Ok(peer_store) => Arc::new(peer_store),
16621666
Err(e) => {
16631667
if e.kind() == std::io::ErrorKind::NotFound {

src/chain/bitcoind.rs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,10 @@ impl BitcoindChainSource {
205205
unix_time_secs_opt;
206206
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
207207
unix_time_secs_opt;
208-
write_node_metrics(
209-
&*locked_node_metrics,
210-
Arc::clone(&self.kv_store),
211-
Arc::clone(&self.logger),
212-
)
213-
.unwrap_or_else(|e| {
214-
log_error!(self.logger, "Failed to persist node metrics: {}", e);
215-
});
208+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)
209+
.unwrap_or_else(|e| {
210+
log_error!(self.logger, "Failed to persist node metrics: {}", e);
211+
});
216212
}
217213
break;
218214
},
@@ -420,11 +416,11 @@ impl BitcoindChainSource {
420416
*self.latest_chain_tip.write().unwrap() = Some(tip);
421417

422418
periodically_archive_fully_resolved_monitors(
423-
Arc::clone(&channel_manager),
424-
chain_monitor,
425-
Arc::clone(&self.kv_store),
426-
Arc::clone(&self.logger),
427-
Arc::clone(&self.node_metrics),
419+
&*channel_manager,
420+
&*chain_monitor,
421+
&*self.kv_store,
422+
&*self.logger,
423+
&*self.node_metrics,
428424
)?;
429425
},
430426
Ok(_) => {},
@@ -469,11 +465,7 @@ impl BitcoindChainSource {
469465
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
470466
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
471467

472-
write_node_metrics(
473-
&*locked_node_metrics,
474-
Arc::clone(&self.kv_store),
475-
Arc::clone(&self.logger),
476-
)?;
468+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
477469

478470
Ok(())
479471
}
@@ -586,11 +578,7 @@ impl BitcoindChainSource {
586578
{
587579
let mut locked_node_metrics = self.node_metrics.write().unwrap();
588580
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
589-
write_node_metrics(
590-
&*locked_node_metrics,
591-
Arc::clone(&self.kv_store),
592-
Arc::clone(&self.logger),
593-
)?;
581+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
594582
}
595583

596584
Ok(())

src/chain/electrum.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ impl ElectrumChainSource {
149149
unix_time_secs_opt;
150150
write_node_metrics(
151151
&*locked_node_metrics,
152-
Arc::clone(&self.kv_store),
153-
Arc::clone(&self.logger),
152+
&*self.kv_store,
153+
&*self.logger,
154154
)?;
155155
}
156156
Ok(())
@@ -239,19 +239,15 @@ impl ElectrumChainSource {
239239
{
240240
let mut locked_node_metrics = self.node_metrics.write().unwrap();
241241
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
242-
write_node_metrics(
243-
&*locked_node_metrics,
244-
Arc::clone(&self.kv_store),
245-
Arc::clone(&self.logger),
246-
)?;
242+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
247243
}
248244

249245
periodically_archive_fully_resolved_monitors(
250-
Arc::clone(&channel_manager),
251-
Arc::clone(&chain_monitor),
252-
Arc::clone(&self.kv_store),
253-
Arc::clone(&self.logger),
254-
Arc::clone(&self.node_metrics),
246+
&*channel_manager,
247+
&*chain_monitor,
248+
&*self.kv_store,
249+
&*self.logger,
250+
&*self.node_metrics,
255251
)?;
256252
}
257253

@@ -284,11 +280,7 @@ impl ElectrumChainSource {
284280
{
285281
let mut locked_node_metrics = self.node_metrics.write().unwrap();
286282
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
287-
write_node_metrics(
288-
&*locked_node_metrics,
289-
Arc::clone(&self.kv_store),
290-
Arc::clone(&self.logger),
291-
)?;
283+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
292284
}
293285

294286
Ok(())

src/chain/esplora.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ impl EsploraChainSource {
128128
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
129129
write_node_metrics(
130130
&*locked_node_metrics,
131-
Arc::clone(&self.kv_store),
132-
Arc::clone(&self.logger)
131+
&*self.kv_store,
132+
&*self.logger
133133
)?;
134134
}
135135
Ok(())
@@ -259,19 +259,15 @@ impl EsploraChainSource {
259259
let mut locked_node_metrics = self.node_metrics.write().unwrap();
260260
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
261261
unix_time_secs_opt;
262-
write_node_metrics(
263-
&*locked_node_metrics,
264-
Arc::clone(&self.kv_store),
265-
Arc::clone(&self.logger),
266-
)?;
262+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
267263
}
268264

269265
periodically_archive_fully_resolved_monitors(
270-
Arc::clone(&channel_manager),
271-
Arc::clone(&chain_monitor),
272-
Arc::clone(&self.kv_store),
273-
Arc::clone(&self.logger),
274-
Arc::clone(&self.node_metrics),
266+
&*channel_manager,
267+
&*chain_monitor,
268+
&*self.kv_store,
269+
&*self.logger,
270+
&*self.node_metrics,
275271
)?;
276272
Ok(())
277273
},
@@ -353,11 +349,7 @@ impl EsploraChainSource {
353349
{
354350
let mut locked_node_metrics = self.node_metrics.write().unwrap();
355351
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
356-
write_node_metrics(
357-
&*locked_node_metrics,
358-
Arc::clone(&self.kv_store),
359-
Arc::clone(&self.logger),
360-
)?;
352+
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
361353
}
362354

363355
Ok(())

src/chain/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,8 @@ impl Filter for ChainSource {
488488
}
489489

490490
fn periodically_archive_fully_resolved_monitors(
491-
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
492-
kv_store: Arc<DynStore>, logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
491+
channel_manager: &ChannelManager, chain_monitor: &ChainMonitor, kv_store: &DynStore,
492+
logger: &Logger, node_metrics: &RwLock<NodeMetrics>,
493493
) -> Result<(), Error> {
494494
let mut locked_node_metrics = node_metrics.write().unwrap();
495495
let cur_height = channel_manager.current_best_block().height;

0 commit comments

Comments
 (0)