Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 110 additions & 14 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,20 +759,50 @@ impl<PdC: PdClient> Client<PdC> {
});
}
let backoff = DEFAULT_STORE_BACKOFF;
let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let mut result = Vec::new();
let mut current_limit = limit;
let (start_key, end_key) = range.clone().into_keys();
let mut current_key: Key = start_key;

// Reverse scan requires a bounded upper range to know where to start.
// Locating the last region is not implemented.
if reverse && end_key.is_none() {
return Err(Error::InternalError {
message: "reverse scan requires an upper bound (end_key)".to_string(),
});
}

// For forward scan: current_key tracks the lower bound, moving forward
// For reverse scan: current_key tracks the upper bound, moving backward
let mut current_key: Key = if reverse {
// Safe to unwrap: we validated end_key.is_some() above
end_key.clone().unwrap()
} else {
start_key.clone()
};

while current_limit > 0 {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
end_key: end_key.clone(),
limit: current_limit,
key_only,
reverse,
backoff: backoff.clone(),
// Build scan arguments based on direction:
// - Forward: scan from current_key (moving lower bound) to end_key (fixed upper bound)
// - Reverse: scan from start_key (fixed lower bound) to current_key (moving upper bound)
let scan_args = if reverse {
ScanInnerArgs {
start_key: start_key.clone(),
end_key: Some(current_key.clone()),
limit: current_limit,
key_only,
reverse,
backoff: backoff.clone(),
}
} else {
ScanInnerArgs {
start_key: current_key.clone(),
end_key: end_key.clone(),
limit: current_limit,
key_only,
reverse,
backoff: backoff.clone(),
}
};
let (res, next_key) = self.retryable_scan(scan_args).await?;

Expand All @@ -784,11 +814,33 @@ impl<PdC: PdClient> Client<PdC> {
current_limit -= kvs.len() as u32;
result.append(&mut kvs);
}
if end_key.clone().is_some_and(|ek| ek <= next_key) {
break;

// Determine if we should continue to the next region
if reverse {
// For reverse scan: next_key is the region's start_key (lower boundary)
// Stop if next_key is empty (reached the beginning) or
// if we've reached/passed the lower bound of our scan range
if next_key.is_empty() || next_key <= start_key {
break;
}
// Safety: if next_key >= current_key, we've made no progress
// (shouldn't happen with boundary fix, but prevents infinite loop)
if next_key >= current_key {
break;
}
current_key = next_key;
} else {
// For forward scan: next_key is the region's end_key (upper boundary)
// Stop if next_key is empty (reached the end) or
// if we've reached/passed the upper bound of our scan range
if next_key.is_empty() || end_key.clone().is_some_and(|ek| ek <= next_key) {
break;
}
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a safety check similar to the reverse scan case to prevent infinite loops if next_key doesn't advance. The forward scan path should verify that next_key is strictly greater than current_key before continuing, otherwise break to avoid an infinite loop.

Suggested change
}
}
// Safety: ensure progress is made; if next_key does not advance,
// break to avoid an infinite loop
if next_key <= current_key {
break;
}

Copilot uses AI. Check for mistakes.
// Safety: if next_key <= current_key, we've made no progress
if next_key <= current_key {
break;
}
current_key = next_key;
range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
}
}

Expand All @@ -807,8 +859,45 @@ impl<PdC: PdClient> Client<PdC> {
) -> Result<(Option<RawScanResponse>, Key)> {
let start_key = scan_args.start_key;
let end_key = scan_args.end_key;
let reverse = scan_args.reverse;
loop {
let region = self.rpc.clone().region_for_key(&start_key).await?;
// For forward scan: select region containing start_key (lower bound)
// For reverse scan: select region containing end_key (upper bound)
// because TiKV reverse scan starts from the upper bound and goes backward.
//
// When reverse=true, new_raw_scan_request swaps the keys so that TiKV receives
// end_key as its start_key. We must select the region containing that key.
let region_lookup_key: &Key = if reverse {
// For reverse scan, end_key is guaranteed to be Some (validated in scan_inner)
end_key.as_ref().unwrap()
} else {
&start_key
};
let mut region = self.rpc.clone().region_for_key(region_lookup_key).await?;

// For reverse scan: if the lookup key equals the region's start_key exactly,
// we're at a boundary and need the previous region. This happens when iterating
// backward and current_key lands on a region boundary.
if reverse {
let region_start = region.start_key();
if !region_start.is_empty() && *region_lookup_key == region_start {
// Find the previous region by computing the key immediately before
// this boundary. Decrement the last byte with borrow propagation.
let prev_key = {
let mut key: Vec<u8> = region_start.into();
while let Some(last) = key.last_mut() {
if *last > 0 {
*last -= 1;
break;
}
key.pop(); // byte was 0, borrow from previous
}
key
};
// Always look up - empty prev_key correctly returns the first region
region = self.rpc.clone().region_for_key(&prev_key.into()).await?;
}
}
let store = self.rpc.clone().store_for_id(region.id()).await?;
let request = new_raw_scan_request(
(start_key.clone(), end_key.clone()).into(),
Expand All @@ -833,7 +922,14 @@ impl<PdC: PdClient> Client<PdC> {
return Err(RegionError(Box::new(err)));
}
}
Ok((Some(r), region.end_key()))
// For forward scan: next region starts at this region's end_key
// For reverse scan: next region ends at this region's start_key
let next_key = if reverse {
region.start_key()
} else {
region.end_key()
};
Ok((Some(r), next_key))
}
Err(err) => Err(err),
};
Expand Down