[fix](job) fix streaming job stuck when S3 auth error is silently ignored in fetchRemoteMeta #61284
Conversation
run buildall
Pull request overview
Fixes a streaming insert job hang when S3 listing fails (e.g., auth/403) by surfacing the error and pausing the job, plus adds a regression test to prevent silent stalls.
Changes:
- Validate the `GlobListResult` status in `getNextOffset()` and `fetchRemoteMeta()`, and throw with the real S3 error when listing fails.
- Add a debug point to simulate a failed S3 listing for testing.
- Add a regression test asserting the job transitions to `PAUSED` and exposes the correct error message.
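The core change described above is a status check on the listing result. The following is a minimal standalone sketch of that pattern using simplified stand-in types; the real Doris classes (`GlobListResult`, `Status`, `S3SourceOffsetProvider`) have richer APIs, so treat the names and signatures here as illustrative assumptions only.

```java
// Simplified stand-ins for the Doris types involved (hypothetical shapes).
class Status {
    private final String errMsg;
    Status(String errMsg) { this.errMsg = errMsg; }
    boolean ok() { return errMsg == null; }
    String getErrorMsg() { return errMsg; }
}

class GlobListResult {
    private final Status status;
    GlobListResult(Status status) { this.status = status; }
    Status getStatus() { return status; }
}

public class FetchMetaSketch {
    // Mirrors the pattern the PR adds: surface the real S3 error instead of
    // silently treating a failed listing as "no new files".
    static void checkListResult(GlobListResult result) throws Exception {
        if (result == null || !result.getStatus().ok()) {
            String err = (result == null)
                    ? "glob list returned null"
                    : result.getStatus().getErrorMsg();
            throw new Exception("Failed to list S3 files: " + err);
        }
    }

    public static void main(String[] args) {
        try {
            checkListResult(new GlobListResult(new Status("403 Forbidden")));
        } catch (Exception e) {
            // The caller (StreamingInsertJob.fetchMeta() in the real code)
            // catches this and pauses the job with the real error message.
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the check is that a failed listing now produces a thrown exception carrying the S3 error text, rather than an empty object list that the scheduler interprets as "nothing to do".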
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy | Adds regression coverage for PAUSE-on-fetch-meta error behavior via debug point injection |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java | Propagates S3 listing errors by checking GlobListResult status and adds a debug point for failure injection |
/review

PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.
Code Review Summary
PR Goal
Fix streaming job hanging when S3 auth errors are silently ignored in fetchRemoteMeta(). The root cause is that S3ObjStorage.globListInternal() catches all exceptions and returns a GlobListResult with a non-ok Status, but S3SourceOffsetProvider.fetchRemoteMeta() never checked the returned status. This fix adds status checks in both fetchRemoteMeta() and getNextOffset() and adds a regression test.
Critical Checkpoint Conclusions
- Goal accomplishment: The fix correctly addresses the root cause. The `fetchRemoteMeta()` fix is sound: the thrown `Exception` is properly caught by `StreamingInsertJob.fetchMeta()`, which sets `GET_REMOTE_DATA_ERROR` and pauses the job for auto-resume. The regression test proves the fix works.
- Modification size/focus: The change is small (18 additions, 3 deletions in the Java file) and focused on the single issue. Good.
- Concurrency: No new concurrency concerns introduced. The `maxEndFile` and `currentOffset` fields were already used without synchronization by the existing code; this PR doesn't change that pattern.
- Lifecycle/static init: No changes. N/A.
- Configuration items: None added. N/A.
- Incompatible changes: None. N/A.
- Parallel code paths: Both `getNextOffset()` and `fetchRemoteMeta()` are patched. Issue found in `getNextOffset()`; see the inline comment about double-wrapping of `RuntimeException`.
- Special conditional checks: The `globListResult == null` check is defensive but reasonable, since the interface contract of `globListWithLimit` doesn't guarantee non-null. The `!globListResult.getStatus().ok()` check is the core fix.
- Test coverage: A regression test is added that uses a debug point to inject the failure and verifies the job transitions to PAUSED with the correct error message. The test follows existing conventions in the streaming job test suite. No negative test for `getNextOffset()` is included, though the `fetchRemoteMeta()` path is the primary one that caused the hang.
- Observability: The existing `log.warn` in the `fetchMeta()` and `getNextOffset()` catch blocks provides adequate logging. The error message includes the original S3 error detail.
- Transaction/persistence: N/A.
- FE-BE variable passing: N/A.
- Performance: No performance concerns. The status check is trivial.
- Other issues: One bug found; see the inline comment.
Verdict
One bug found in getNextOffset(): the new RuntimeException thrown inside the try block gets caught by the outer catch (Exception e) and double-wrapped, losing the clear error message. See inline comment for details and suggested fix.
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.GlobListResult;
Bug: Double-wrapped RuntimeException loses the clear error message.
This throw new RuntimeException("Failed to list S3 files: " + errMsg) is inside the try (RemoteFileSystem ...) block. It will be immediately caught by the catch (Exception e) at line 106 (original numbering), which re-wraps it as throw new RuntimeException(e). The result is a double-wrapped exception where the outer message is just the toString() of the inner RuntimeException, losing the clean "Failed to list S3 files" message that the caller (AbstractStreamingTask.execute()) uses for errMsg.
Suggested fix: either (a) catch only non-RuntimeException in the catch block by re-throwing RuntimeException before wrapping, or (b) move the status check outside/after the try-with-resources, or (c) add a specific catch clause. For example:
} catch (RuntimeException e) {
log.warn("list path exception, path={}", filePath, e);
throw e;
} catch (Exception e) {
log.warn("list path exception, path={}", filePath, e);
throw new RuntimeException(e);
}
Note: The same issue applies to the existing throw new RuntimeException("No new files found in path: " + filePath) at line 104 (pre-existing), but since this PR introduces the new throw, it would be good to fix the catch block as well.
FE UT Coverage Report: Increment line coverage
TPC-H: Total hot run time: 27850 ms
TPC-DS: Total hot run time: 154485 ms
…ored in fetchRemoteMeta (#61284)

### What problem does this PR solve?

#### Problem
When S3 credentials become invalid (e.g. a 403 auth error), the streaming job neither pauses nor reports an error; it hangs indefinitely without making progress, even when new files are added.

#### Root cause
S3ObjStorage.globListInternal() catches all exceptions and returns a GlobListResult with a non-ok Status instead of rethrowing. S3SourceOffsetProvider.fetchRemoteMeta() called globListWithLimit() but never checked the returned status. Since objects was empty, maxEndFile was never updated, hasMoreDataToConsume() kept returning false, and the scheduler retried every 500 ms forever without triggering a PAUSE.

The same status check was also missing in getNextOffset(), which would produce a misleading "No new files found" error instead of the actual S3 error message.

#### Fix
- In fetchRemoteMeta(): check the GlobListResult status after globListWithLimit(); throw an Exception with the real error message if not ok, so the upper-level StreamingInsertJob.fetchMeta() catch block can catch it, set GET_REMOTE_DATA_ERROR, and PAUSE the job for auto-resume.
- In getNextOffset(): apply the same status check, throwing a RuntimeException with an accurate error message.
- Add a debug point S3SourceOffsetProvider.fetchRemoteMeta.error to simulate a failed GlobListResult for testing.

#### Test
Added regression test test_streaming_insert_job_fetch_meta_error: it enables the debug point to inject a failed GlobListResult, creates a streaming job, waits for it to reach PAUSED status, and asserts the ErrorMsg contains "Failed to list S3 files".
…silently ignored in fetchRemoteMeta #61284 (#61296) Cherry-picked from #61284 Co-authored-by: wudi <[email protected]>
What problem does this PR solve?
Problem
When S3 credentials become invalid (e.g. a 403 auth error), the streaming job neither pauses nor reports an error; it hangs indefinitely without making progress, even when new files are added.
Root cause:
S3ObjStorage.globListInternal() catches all exceptions and returns a GlobListResult with a non-ok Status instead of
rethrowing. S3SourceOffsetProvider.fetchRemoteMeta() called globListWithLimit() but never checked the returned status.
Since objects was empty, the maxEndFile was never updated, hasMoreDataToConsume() kept returning false, and the scheduler
retried every 500ms forever without triggering a PAUSE.
The same status check was also missing in getNextOffset(), which would produce a misleading "No new files found" error
instead of the actual S3 error message.
Fix
- In fetchRemoteMeta(): check the GlobListResult status after globListWithLimit(); throw an Exception with the real error message if not ok, so the upper-level StreamingInsertJob.fetchMeta() catch block can catch it, set GET_REMOTE_DATA_ERROR, and PAUSE the job for auto-resume.
- In getNextOffset(): apply the same status check, throwing a RuntimeException with an accurate error message.
- Add a debug point S3SourceOffsetProvider.fetchRemoteMeta.error to simulate a failed GlobListResult for testing.
Test
Added regression test test_streaming_insert_job_fetch_meta_error: enables the debug point to inject a failed
GlobListResult, creates a streaming job, waits for it to reach PAUSED status, and asserts the ErrorMsg contains "Failed to
list S3 files".
Release note
None