Skip to content

Commit 25e8feb

Browse files
evanlinjinclaude
andcommitted
refactor(bitcoind_rpc)!: store conversion closure in FilterIter and Emitter
Store `Fn(Header) -> D` in `FilterIter` and `Fn(&Block) -> D` in `Emitter`, enabling `Iterator` for any `D` without a `FromBlockHeader` bound. Add `new_with` constructors for custom `D` types; `new` delegates to `new_with` for the common `FromBlockHeader` case. Remove `FilterIter::next_with` and `Emitter::next_block_with` — callers should use `new_with` at construction time instead. Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
1 parent 96086c6 commit 25e8feb

File tree

2 files changed

+116
-64
lines changed

2 files changed

+116
-64
lines changed

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use bdk_core::CheckPoint;
1313
use bdk_core::FromBlockHeader;
1414
use bdk_core::ToBlockHash;
1515
use bitcoin::block::Header;
16+
use bitcoin::hashes::Hash;
17+
use bitcoin::pow::CompactTarget;
1618
use bitcoin::BlockHash;
1719
use bitcoin::{bip158::BlockFilter, Block, ScriptBuf};
1820
use bitcoincore_rpc;
@@ -33,7 +35,6 @@ use bitcoincore_rpc::{json::GetBlockHeaderResult, RpcApi};
3335
/// occur. `FilterIter` will continue to yield events until it reaches the latest chain tip.
3436
/// Events contain the updated checkpoint `cp` which may be incorporated into the local chain
3537
/// state to stay in sync with the tip.
36-
#[derive(Debug)]
3738
pub struct FilterIter<'a, D = BlockHash> {
3839
/// RPC client
3940
client: &'a bitcoincore_rpc::Client,
@@ -43,26 +44,56 @@ pub struct FilterIter<'a, D = BlockHash> {
4344
cp: CheckPoint<D>,
4445
/// Header info, contains the prev and next hashes for each header.
4546
header: Option<GetBlockHeaderResult>,
47+
/// Closure to convert a block header into checkpoint data `D`.
48+
to_data: Box<dyn Fn(Header) -> D + Send + Sync>,
4649
}
4750

48-
impl<'a, D> FilterIter<'a, D>
49-
where
50-
D: ToBlockHash + Clone + Debug,
51-
{
52-
/// Construct [`FilterIter`] with checkpoint, RPC client and SPKs.
53-
pub fn new(
51+
impl<D: Debug> core::fmt::Debug for FilterIter<'_, D> {
52+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
53+
f.debug_struct("FilterIter")
54+
.field("spks", &self.spks)
55+
.field("cp", &self.cp)
56+
.field("header", &self.header)
57+
.finish_non_exhaustive()
58+
}
59+
}
60+
61+
impl<'a, D: 'static> FilterIter<'a, D> {
62+
/// Construct [`FilterIter`] with a custom closure to convert block headers into checkpoint
63+
/// data.
64+
///
65+
/// Use [`new`](Self::new) for the common case where `D` implements [`FromBlockHeader`].
66+
pub fn new_with(
5467
client: &'a bitcoincore_rpc::Client,
5568
cp: CheckPoint<D>,
5669
spks: impl IntoIterator<Item = ScriptBuf>,
70+
to_data: impl Fn(Header) -> D + Send + Sync + 'static,
5771
) -> Self {
5872
Self {
5973
client,
6074
spks: spks.into_iter().collect(),
6175
cp,
6276
header: None,
77+
to_data: Box::new(to_data),
6378
}
6479
}
80+
}
6581

82+
impl<'a, D: FromBlockHeader + 'static> FilterIter<'a, D> {
83+
/// Construct [`FilterIter`] with checkpoint, RPC client and SPKs.
84+
pub fn new(
85+
client: &'a bitcoincore_rpc::Client,
86+
cp: CheckPoint<D>,
87+
spks: impl IntoIterator<Item = ScriptBuf>,
88+
) -> Self {
89+
Self::new_with(client, cp, spks, D::from_blockheader)
90+
}
91+
}
92+
93+
impl<'a, D> FilterIter<'a, D>
94+
where
95+
D: ToBlockHash + Clone + Debug,
96+
{
6697
/// Return the agreement header with the remote node.
6798
///
6899
/// Error if no agreement header is found.
@@ -78,10 +109,7 @@ where
78109
Err(Error::ReorgDepthExceeded)
79110
}
80111

81-
fn try_next_with<F>(&mut self, to_data: F) -> Result<Option<Event<D>>, Error>
82-
where
83-
F: Fn(Header) -> D,
84-
{
112+
fn try_next(&mut self) -> Result<Option<Event<D>>, Error> {
85113
let mut cp = self.cp.clone();
86114

87115
let header = match self.header.take() {
@@ -111,10 +139,21 @@ where
111139
next_hash = next_header.hash;
112140
let next_height: u32 = next_header.height.try_into()?;
113141

114-
cp = cp.insert(
115-
next_height,
116-
to_data(self.client.get_block_header(&next_hash)?),
117-
);
142+
// Reconstruct the block header from the already-fetched GetBlockHeaderResult,
143+
// avoiding an extra `get_block_header` RPC call.
144+
let block_header = Header {
145+
version: next_header.version,
146+
prev_blockhash: next_header
147+
.previous_block_hash
148+
.unwrap_or_else(BlockHash::all_zeros),
149+
merkle_root: next_header.merkle_root,
150+
time: next_header.time as u32,
151+
bits: CompactTarget::from_unprefixed_hex(&next_header.bits)
152+
.map_err(|_| Error::InvalidBits(next_header.bits.clone()))?,
153+
nonce: next_header.nonce,
154+
};
155+
156+
cp = cp.insert(next_height, (self.to_data)(block_header));
118157

119158
let mut block = None;
120159
let filter = BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
@@ -132,14 +171,6 @@ where
132171

133172
Ok(Some(Event { cp, block }))
134173
}
135-
136-
/// Get the next event with a custom checkpoint data type.
137-
pub fn next_with<F>(&mut self, to_data: F) -> Option<Result<Event<D>, Error>>
138-
where
139-
F: Fn(Header) -> D,
140-
{
141-
self.try_next_with(to_data).transpose()
142-
}
143174
}
144175

145176
/// Event returned by [`FilterIter`].
@@ -165,12 +196,12 @@ impl Event {
165196

166197
impl<D> Iterator for FilterIter<'_, D>
167198
where
168-
D: ToBlockHash + FromBlockHeader + Clone + Debug,
199+
D: ToBlockHash + Clone + Debug,
169200
{
170201
type Item = Result<Event<D>, Error>;
171202

172203
fn next(&mut self) -> Option<Self::Item> {
173-
self.try_next_with(D::from_blockheader).transpose()
204+
self.try_next().transpose()
174205
}
175206
}
176207

@@ -185,6 +216,8 @@ pub enum Error {
185216
ReorgDepthExceeded,
186217
/// Error converting an integer
187218
TryFromInt(core::num::TryFromIntError),
219+
/// Invalid bits string from RPC
220+
InvalidBits(String),
188221
}
189222

190223
impl core::fmt::Display for Error {
@@ -194,6 +227,7 @@ impl core::fmt::Display for Error {
194227
Self::Bip158(e) => write!(f, "{e}"),
195228
Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"),
196229
Self::TryFromInt(e) => write!(f, "{e}"),
230+
Self::InvalidBits(s) => write!(f, "invalid bits string: {s}"),
197231
}
198232
}
199233
}

crates/bitcoind_rpc/src/lib.rs

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ pub struct Emitter<C, D = BlockHash> {
5555
/// sure the tip block is already emitted. When a block is emitted, the transactions in the
5656
/// block are removed from this field.
5757
mempool_snapshot: HashMap<Txid, Arc<Transaction>>,
58+
59+
/// Closure to convert a block into checkpoint data `D`.
60+
to_cp_data: Box<dyn Fn(&Block) -> D + Send + Sync>,
5861
}
5962

6063
/// Indicates that there are no initially-expected mempool transactions.
@@ -67,24 +70,17 @@ impl<C, D> Emitter<C, D>
6770
where
6871
C: Deref,
6972
C::Target: RpcApi,
70-
D: ToBlockHash + Clone + Debug,
73+
D: ToBlockHash + Clone + Debug + 'static,
7174
{
72-
/// Construct a new [`Emitter`].
75+
/// Construct a new [`Emitter`] with a custom closure to convert blocks into checkpoint data.
7376
///
74-
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
75-
/// can start emission from a block that connects to the original chain.
76-
///
77-
/// `start_height` starts emission from a given height (if there are no conflicts with the
78-
/// original chain).
79-
///
80-
/// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the
81-
/// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
82-
/// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used.
83-
pub fn new(
77+
/// Use [`new`](Self::new) for the common case where `D` implements [`FromBlockHeader`].
78+
pub fn new_with(
8479
client: C,
8580
last_cp: CheckPoint<D>,
8681
start_height: u32,
8782
expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
83+
to_cp_data: impl Fn(&Block) -> D + Send + Sync + 'static,
8884
) -> Self {
8985
Self {
9086
client,
@@ -98,15 +94,56 @@ where
9894
(tx.compute_txid(), tx)
9995
})
10096
.collect(),
97+
to_cp_data: Box::new(to_cp_data),
10198
}
10299
}
100+
}
103101

102+
impl<C, D> Emitter<C, D>
103+
where
104+
C: Deref,
105+
C::Target: RpcApi,
106+
D: ToBlockHash + FromBlockHeader + Clone + Debug + 'static,
107+
{
108+
/// Construct a new [`Emitter`].
109+
///
110+
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
111+
/// can start emission from a block that connects to the original chain.
112+
///
113+
/// `start_height` starts emission from a given height (if there are no conflicts with the
114+
/// original chain).
115+
///
116+
/// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the
117+
/// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
118+
/// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used.
119+
pub fn new(
120+
client: C,
121+
last_cp: CheckPoint<D>,
122+
start_height: u32,
123+
expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
124+
) -> Self {
125+
Self::new_with(
126+
client,
127+
last_cp,
128+
start_height,
129+
expected_mempool_txs,
130+
|block| D::from_blockheader(block.header),
131+
)
132+
}
133+
}
134+
135+
impl<C, D> Emitter<C, D>
136+
where
137+
C: Deref,
138+
C::Target: RpcApi,
139+
D: ToBlockHash + Clone + Debug + 'static,
140+
{
104141
/// Emit mempool transactions and any evicted [`Txid`]s.
105142
///
106143
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
107144
/// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted`] which are
108145
/// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids
109-
/// are only reported once the emitters checkpoint matches the RPCs best block in both height
146+
/// are only reported once the emitter's checkpoint matches the RPC's best block in both height
110147
/// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always
111148
/// return an empty `evicted` set.
112149
#[cfg(feature = "std")]
@@ -200,18 +237,9 @@ where
200237
Ok(mempool_event)
201238
}
202239

203-
/// Emit the next block, using `to_data` to construct checkpoint data from the block.
204-
///
205-
/// This is the alternative to [`next_block`](Self::next_block) when [`FromBlockHeader`] isn't
206-
/// implemented for `D`.
207-
pub fn next_block_with<F>(
208-
&mut self,
209-
to_data: F,
210-
) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error>
211-
where
212-
F: Fn(&Block) -> D,
213-
{
214-
if let Some((checkpoint, block)) = poll(self, to_data)? {
240+
/// Emit the next block height and block (if any).
241+
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error> {
242+
if let Some((checkpoint, block)) = poll(self)? {
215243
// Stop tracking unconfirmed transactions that have been confirmed in this block.
216244
for tx in &block.txdata {
217245
self.mempool_snapshot.remove(&tx.compute_txid());
@@ -220,14 +248,6 @@ where
220248
}
221249
Ok(None)
222250
}
223-
224-
/// Emit the next block height and block (if any).
225-
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error>
226-
where
227-
D: FromBlockHeader,
228-
{
229-
self.next_block_with(|block| D::from_blockheader(block.header))
230-
}
231251
}
232252

233253
/// A new emission from mempool.
@@ -347,23 +367,21 @@ where
347367
Ok(PollResponse::AgreementPointNotFound(genesis_hash))
348368
}
349369

350-
fn poll<C, D, F>(
370+
fn poll<C, D>(
351371
emitter: &mut Emitter<C, D>,
352-
to_cp_data: F,
353372
) -> Result<Option<(CheckPoint<D>, Block)>, bitcoincore_rpc::Error>
354373
where
355374
C: Deref,
356375
C::Target: RpcApi,
357-
D: ToBlockHash + Clone + Debug,
358-
F: Fn(&Block) -> D,
376+
D: ToBlockHash + Clone + Debug + 'static,
359377
{
360378
let client = &emitter.client;
361379
loop {
362380
match poll_once(emitter)? {
363381
PollResponse::Block(res) => {
364382
let height = res.height as u32;
365383
let block = client.get_block(&res.hash)?;
366-
let cp_data = to_cp_data(&block);
384+
let cp_data = (emitter.to_cp_data)(&block);
367385

368386
let new_cp = emitter
369387
.last_cp
@@ -390,7 +408,7 @@ where
390408
}
391409
PollResponse::AgreementPointNotFound(genesis_hash) => {
392410
let block = client.get_block(&genesis_hash)?;
393-
let cp_data = to_cp_data(&block);
411+
let cp_data = (emitter.to_cp_data)(&block);
394412
emitter.last_cp = CheckPoint::new(0, cp_data);
395413
emitter.last_block = None;
396414
continue;

0 commit comments

Comments
 (0)