Skip to content

Commit 8f39e68

Browse files
author
Richard Patel
committed
Allow ignoring programs
1 parent a7e8aeb commit 8f39e68

File tree

6 files changed

+99
-5
lines changed

6 files changed

+99
-5
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ crate-type = ["cdylib", "rlib"]
1313
[dependencies]
1414
prost = "0.9"
1515
rdkafka = { version = "0.28", features = ["ssl-vendored", "sasl"] }
16-
solana-logger = { version = "=1.9.9" }
1716
solana-accountsdb-plugin-interface = { version = "=1.9.9" }
17+
solana-logger = { version = "=1.9.9" }
18+
solana-program = { version = "=1.9.9" }
1819
log = "0.4"
1920
serde_json = "1.0"
2021
serde = { version = "1.0", features = ["derive"] }

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ Config is specified via the plugin's JSON config file.
3939
"partitioner": "murmur2_random"
4040
},
4141
"shutdown_timeout_ms": 30000,
42-
"update_account_topic": "solana.testnet.account_updates"
42+
"update_account_topic": "solana.testnet.account_updates",
43+
"slot_status_topic": "solana.testnet.slot_status",
44+
"program_ignores": [
45+
"Sysvar1111111111111111111111111111111111111",
46+
"Vote111111111111111111111111111111111111111"
47+
],
4348
}
4449
```
4550

@@ -50,6 +55,8 @@ Config is specified via the plugin's JSON config file.
5055
This plugin overrides the defaults as seen in the example config.
5156
- `shutdown_timeout_ms`: Time the plugin is given to flush out all messages to Kafka upon exit request.
5257
- `update_account_topic`: Topic name of account updates. Omit to disable.
58+
- `slot_status_topic`: Topic name of slot status update. Omit to disable.
59+
- `program_ignores`: Solana program IDs for which to ignore updates for owned accounts.
5360

5461
## Buffering
5562

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ pub struct Config {
4040
/// Kafka topic to send slot status updates to.
4141
#[serde(default)]
4242
pub slot_status_topic: String,
43+
/// List of programs to ignore.
44+
#[serde(default)]
45+
pub program_ignores: Vec<String>,
4346
}
4447

4548
impl Default for Config {
@@ -49,6 +52,7 @@ impl Default for Config {
4952
shutdown_timeout_ms: 30_000,
5053
update_account_topic: "".to_owned(),
5154
slot_status_topic: "".to_owned(),
55+
program_ignores: Vec::new(),
5256
}
5357
}
5458
}

src/filter.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2022 Blockdaemon Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use {
16+
crate::*,
17+
solana_program::pubkey::Pubkey,
18+
std::{collections::HashSet, str::FromStr},
19+
};
20+
21+
pub struct Filter {
22+
program_ignores: HashSet<[u8; 32]>,
23+
}
24+
25+
impl Filter {
26+
pub fn new(config: &Config) -> Self {
27+
Self {
28+
program_ignores: config
29+
.program_ignores
30+
.iter()
31+
.flat_map(|p| Pubkey::from_str(p).ok().map(|p| p.to_bytes()))
32+
.collect(),
33+
}
34+
}
35+
36+
pub fn wants_program(&self, program: &[u8]) -> bool {
37+
let key = match <&[u8; 32]>::try_from(program) {
38+
Ok(key) => key,
39+
_ => return true,
40+
};
41+
!self.program_ignores.contains(key)
42+
}
43+
}
44+
45+
#[cfg(test)]
46+
mod tests {
47+
use super::*;
48+
49+
#[test]
50+
fn test_filter() {
51+
let mut config = Config::default();
52+
config.program_ignores = vec![
53+
"Sysvar1111111111111111111111111111111111111".to_owned(),
54+
"Vote111111111111111111111111111111111111111".to_owned(),
55+
];
56+
57+
let filter = Filter::new(&config);
58+
assert_eq!(filter.program_ignores.len(), 2);
59+
60+
assert!(filter.wants_program(
61+
&Pubkey::from_str("9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin")
62+
.unwrap()
63+
.to_bytes()
64+
));
65+
assert!(!filter.wants_program(
66+
&Pubkey::from_str("Vote111111111111111111111111111111111111111")
67+
.unwrap()
68+
.to_bytes()
69+
));
70+
}
71+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ use solana_accountsdb_plugin_interface::accountsdb_plugin_interface::AccountsDbP
1616

1717
mod config;
1818
mod event;
19+
mod filter;
1920
mod plugin;
2021
mod publisher;
2122

2223
pub use {
2324
config::{Config, Producer},
2425
event::*,
26+
filter::Filter,
2527
plugin::KafkaPlugin,
2628
publisher::Publisher,
2729
};

src/plugin.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use {
2727
#[derive(Default)]
2828
pub struct KafkaPlugin {
2929
publisher: Option<Publisher>,
30+
filter: Option<Filter>,
3031
}
3132

3233
impl Debug for KafkaPlugin {
@@ -64,15 +65,15 @@ impl AccountsDbPlugin for KafkaPlugin {
6465

6566
let publisher = Publisher::new(producer, &config);
6667
self.publisher = Some(publisher);
68+
self.filter = Some(Filter::new(&config));
6769
info!("Spawned producer");
6870

6971
Ok(())
7072
}
7173

7274
fn on_unload(&mut self) {
73-
if let Some(publisher) = self.publisher.take() {
74-
drop(publisher);
75-
}
75+
self.publisher = None;
76+
self.filter = None;
7677
}
7778

7879
fn update_account(
@@ -86,6 +87,10 @@ impl AccountsDbPlugin for KafkaPlugin {
8687
}
8788

8889
let info = Self::unwrap_update_account(account);
90+
if !self.unwrap_filter().wants_program(info.pubkey) {
91+
return Ok(());
92+
}
93+
8994
let event = UpdateAccountEvent {
9095
slot,
9196
pubkey: info.pubkey.to_vec(),
@@ -143,6 +148,10 @@ impl KafkaPlugin {
143148
self.publisher.as_ref().expect("publisher is unavailable")
144149
}
145150

151+
fn unwrap_filter(&self) -> &Filter {
152+
self.filter.as_ref().expect("filter is unavailable")
153+
}
154+
146155
fn unwrap_update_account(account: ReplicaAccountInfoVersions) -> &ReplicaAccountInfo {
147156
match account {
148157
ReplicaAccountInfoVersions::V0_0_1(info) => info,

0 commit comments

Comments
 (0)