Skip to content

Commit ca93e7b

Browse files
[fix](filecache) self-heal stale DOWNLOADED entries on local NOT_FOUND (#60977)
Problem: In a rare restart window, BE can rebuild file-cache metadata in memory while the corresponding cache files are not yet durable on disk. If that metadata is also restored via LRU dump/load, blocks may appear as DOWNLOADED even though the local files are missing. Subsequent reads then produce false-positive cache hits, fail on local read, and repeatedly fall back to S3. This preserves correctness but causes avoidable cache thrashing and latency jitter. Root cause: The read path treated DOWNLOADED as a valid local hit source and fell back to remote reads on failure, but it did not actively invalidate stale metadata when the local cache file was gone. Fix: When reading a DOWNLOADED block from the local cache fails with NOT_FOUND, the reader still falls back to the remote read, but it now also increments the cached_remote_reader_self_heal_on_not_found counter and, once the read loop finishes, asynchronously removes the cached entries for that file's cache hash.
1 parent 56c34a1 commit ca93e7b

File tree

2 files changed

+202
-0
lines changed

2 files changed

+202
-0
lines changed

be/src/io/cache/cached_remote_file_reader.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
8181
bvar::Adder<uint64_t> g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
8282
bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
8383
"cached_remote_reader_cache_indirect_total_bytes");
84+
// Number of cache-block reads that failed with NOT_FOUND while the block was
// still marked DOWNLOADED; each such read triggers the self-heal path.
bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
85+
"cached_remote_reader_self_heal_on_not_found");
8486
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
8587
"cached_remote_reader_indirect_bytes_1min_window", &g_read_cache_indirect_bytes, 60);
8688
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_total_bytes_1min_window(
@@ -488,6 +490,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
488490

489491
size_t current_offset = offset;
490492
size_t end_offset = offset + bytes_req - 1;
493+
bool need_self_heal = false;
491494
*bytes_read = 0;
492495
for (auto& block : holder.file_blocks) {
493496
if (current_offset > end_offset) {
@@ -549,6 +552,17 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
549552
if (is_dryrun) [[unlikely]] {
550553
// dryrun mode uses a null buffer, skip actual remote IO
551554
} else {
555+
if (block_state == FileBlock::State::DOWNLOADED &&
556+
st.is<ErrorCode::NOT_FOUND>()) {
557+
need_self_heal = true;
558+
g_read_cache_self_heal_on_not_found << 1;
559+
LOG_EVERY_N(WARNING, 100)
560+
<< "Cache block file is missing, will self-heal by clearing cache "
561+
"hash. "
562+
<< "path=" << path().native()
563+
<< ", hash=" << _cache_hash.to_string() << ", offset=" << left
564+
<< ", err=" << st.msg();
565+
}
552566
LOG(WARNING) << "Read data failed from file cache downloaded by others. err="
553567
<< st.msg() << ", block state=" << block_state;
554568
size_t nest_bytes_read {0};
@@ -568,6 +582,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
568582
*bytes_read += read_size;
569583
current_offset = right + 1;
570584
}
585+
if (need_self_heal && _cache != nullptr) {
586+
_cache->remove_if_cached_async(_cache_hash);
587+
}
571588
g_read_cache_indirect_bytes << indirect_read_bytes;
572589
g_read_cache_indirect_total_bytes << *bytes_read;
573590

be/test/io/cache/block_file_cache_test.cpp

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3566,6 +3566,191 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) {
35663566
FileCacheFactory::instance()->_capacity = 0;
35673567
}
35683568

3569+
// Self-heal counter defined outside this test file; declared here so the tests
// below can read its value before and after a read.
extern bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found;
3570+
3571+
// Verifies the self-heal path: after a block has been cached and its cache file
// is deleted from disk behind the cache's back, a second read must still succeed
// with correct data, the self-heal counter must grow by exactly one, and a later
// lookup must eventually report the block as EMPTY instead of DOWNLOADED.
TEST_F(BlockFileCacheTest, cached_remote_file_reader_self_heal_on_downloaded_not_found) {
3572+
// Turn off direct cache-file reads for this test; the Defer restores the
// original setting on scope exit.
// NOTE(review): presumably needed so the read goes through the block-holder
// path that contains the self-heal check — confirm.
bool origin_enable_direct_read = config::enable_read_cache_file_directly;
3573+
config::enable_read_cache_file_directly = false;
3574+
Defer reset_direct_read {
3575+
[&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }};
3576+
3577+
// Start from an empty, freshly created cache directory dedicated to this test.
std::string cache_base_path =
3578+
caches_dir / "cached_remote_reader_self_heal_on_downloaded_not_found" / "";
3579+
if (fs::exists(cache_base_path)) {
3580+
fs::remove_all(cache_base_path);
3581+
}
3582+
fs::create_directories(cache_base_path);
3583+
3584+
io::FileCacheSettings settings;
3585+
settings.query_queue_size = 6291456;
3586+
settings.query_queue_elements = 6;
3587+
settings.index_queue_size = 1048576;
3588+
settings.index_queue_elements = 1;
3589+
settings.disposable_queue_size = 1048576;
3590+
settings.disposable_queue_elements = 1;
3591+
settings.capacity = 8388608;
3592+
settings.max_file_block_size = 1048576;
3593+
settings.max_query_cache_size = 0;
3594+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok());
3595+
auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
3596+
// Poll (up to 100 times, 1 ms apart) until the cache reports that its
// asynchronous open has succeeded.
for (int i = 0; i < 100; i++) {
3597+
if (cache->get_async_open_success()) {
3598+
break;
3599+
}
3600+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
3601+
}
3602+
3603+
// A local file reader stands in for the remote reader wrapped by
// CachedRemoteFileReader.
FileReaderSPtr local_reader;
3604+
ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
3605+
io::FileReaderOptions opts;
3606+
opts.cache_type = io::cache_type_from_string("file_block_cache");
3607+
opts.is_doris_table = true;
3608+
CachedRemoteFileReader reader(local_reader, opts);
3609+
3610+
// Snapshot the counter so the check at the end is relative to this test only.
uint64_t before_self_heal = g_read_cache_self_heal_on_not_found.get_value();
3611+
3612+
// First read: the test expects the first 64 KB of tmp_file to be all '0'
// characters.
std::string buffer(64_kb, '\0');
3613+
IOContext io_ctx;
3614+
FileCacheStatistics stats;
3615+
io_ctx.file_cache_stats = &stats;
3616+
size_t bytes_read = 0;
3617+
ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok());
3618+
EXPECT_EQ(std::string(64_kb, '0'), buffer);
3619+
3620+
auto key = io::BlockFileCache::hash("tmp_file");
3621+
// Confirm the first read left a single DOWNLOADED block, then delete its
// backing file so the in-memory metadata becomes stale.
{
3622+
io::CacheContext inspect_ctx;
3623+
ReadStatistics inspect_stats;
3624+
inspect_ctx.stats = &inspect_stats;
3625+
inspect_ctx.cache_type = io::FileCacheType::NORMAL;
3626+
auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx);
3627+
auto inspect_blocks = fromHolder(inspect_holder);
3628+
ASSERT_EQ(inspect_blocks.size(), 1);
3629+
ASSERT_EQ(inspect_blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
3630+
std::string cache_file = inspect_blocks[0]->get_cache_file();
3631+
ASSERT_TRUE(fs::exists(cache_file));
3632+
ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok());
3633+
ASSERT_FALSE(fs::exists(cache_file));
3634+
}
3635+
3636+
// Second read: the cache file is gone, yet the read must still succeed and
// return the same data.
ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok());
3637+
EXPECT_EQ(std::string(64_kb, '0'), buffer);
3638+
3639+
// Poll (up to 100 times, 10 ms apart) until a fresh lookup for the same range
// yields exactly one block in state EMPTY.
bool self_healed = false;
3640+
for (int i = 0; i < 100; ++i) {
3641+
io::CacheContext verify_ctx;
3642+
ReadStatistics verify_stats;
3643+
verify_ctx.stats = &verify_stats;
3644+
verify_ctx.cache_type = io::FileCacheType::NORMAL;
3645+
auto verify_holder = cache->get_or_set(key, 0, 64_kb, verify_ctx);
3646+
auto verify_blocks = fromHolder(verify_holder);
3647+
if (verify_blocks.size() == 1 && verify_blocks[0]->state() == io::FileBlock::State::EMPTY) {
3648+
self_healed = true;
3649+
break;
3650+
}
3651+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
3652+
}
3653+
EXPECT_TRUE(self_healed);
3654+
// Exactly one self-heal event is expected for the single missing block.
EXPECT_EQ(g_read_cache_self_heal_on_not_found.get_value(), before_self_heal + 1);
3655+
3656+
EXPECT_TRUE(reader.close().ok());
3657+
EXPECT_TRUE(reader.closed());
3658+
// NOTE(review): the one-second pause presumably lets background cache work
// settle before the directory is removed — confirm.
std::this_thread::sleep_for(std::chrono::seconds(1));
3659+
if (fs::exists(cache_base_path)) {
3660+
fs::remove_all(cache_base_path);
3661+
}
3662+
// Reset the factory singleton's state at the end of the test.
FileCacheFactory::instance()->_caches.clear();
3663+
FileCacheFactory::instance()->_path_to_cache.clear();
3664+
FileCacheFactory::instance()->_capacity = 0;
3665+
}
3666+
3667+
// Negative counterpart of the self-heal test: when the local read fails with an
// injected IOError (not NOT_FOUND), the read is expected to fail, the self-heal
// counter must stay unchanged, and the cached block must remain DOWNLOADED.
TEST_F(BlockFileCacheTest, cached_remote_file_reader_no_self_heal_on_non_not_found_error) {
3668+
// Turn off direct cache-file reads for this test; the Defer restores the
// original setting on scope exit.
bool origin_enable_direct_read = config::enable_read_cache_file_directly;
3669+
config::enable_read_cache_file_directly = false;
3670+
Defer reset_direct_read {
3671+
[&] { config::enable_read_cache_file_directly = origin_enable_direct_read; }};
3672+
3673+
// Start from an empty, freshly created cache directory dedicated to this test.
std::string cache_base_path =
3674+
caches_dir / "cached_remote_reader_no_self_heal_on_non_not_found_error" / "";
3675+
if (fs::exists(cache_base_path)) {
3676+
fs::remove_all(cache_base_path);
3677+
}
3678+
fs::create_directories(cache_base_path);
3679+
3680+
io::FileCacheSettings settings;
3681+
settings.query_queue_size = 6291456;
3682+
settings.query_queue_elements = 6;
3683+
settings.index_queue_size = 1048576;
3684+
settings.index_queue_elements = 1;
3685+
settings.disposable_queue_size = 1048576;
3686+
settings.disposable_queue_elements = 1;
3687+
settings.capacity = 8388608;
3688+
settings.max_file_block_size = 1048576;
3689+
settings.max_query_cache_size = 0;
3690+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok());
3691+
auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
3692+
// Poll (up to 100 times, 1 ms apart) until the cache reports that its
// asynchronous open has succeeded.
for (int i = 0; i < 100; i++) {
3693+
if (cache->get_async_open_success()) {
3694+
break;
3695+
}
3696+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
3697+
}
3698+
3699+
FileReaderSPtr local_reader;
3700+
ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
3701+
io::FileReaderOptions opts;
3702+
opts.cache_type = io::cache_type_from_string("file_block_cache");
3703+
opts.is_doris_table = true;
3704+
CachedRemoteFileReader reader(local_reader, opts);
3705+
3706+
// Initial read, performed before any error injection is enabled.
std::string buffer(64_kb, '\0');
3707+
IOContext io_ctx;
3708+
FileCacheStatistics stats;
3709+
io_ctx.file_cache_stats = &stats;
3710+
size_t bytes_read = 0;
3711+
ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok());
3712+
3713+
// Snapshot the counter after the warm-up read so only the failing read below
// is measured.
uint64_t before_self_heal = g_read_cache_self_heal_on_not_found.get_value();
3714+
auto* sp = SyncPoint::get_instance();
3715+
sp->enable_processing();
3716+
// Ensures the injected callback is removed even if an ASSERT returns early.
Defer defer {[&] {
3717+
sp->clear_call_back("LocalFileReader::read_at_impl");
3718+
sp->disable_processing();
3719+
}};
3720+
// The callback replaces the Status in the passed pair with an IOError and sets
// the accompanying bool to true.
// NOTE(review): the bool presumably tells the sync-point site to return the
// injected status instead of doing the real read — confirm.
sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) {
3721+
std::pair<Status, bool>* pair = try_any_cast<std::pair<Status, bool>*>(values.back());
3722+
pair->first = Status::IOError("inject io error for cache read");
3723+
pair->second = true;
3724+
});
3725+
3726+
// NOTE(review): the wrapped "remote" reader is also opened through
// global_local_filesystem(), so the injected error presumably hits the fallback
// read as well, which would explain why the whole read is expected to fail —
// confirm.
auto st = reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx);
3727+
ASSERT_FALSE(st.ok());
3728+
// A non-NOT_FOUND failure must not be counted as a self-heal event.
EXPECT_EQ(g_read_cache_self_heal_on_not_found.get_value(), before_self_heal);
3729+
3730+
// Stop injecting errors before inspecting the cache; the Defer above repeats
// this on scope exit.
sp->clear_call_back("LocalFileReader::read_at_impl");
3731+
sp->disable_processing();
3732+
3733+
// The cached block must still be present and in state DOWNLOADED.
io::CacheContext verify_ctx;
3734+
ReadStatistics verify_stats;
3735+
verify_ctx.stats = &verify_stats;
3736+
verify_ctx.cache_type = io::FileCacheType::NORMAL;
3737+
auto key = io::BlockFileCache::hash("tmp_file");
3738+
auto verify_holder = cache->get_or_set(key, 0, 64_kb, verify_ctx);
3739+
auto verify_blocks = fromHolder(verify_holder);
3740+
ASSERT_EQ(verify_blocks.size(), 1);
3741+
EXPECT_EQ(verify_blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
3742+
3743+
EXPECT_TRUE(reader.close().ok());
3744+
EXPECT_TRUE(reader.closed());
3745+
// NOTE(review): the one-second pause presumably lets background cache work
// settle before the directory is removed — confirm.
std::this_thread::sleep_for(std::chrono::seconds(1));
3746+
if (fs::exists(cache_base_path)) {
3747+
fs::remove_all(cache_base_path);
3748+
}
3749+
// Reset the factory singleton's state at the end of the test.
FileCacheFactory::instance()->_caches.clear();
3750+
FileCacheFactory::instance()->_path_to_cache.clear();
3751+
FileCacheFactory::instance()->_capacity = 0;
3752+
}
3753+
35693754
TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
35703755
std::string cache_base_path = caches_dir / "cached_remote_file_reader_init" / "";
35713756
if (fs::exists(cache_base_path)) {

0 commit comments

Comments
 (0)