Skip to content

Commit c989f8f

Browse files
committed
feat(bitcoind_rpc)!: Emitters support different checkpoint data types
1 parent 69056ee commit c989f8f

File tree

3 files changed

+131
-83
lines changed

3 files changed

+131
-83
lines changed

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 83 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
77
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
88
9+
use core::fmt::Debug;
10+
911
use bdk_core::bitcoin;
1012
use bdk_core::CheckPoint;
13+
use bdk_core::FromBlockHeader;
14+
use bdk_core::ToBlockHash;
15+
use bitcoin::block::Header;
1116
use bitcoin::BlockHash;
1217
use bitcoin::{bip158::BlockFilter, Block, ScriptBuf};
1318
use bitcoincore_rpc;
@@ -29,22 +34,25 @@ use bitcoincore_rpc::{json::GetBlockHeaderResult, RpcApi};
2934
/// Events contain the updated checkpoint `cp` which may be incorporated into the local chain
3035
/// state to stay in sync with the tip.
3136
#[derive(Debug)]
32-
pub struct FilterIter<'a> {
37+
pub struct FilterIter<'a, D = BlockHash> {
3338
/// RPC client
3439
client: &'a bitcoincore_rpc::Client,
3540
/// SPK inventory
3641
spks: Vec<ScriptBuf>,
3742
/// checkpoint
38-
cp: CheckPoint<BlockHash>,
43+
cp: CheckPoint<D>,
3944
/// Header info, contains the prev and next hashes for each header.
4045
header: Option<GetBlockHeaderResult>,
4146
}
4247

43-
impl<'a> FilterIter<'a> {
48+
impl<'a, D> FilterIter<'a, D>
49+
where
50+
D: ToBlockHash + Clone + Debug,
51+
{
4452
/// Construct [`FilterIter`] with checkpoint, RPC client and SPKs.
4553
pub fn new(
4654
client: &'a bitcoincore_rpc::Client,
47-
cp: CheckPoint,
55+
cp: CheckPoint<D>,
4856
spks: impl IntoIterator<Item = ScriptBuf>,
4957
) -> Self {
5058
Self {
@@ -69,13 +77,76 @@ impl<'a> FilterIter<'a> {
6977
}
7078
Err(Error::ReorgDepthExceeded)
7179
}
80+
81+
fn try_next_with<F>(&mut self, to_data: F) -> Result<Option<Event<D>>, Error>
82+
where
83+
F: Fn(Header) -> D,
84+
{
85+
let mut cp = self.cp.clone();
86+
87+
let header = match self.header.take() {
88+
Some(header) => header,
89+
// If no header is cached we need to locate a base of the local
90+
// checkpoint from which the scan may proceed.
91+
None => self.find_base()?,
92+
};
93+
94+
let mut next_hash = match header.next_block_hash {
95+
Some(hash) => hash,
96+
None => return Ok(None),
97+
};
98+
99+
let mut next_header = self.client.get_block_header_info(&next_hash)?;
100+
101+
// In case of a reorg, rewind by fetching headers of previous hashes until we find
102+
// one with enough confirmations.
103+
while next_header.confirmations < 0 {
104+
let prev_hash = next_header
105+
.previous_block_hash
106+
.ok_or(Error::ReorgDepthExceeded)?;
107+
let prev_header = self.client.get_block_header_info(&prev_hash)?;
108+
next_header = prev_header;
109+
}
110+
111+
next_hash = next_header.hash;
112+
let next_height: u32 = next_header.height.try_into()?;
113+
114+
cp = cp.insert(
115+
next_height,
116+
to_data(self.client.get_block_header(&next_hash)?),
117+
);
118+
119+
let mut block = None;
120+
let filter = BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
121+
if filter
122+
.match_any(&next_hash, self.spks.iter().map(ScriptBuf::as_ref))
123+
.map_err(Error::Bip158)?
124+
{
125+
block = Some(self.client.get_block(&next_hash)?);
126+
}
127+
128+
// Store the next header
129+
self.header = Some(next_header);
130+
// Update self.cp
131+
self.cp = cp.clone();
132+
133+
Ok(Some(Event { cp, block }))
134+
}
135+
136+
/// Get the next event with a custom checkpoint data type.
137+
pub fn next_with<F>(&mut self, to_data: F) -> Option<Result<Event<D>, Error>>
138+
where
139+
F: Fn(Header) -> D,
140+
{
141+
self.try_next_with(to_data).transpose()
142+
}
72143
}
73144

74145
/// Event returned by [`FilterIter`].
75146
#[derive(Debug, Clone)]
76-
pub struct Event {
147+
pub struct Event<D = BlockHash> {
77148
/// Checkpoint
78-
pub cp: CheckPoint,
149+
pub cp: CheckPoint<D>,
79150
/// Block, will be `Some(..)` for matching blocks
80151
pub block: Option<Block>,
81152
}
@@ -92,60 +163,14 @@ impl Event {
92163
}
93164
}
94165

95-
impl Iterator for FilterIter<'_> {
96-
type Item = Result<Event, Error>;
166+
impl<D> Iterator for FilterIter<'_, D>
167+
where
168+
D: ToBlockHash + FromBlockHeader + Clone + Debug,
169+
{
170+
type Item = Result<Event<D>, Error>;
97171

98172
fn next(&mut self) -> Option<Self::Item> {
99-
(|| -> Result<Option<_>, Error> {
100-
let mut cp = self.cp.clone();
101-
102-
let header = match self.header.take() {
103-
Some(header) => header,
104-
// If no header is cached we need to locate a base of the local
105-
// checkpoint from which the scan may proceed.
106-
None => self.find_base()?,
107-
};
108-
109-
let mut next_hash = match header.next_block_hash {
110-
Some(hash) => hash,
111-
None => return Ok(None),
112-
};
113-
114-
let mut next_header = self.client.get_block_header_info(&next_hash)?;
115-
116-
// In case of a reorg, rewind by fetching headers of previous hashes until we find
117-
// one with enough confirmations.
118-
while next_header.confirmations < 0 {
119-
let prev_hash = next_header
120-
.previous_block_hash
121-
.ok_or(Error::ReorgDepthExceeded)?;
122-
let prev_header = self.client.get_block_header_info(&prev_hash)?;
123-
next_header = prev_header;
124-
}
125-
126-
next_hash = next_header.hash;
127-
let next_height: u32 = next_header.height.try_into()?;
128-
129-
cp = cp.insert(next_height, next_hash);
130-
131-
let mut block = None;
132-
let filter =
133-
BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
134-
if filter
135-
.match_any(&next_hash, self.spks.iter().map(ScriptBuf::as_ref))
136-
.map_err(Error::Bip158)?
137-
{
138-
block = Some(self.client.get_block(&next_hash)?);
139-
}
140-
141-
// Store the next header
142-
self.header = Some(next_header);
143-
// Update self.cp
144-
self.cp = cp.clone();
145-
146-
Ok(Some(Event { cp, block }))
147-
})()
148-
.transpose()
173+
self.try_next_with(D::from_blockheader).transpose()
149174
}
150175
}
151176

crates/bitcoind_rpc/src/lib.rs

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ extern crate alloc;
1616

1717
use alloc::sync::Arc;
1818
use bdk_core::collections::{HashMap, HashSet};
19-
use bdk_core::{BlockId, CheckPoint};
19+
use bdk_core::{BlockId, CheckPoint, FromBlockHeader, ToBlockHash};
2020
use bitcoin::{Block, BlockHash, Transaction, Txid};
2121
use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
2222
use core::ops::Deref;
23+
use std::fmt::Debug;
2324

2425
pub mod bip158;
2526

@@ -30,13 +31,13 @@ pub use bitcoincore_rpc;
3031
/// Refer to [module-level documentation] for more.
3132
///
3233
/// [module-level documentation]: crate
33-
pub struct Emitter<C> {
34+
pub struct Emitter<C, D = BlockHash> {
3435
client: C,
3536
start_height: u32,
3637

3738
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
3839
/// that the block is no longer in the best chain, it will be popped off from here.
39-
last_cp: CheckPoint<BlockHash>,
40+
last_cp: CheckPoint<D>,
4041

4142
/// The block result returned from rpc of the last-emitted block. As this result contains the
4243
/// next block's block hash (which we use to fetch the next block), we set this to `None`
@@ -62,10 +63,11 @@ pub struct Emitter<C> {
6263
/// to start empty (i.e. with no unconfirmed transactions).
6364
pub const NO_EXPECTED_MEMPOOL_TXS: core::iter::Empty<Arc<Transaction>> = core::iter::empty();
6465

65-
impl<C> Emitter<C>
66+
impl<C, D> Emitter<C, D>
6667
where
6768
C: Deref,
6869
C::Target: RpcApi,
70+
D: ToBlockHash + Clone + Debug,
6971
{
7072
/// Construct a new [`Emitter`].
7173
///
@@ -80,7 +82,7 @@ where
8082
/// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used.
8183
pub fn new(
8284
client: C,
83-
last_cp: CheckPoint<BlockHash>,
85+
last_cp: CheckPoint<D>,
8486
start_height: u32,
8587
expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
8688
) -> Self {
@@ -198,9 +200,18 @@ where
198200
Ok(mempool_event)
199201
}
200202

201-
/// Emit the next block height and block (if any).
202-
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
203-
if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
203+
/// Emit the next block, using `to_data` to construct checkpoint data from the block.
204+
///
205+
/// This is the alternative to [`next_block`](Self::next_block) when [`FromBlockHeader`] isn't
206+
/// implemented for `D`.
207+
pub fn next_block_with<F>(
208+
&mut self,
209+
to_data: F,
210+
) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error>
211+
where
212+
F: Fn(&Block) -> D,
213+
{
214+
if let Some((checkpoint, block)) = poll(self, to_data)? {
204215
// Stop tracking unconfirmed transactions that have been confirmed in this block.
205216
for tx in &block.txdata {
206217
self.mempool_snapshot.remove(&tx.compute_txid());
@@ -209,6 +220,14 @@ where
209220
}
210221
Ok(None)
211222
}
223+
224+
/// Emit the next block height and block (if any).
225+
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error>
226+
where
227+
D: FromBlockHeader,
228+
{
229+
self.next_block_with(|block| D::from_blockheader(block.header))
230+
}
212231
}
213232

214233
/// A new emission from mempool.
@@ -223,7 +242,7 @@ pub struct MempoolEvent {
223242

224243
/// A newly emitted block from [`Emitter`].
225244
#[derive(Debug)]
226-
pub struct BlockEvent<B> {
245+
pub struct BlockEvent<B = Block, D = BlockHash> {
227246
/// The block.
228247
pub block: B,
229248

@@ -235,7 +254,7 @@ pub struct BlockEvent<B> {
235254
///
236255
/// This is important as BDK structures require block-to-apply to be connected with another
237256
/// block in the original chain.
238-
pub checkpoint: CheckPoint<BlockHash>,
257+
pub checkpoint: CheckPoint<D>,
239258
}
240259

241260
impl<B> BlockEvent<B> {
@@ -264,17 +283,17 @@ impl<B> BlockEvent<B> {
264283
}
265284
}
266285

267-
enum PollResponse {
286+
enum PollResponse<D = BlockHash> {
268287
Block(bitcoincore_rpc_json::GetBlockResult),
269288
NoMoreBlocks,
270289
/// Fetched block is not in the best chain.
271290
BlockNotInBestChain,
272-
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint<BlockHash>),
291+
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint<D>),
273292
/// Force the genesis checkpoint down the receiver's throat.
274293
AgreementPointNotFound(BlockHash),
275294
}
276295

277-
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
296+
fn poll_once<C, D>(emitter: &Emitter<C, D>) -> Result<PollResponse<D>, bitcoincore_rpc::Error>
278297
where
279298
C: Deref,
280299
C::Target: RpcApi,
@@ -328,30 +347,32 @@ where
328347
Ok(PollResponse::AgreementPointNotFound(genesis_hash))
329348
}
330349

331-
fn poll<C, V, F>(
332-
emitter: &mut Emitter<C>,
333-
get_item: F,
334-
) -> Result<Option<(CheckPoint<BlockHash>, V)>, bitcoincore_rpc::Error>
350+
fn poll<C, D, F>(
351+
emitter: &mut Emitter<C, D>,
352+
to_cp_data: F,
353+
) -> Result<Option<(CheckPoint<D>, Block)>, bitcoincore_rpc::Error>
335354
where
336355
C: Deref,
337356
C::Target: RpcApi,
338-
F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
357+
D: ToBlockHash + Clone + Debug,
358+
F: Fn(&Block) -> D,
339359
{
360+
let client = &emitter.client;
340361
loop {
341362
match poll_once(emitter)? {
342363
PollResponse::Block(res) => {
343364
let height = res.height as u32;
344-
let hash = res.hash;
345-
let item = get_item(&hash, &emitter.client)?;
365+
let block = client.get_block(&res.hash)?;
366+
let cp_data = to_cp_data(&block);
346367

347368
let new_cp = emitter
348369
.last_cp
349370
.clone()
350-
.push(height, hash)
371+
.push(height, cp_data)
351372
.expect("must push");
352373
emitter.last_cp = new_cp.clone();
353374
emitter.last_block = Some(res);
354-
return Ok(Some((new_cp, item)));
375+
return Ok(Some((new_cp, block)));
355376
}
356377
PollResponse::NoMoreBlocks => {
357378
emitter.last_block = None;
@@ -368,7 +389,9 @@ where
368389
continue;
369390
}
370391
PollResponse::AgreementPointNotFound(genesis_hash) => {
371-
emitter.last_cp = CheckPoint::new(0, genesis_hash);
392+
let block = client.get_block(&genesis_hash)?;
393+
let cp_data = to_cp_data(&block);
394+
emitter.last_cp = CheckPoint::new(0, cp_data);
372395
emitter.last_block = None;
373396
continue;
374397
}

crates/bitcoind_rpc/tests/test_filter_iter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use bdk_bitcoind_rpc::bip158::{Error, FilterIter};
22
use bdk_core::CheckPoint;
33
use bdk_testenv::{anyhow, bitcoind, TestEnv};
4-
use bitcoin::{Address, Amount, Network, ScriptBuf};
4+
use bitcoin::{Address, Amount, BlockHash, Network, ScriptBuf};
55
use bitcoincore_rpc::RpcApi;
66

77
fn testenv() -> anyhow::Result<TestEnv> {
@@ -59,7 +59,7 @@ fn filter_iter_error_wrong_network() -> anyhow::Result<()> {
5959
let _ = env.mine_blocks(10, None)?;
6060

6161
// Try to initialize FilterIter with a CP on the wrong network
62-
let cp = CheckPoint::new(0, bitcoin::hashes::Hash::hash(b"wrong-hash"));
62+
let cp = CheckPoint::<BlockHash>::new(0, bitcoin::hashes::Hash::hash(b"wrong-hash"));
6363
let mut iter = FilterIter::new(&env.bitcoind.client, cp, [ScriptBuf::new()]);
6464
assert!(matches!(iter.next(), Some(Err(Error::ReorgDepthExceeded))));
6565

0 commit comments

Comments
 (0)