Skip to content

Commit 181a6e0

Browse files
joostjager and claude committed
Defer MonitorUpdatingPersister writes to flush()
Update MonitorUpdatingPersister and MonitorUpdatingPersisterAsync to queue persist operations in memory instead of writing immediately to disk. The Persist trait methods now return ChannelMonitorUpdateStatus::InProgress and the actual writes happen when flush() is called.

This fixes a race condition that could cause channel force closures: previously, if the node crashed after writing channel monitors but before writing the channel manager, the monitors would be ahead of the manager on restart. By deferring monitor writes until after the channel manager is persisted (via flush()), we ensure the manager is always at least as up-to-date as the monitors.

The flush() method takes an optional count parameter to flush only a specific number of queued writes. The background processor captures the queue size before persisting the channel manager, then flushes exactly that many writes afterward. This prevents flushing monitor updates that arrived after the manager state was captured.

Key changes:
- Add PendingWrite enum to represent queued write/remove operations
- Add pending_writes queue to MonitorUpdatingPersisterAsyncInner
- Add pending_write_count() and flush(count) to Persist trait and ChainMonitor
- ChainMonitor::flush() calls channel_monitor_updated for each completed write
- Update Persist impl to queue writes and return InProgress
- Call flush() in background processor after channel manager persistence
- Remove unused event_notifier from AsyncPersister

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b4fb555 commit 181a6e0

File tree

3 files changed

+242
-213
lines changed

3 files changed

+242
-213
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,6 +1152,11 @@ where
11521152

11531153
let mut futures = Joiner::new();
11541154

1155+
// Capture the number of pending monitor writes before persisting the channel manager.
1156+
// We'll only flush this many writes after the manager is persisted, to avoid flushing
1157+
// monitor updates that arrived after the manager state was captured.
1158+
let pending_monitor_writes = chain_monitor.pending_write_count();
1159+
11551160
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11561161
log_trace!(logger, "Persisting ChannelManager...");
11571162

@@ -1349,6 +1354,14 @@ where
13491354
res?;
13501355
}
13511356

1357+
// Flush the monitor writes that were pending before we persisted the channel manager.
1358+
// Any writes that arrived after are left in the queue for the next iteration.
1359+
if pending_monitor_writes > 0 {
1360+
if let Err(e) = chain_monitor.flush(Some(pending_monitor_writes)) {
1361+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1362+
}
1363+
}
1364+
13521365
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13531366
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13541367
}) {
@@ -1413,6 +1426,12 @@ where
14131426
channel_manager.get_cm().encode(),
14141427
)
14151428
.await?;
1429+
1430+
// Flush all pending monitor writes after final channel manager persistence.
1431+
if let Err(e) = chain_monitor.flush(None) {
1432+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1433+
}
1434+
14161435
if let Some(ref scorer) = scorer {
14171436
kv_store
14181437
.write(
@@ -1722,6 +1741,9 @@ impl BackgroundProcessor {
17221741
channel_manager.get_cm().timer_tick_occurred();
17231742
last_freshness_call = Instant::now();
17241743
}
1744+
// Capture the number of pending monitor writes before persisting the channel manager.
1745+
let pending_monitor_writes = chain_monitor.pending_write_count();
1746+
17251747
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17261748
log_trace!(logger, "Persisting ChannelManager...");
17271749
(kv_store.write(
@@ -1733,6 +1755,13 @@ impl BackgroundProcessor {
17331755
log_trace!(logger, "Done persisting ChannelManager.");
17341756
}
17351757

1758+
// Flush the monitor writes that were pending before we persisted the channel manager.
1759+
if pending_monitor_writes > 0 {
1760+
if let Err(e) = chain_monitor.flush(Some(pending_monitor_writes)) {
1761+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1762+
}
1763+
}
1764+
17361765
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
17371766
log_trace!(logger, "Persisting LiquidityManager...");
17381767
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1882,12 @@ impl BackgroundProcessor {
18531882
CHANNEL_MANAGER_PERSISTENCE_KEY,
18541883
channel_manager.get_cm().encode(),
18551884
)?;
1885+
1886+
// Flush all pending monitor writes after final channel manager persistence.
1887+
if let Err(e) = chain_monitor.flush(None) {
1888+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1889+
}
1890+
18561891
if let Some(ref scorer) = scorer {
18571892
kv_store.write(
18581893
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,

lightning/src/chain/chainmonitor.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{
3939
use crate::chain::transaction::{OutPoint, TransactionData};
4040
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
4141
use crate::events::{self, Event, EventHandler, ReplayEvent};
42+
use crate::io;
4243
use crate::ln::channel_state::ChannelDetails;
4344
#[cfg(peer_storage)]
4445
use crate::ln::msgs::PeerStorage;
@@ -198,16 +199,22 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
198199
/// the monitor already exists in the archive.
199200
fn archive_persisted_channel(&self, monitor_name: MonitorName);
200201

201-
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
202-
/// [`Self::update_persisted_channel`], which have completed.
202+
/// Returns the number of pending writes in the queue.
203203
///
204-
/// Returning an update here is equivalent to calling
205-
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
206-
/// hidden in the docs.
207-
#[doc(hidden)]
208-
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
209-
Vec::new()
204+
/// This can be used to capture the queue size before persisting the channel manager,
205+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
206+
fn pending_write_count(&self) -> usize {
207+
0
210208
}
209+
210+
/// Flushes pending writes to the underlying storage.
211+
///
212+
/// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
213+
/// If `count` is `None`, all pending writes are flushed.
214+
///
215+
/// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
216+
/// from persist methods), this method should write queued data to storage.
217+
fn flush(&self, count: Option<usize>) -> Result<(), io::Error>;
211218
}
212219

213220
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -272,7 +279,6 @@ pub struct AsyncPersister<
272279
FE::Target: FeeEstimator,
273280
{
274281
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
275-
event_notifier: Arc<Notifier>,
276282
}
277283

278284
impl<
@@ -320,26 +326,28 @@ where
320326
&self, monitor_name: MonitorName,
321327
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
322328
) -> ChannelMonitorUpdateStatus {
323-
let notifier = Arc::clone(&self.event_notifier);
324-
self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
329+
self.persister.queue_new_channel(monitor_name, monitor);
325330
ChannelMonitorUpdateStatus::InProgress
326331
}
327332

328333
fn update_persisted_channel(
329334
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
330335
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
331336
) -> ChannelMonitorUpdateStatus {
332-
let notifier = Arc::clone(&self.event_notifier);
333-
self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
337+
self.persister.queue_channel_update(monitor_name, monitor_update, monitor);
334338
ChannelMonitorUpdateStatus::InProgress
335339
}
336340

337341
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
338342
self.persister.spawn_async_archive_persisted_channel(monitor_name);
339343
}
340344

341-
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
342-
self.persister.get_and_clear_completed_updates()
345+
fn pending_write_count(&self) -> usize {
346+
self.persister.pending_write_count()
347+
}
348+
349+
fn flush(&self, count: Option<usize>) -> Result<(), io::Error> {
350+
crate::util::persist::poll_sync_future(self.persister.flush(count))
343351
}
344352
}
345353

@@ -440,7 +448,6 @@ impl<
440448
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
441449
_our_peerstorage_encryption_key: PeerStorageKey,
442450
) -> Self {
443-
let event_notifier = Arc::new(Notifier::new());
444451
Self {
445452
monitors: RwLock::new(new_hash_map()),
446453
chain_source,
@@ -450,8 +457,8 @@ impl<
450457
_entropy_source,
451458
pending_monitor_events: Mutex::new(Vec::new()),
452459
highest_chain_height: AtomicUsize::new(0),
453-
event_notifier: Arc::clone(&event_notifier),
454-
persister: AsyncPersister { persister, event_notifier },
460+
event_notifier: Arc::new(Notifier::new()),
461+
persister: AsyncPersister { persister },
455462
pending_send_only_events: Mutex::new(Vec::new()),
456463
#[cfg(peer_storage)]
457464
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
@@ -742,6 +749,25 @@ where
742749
.collect()
743750
}
744751

752+
/// Returns the number of pending writes in the persister queue.
753+
///
754+
/// This can be used to capture the queue size before persisting the channel manager,
755+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
756+
pub fn pending_write_count(&self) -> usize {
757+
self.persister.pending_write_count()
758+
}
759+
760+
/// Flushes pending writes to the underlying storage.
761+
///
762+
/// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
763+
/// If `count` is `None`, all pending writes are flushed.
764+
///
765+
/// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
766+
/// from persist methods), this method writes queued data to storage.
767+
pub fn flush(&self, count: Option<usize>) -> Result<(), io::Error> {
768+
self.persister.flush(count)
769+
}
770+
745771
#[cfg(any(test, feature = "_test_utils"))]
746772
pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
747773
self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
@@ -1497,9 +1523,6 @@ where
14971523
fn release_pending_monitor_events(
14981524
&self,
14991525
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1500-
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1501-
let _ = self.channel_monitor_updated(channel_id, update_id);
1502-
}
15031526
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
15041527
for monitor_state in self.monitors.read().unwrap().values() {
15051528
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

0 commit comments

Comments (0)