Skip to content

Commit 2a21c0f

Browse files
benthecarman and claude committed
Add FilesystemStoreV2 with paginated listing support
Implements PaginatedKVStore traits with timestamp-prefixed filenames for newest-first pagination and [empty] directory markers for consistent namespace hierarchy.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 3269813 commit 2a21c0f

File tree

4 files changed

+801
-39
lines changed

4 files changed

+801
-39
lines changed

lightning-persister/src/fs_store/common.rs

Lines changed: 136 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
//! Common utilities shared between [`FilesystemStore`].
1+
//! Common utilities shared between [`FilesystemStore`] and [`FilesystemStoreV2`] implementations.
22
//!
33
//! [`FilesystemStore`]: crate::fs_store::v1::FilesystemStore
4+
//! [`FilesystemStoreV2`]: crate::fs_store::v2::FilesystemStoreV2
45
56
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
67

@@ -45,6 +46,11 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
4546
// a consistent view and error out.
4647
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
4748

49+
// The directory name used for empty namespaces in v2.
50+
// Uses brackets which are not in KVSTORE_NAMESPACE_KEY_ALPHABET, preventing collisions
51+
// with valid namespace names.
52+
pub(crate) const EMPTY_NAMESPACE_DIR: &str = "[empty]";
53+
4854
/// Inner state shared between sync and async operations for filesystem stores.
4955
///
5056
/// This struct manages the data directory, temporary file counter, and per-path locks
@@ -103,6 +109,19 @@ impl FilesystemStoreState {
103109
let outer_lock = self.inner.locks.lock().unwrap();
104110
outer_lock.len()
105111
}
112+
113+
pub(crate) fn get_checked_dest_file_path(
114+
&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
115+
operation: &str, use_empty_ns_dir: bool,
116+
) -> lightning::io::Result<PathBuf> {
117+
self.inner.get_checked_dest_file_path(
118+
primary_namespace,
119+
secondary_namespace,
120+
key,
121+
operation,
122+
use_empty_ns_dir,
123+
)
124+
}
106125
}
107126

108127
impl FilesystemStoreInner {
@@ -112,7 +131,7 @@ impl FilesystemStoreInner {
112131
}
113132

114133
fn get_dest_dir_path(
115-
&self, primary_namespace: &str, secondary_namespace: &str,
134+
&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
116135
) -> std::io::Result<PathBuf> {
117136
let mut dest_dir_path = {
118137
#[cfg(target_os = "windows")]
@@ -127,21 +146,35 @@ impl FilesystemStoreInner {
127146
}
128147
};
129148

130-
dest_dir_path.push(primary_namespace);
131-
if !secondary_namespace.is_empty() {
132-
dest_dir_path.push(secondary_namespace);
149+
if use_empty_ns_dir {
150+
dest_dir_path.push(if primary_namespace.is_empty() {
151+
EMPTY_NAMESPACE_DIR
152+
} else {
153+
primary_namespace
154+
});
155+
dest_dir_path.push(if secondary_namespace.is_empty() {
156+
EMPTY_NAMESPACE_DIR
157+
} else {
158+
secondary_namespace
159+
});
160+
} else {
161+
dest_dir_path.push(primary_namespace);
162+
if !secondary_namespace.is_empty() {
163+
dest_dir_path.push(secondary_namespace);
164+
}
133165
}
134166

135167
Ok(dest_dir_path)
136168
}
137169

138170
fn get_checked_dest_file_path(
139171
&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
140-
operation: &str,
172+
operation: &str, use_empty_ns_dir: bool,
141173
) -> lightning::io::Result<PathBuf> {
142174
check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?;
143175

144-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
176+
let mut dest_file_path =
177+
self.get_dest_dir_path(primary_namespace, secondary_namespace, use_empty_ns_dir)?;
145178
if let Some(key) = key {
146179
dest_file_path.push(key);
147180
}
@@ -217,8 +250,13 @@ impl FilesystemStoreInner {
217250
/// returns early without writing.
218251
fn write_version(
219252
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, buf: Vec<u8>,
220-
version: u64,
253+
version: u64, preserve_mtime: bool,
221254
) -> lightning::io::Result<()> {
255+
let mtime = if preserve_mtime {
256+
fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok())
257+
} else {
258+
None
259+
};
222260
let parent_directory = dest_file_path.parent().ok_or_else(|| {
223261
let msg =
224262
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
@@ -238,6 +276,13 @@ impl FilesystemStoreInner {
238276
{
239277
let mut tmp_file = fs::File::create(&tmp_file_path)?;
240278
tmp_file.write_all(&buf)?;
279+
280+
// If we need to preserve the original mtime (for updates), set it before fsync.
281+
if let Some(mtime) = mtime {
282+
let times = fs::FileTimes::new().set_modified(mtime);
283+
tmp_file.set_times(times)?;
284+
}
285+
241286
tmp_file.sync_all()?;
242287
}
243288

@@ -370,13 +415,15 @@ impl FilesystemStoreInner {
370415
})
371416
}
372417

373-
fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result<Vec<String>> {
418+
fn list(
419+
&self, prefixed_dest: PathBuf, retry_on_race: bool,
420+
) -> lightning::io::Result<Vec<String>> {
374421
if !Path::new(&prefixed_dest).exists() {
375422
return Ok(Vec::new());
376423
}
377424

378425
let mut keys;
379-
let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
426+
let mut retries = if retry_on_race { LIST_DIR_CONSISTENCY_RETRIES } else { 0 };
380427

381428
'retry_list: loop {
382429
keys = Vec::new();
@@ -418,64 +465,73 @@ impl FilesystemStoreInner {
418465
impl FilesystemStoreState {
419466
pub(crate) fn read_impl(
420467
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
468+
use_empty_ns_dir: bool,
421469
) -> Result<Vec<u8>, lightning::io::Error> {
422470
let path = self.inner.get_checked_dest_file_path(
423471
primary_namespace,
424472
secondary_namespace,
425473
Some(key),
426474
"read",
475+
use_empty_ns_dir,
427476
)?;
428477
self.inner.read(path)
429478
}
430479

431480
pub(crate) fn write_impl(
432481
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
482+
use_empty_ns_dir: bool,
433483
) -> Result<(), lightning::io::Error> {
434484
let path = self.inner.get_checked_dest_file_path(
435485
primary_namespace,
436486
secondary_namespace,
437487
Some(key),
438488
"write",
489+
use_empty_ns_dir,
439490
)?;
440491
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
441-
self.inner.write_version(inner_lock_ref, path, buf, version)
492+
self.inner.write_version(inner_lock_ref, path, buf, version, use_empty_ns_dir)
442493
}
443494

444495
pub(crate) fn remove_impl(
445496
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
497+
use_empty_ns_dir: bool,
446498
) -> Result<(), lightning::io::Error> {
447499
let path = self.inner.get_checked_dest_file_path(
448500
primary_namespace,
449501
secondary_namespace,
450502
Some(key),
451503
"remove",
504+
use_empty_ns_dir,
452505
)?;
453506
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
454507
self.inner.remove_version(inner_lock_ref, path, lazy, version)
455508
}
456509

457510
pub(crate) fn list_impl(
458-
&self, primary_namespace: &str, secondary_namespace: &str,
511+
&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
459512
) -> Result<Vec<String>, lightning::io::Error> {
460513
let path = self.inner.get_checked_dest_file_path(
461514
primary_namespace,
462515
secondary_namespace,
463516
None,
464517
"list",
518+
use_empty_ns_dir,
465519
)?;
466-
self.inner.list(path)
520+
self.inner.list(path, !use_empty_ns_dir)
467521
}
468522

469523
#[cfg(feature = "tokio")]
470524
pub(crate) fn read_async(
471525
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
526+
use_empty_ns_dir: bool,
472527
) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
473528
let this = Arc::clone(&self.inner);
474529
let path = this.get_checked_dest_file_path(
475530
primary_namespace,
476531
secondary_namespace,
477532
Some(key),
478533
"read",
534+
use_empty_ns_dir,
479535
);
480536

481537
async move {
@@ -492,10 +548,17 @@ impl FilesystemStoreState {
492548
#[cfg(feature = "tokio")]
493549
pub(crate) fn write_async(
494550
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
551+
use_empty_ns_dir: bool,
495552
) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
496553
let this = Arc::clone(&self.inner);
497554
let path = this
498-
.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write")
555+
.get_checked_dest_file_path(
556+
primary_namespace,
557+
secondary_namespace,
558+
Some(key),
559+
"write",
560+
use_empty_ns_dir,
561+
)
499562
.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
500563

501564
async move {
@@ -504,7 +567,7 @@ impl FilesystemStoreState {
504567
Err(e) => return Err(e),
505568
};
506569
tokio::task::spawn_blocking(move || {
507-
this.write_version(inner_lock_ref, path, buf, version)
570+
this.write_version(inner_lock_ref, path, buf, version, use_empty_ns_dir)
508571
})
509572
.await
510573
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
@@ -514,10 +577,17 @@ impl FilesystemStoreState {
514577
#[cfg(feature = "tokio")]
515578
pub(crate) fn remove_async(
516579
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
580+
use_empty_ns_dir: bool,
517581
) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
518582
let this = Arc::clone(&self.inner);
519583
let path = this
520-
.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove")
584+
.get_checked_dest_file_path(
585+
primary_namespace,
586+
secondary_namespace,
587+
Some(key),
588+
"remove",
589+
use_empty_ns_dir,
590+
)
521591
.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
522592

523593
async move {
@@ -535,32 +605,50 @@ impl FilesystemStoreState {
535605

536606
#[cfg(feature = "tokio")]
537607
pub(crate) fn list_async(
538-
&self, primary_namespace: &str, secondary_namespace: &str,
608+
&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
539609
) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
540610
let this = Arc::clone(&self.inner);
541611

542-
let path =
543-
this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list");
612+
let path = this.get_checked_dest_file_path(
613+
primary_namespace,
614+
secondary_namespace,
615+
None,
616+
"list",
617+
use_empty_ns_dir,
618+
);
544619

545620
async move {
546621
let path = match path {
547622
Ok(path) => path,
548623
Err(e) => return Err(e),
549624
};
550-
tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| {
551-
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
552-
})
625+
tokio::task::spawn_blocking(move || this.list(path, !use_empty_ns_dir))
626+
.await
627+
.unwrap_or_else(|e| {
628+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
629+
})
553630
}
554631
}
555632

556633
pub(crate) fn list_all_keys_impl(
557-
&self,
634+
&self, use_empty_ns_dir: bool,
558635
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
559636
let prefixed_dest = &self.inner.data_dir;
560637
if !prefixed_dest.exists() {
561638
return Ok(Vec::new());
562639
}
563640

641+
// When use_empty_ns_dir is true (v2), namespace directories may be named
642+
// [empty] to represent empty namespaces, so we resolve via namespace_from_dir_path.
643+
// When false (v1), directory names are always valid kvstore strings.
644+
let resolve_ns = |path: &Path, base: &Path| -> Result<String, lightning::io::Error> {
645+
if use_empty_ns_dir {
646+
namespace_from_dir_path(path)
647+
} else {
648+
get_key_from_dir_entry_path(path, base)
649+
}
650+
};
651+
564652
let mut keys = Vec::new();
565653

566654
'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
@@ -581,8 +669,7 @@ impl FilesystemStoreState {
581669
let secondary_path = secondary_entry.path();
582670

583671
if dir_entry_is_key(&secondary_entry)? {
584-
let primary_namespace =
585-
get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
672+
let primary_namespace = resolve_ns(&primary_path, prefixed_dest)?;
586673
let secondary_namespace = String::new();
587674
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
588675
keys.push((primary_namespace, secondary_namespace, key));
@@ -595,10 +682,8 @@ impl FilesystemStoreState {
595682
let tertiary_path = tertiary_entry.path();
596683

597684
if dir_entry_is_key(&tertiary_entry)? {
598-
let primary_namespace =
599-
get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
600-
let secondary_namespace =
601-
get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
685+
let primary_namespace = resolve_ns(&primary_path, prefixed_dest)?;
686+
let secondary_namespace = resolve_ns(&secondary_path, &primary_path)?;
602687
let key = get_key_from_dir_entry_path(&tertiary_path, &secondary_path)?;
603688
keys.push((primary_namespace, secondary_namespace, key));
604689
} else {
@@ -623,6 +708,25 @@ impl FilesystemStoreState {
623708
}
624709
}
625710

711+
/// Extracts a namespace string from a directory path, converting [`EMPTY_NAMESPACE_DIR`] to an
712+
/// empty string.
713+
fn namespace_from_dir_path(path: &Path) -> Result<String, lightning::io::Error> {
714+
let name = path.file_name().and_then(|n| n.to_str()).ok_or_else(|| {
715+
lightning::io::Error::new(
716+
lightning::io::ErrorKind::Other,
717+
format!(
718+
"Failed to extract namespace from path {}",
719+
PrintableString(path.to_str().unwrap_or_default())
720+
),
721+
)
722+
})?;
723+
if name == EMPTY_NAMESPACE_DIR {
724+
Ok(String::new())
725+
} else {
726+
Ok(name.to_string())
727+
}
728+
}
729+
626730
fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
627731
let p = dir_entry.path();
628732
if let Some(ext) = p.extension() {
@@ -663,7 +767,9 @@ fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Err
663767
Ok(true)
664768
}
665769

666-
fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lightning::io::Error> {
770+
pub(crate) fn get_key_from_dir_entry_path(
771+
p: &Path, base_path: &Path,
772+
) -> Result<String, lightning::io::Error> {
667773
match p.strip_prefix(&base_path) {
668774
Ok(stripped_path) => {
669775
if let Some(relative_path) = stripped_path.to_str() {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Implementations of filesystem-backed key-value stores.
22
33
pub mod v1;
4+
pub mod v2;
45

56
pub(crate) mod common;

0 commit comments

Comments
 (0)