Skip to content

Commit fbe4993

Browse files
authored
[fix](job) fix streaming job fails with "No new files found" on second scheduling (#61249)
### What problem does this PR solve? #### When a streaming job processes S3 files, the second scheduling fails with: No new files found in path: ... Root cause: In S3ObjStorage.globListInternal, currentMaxFile was unconditionally set to the last raw S3 object key returned in the response page, without checking whether it matched the glob pattern. This affects two scenarios: **Scenario 1** — reachLimit=false (all matched files consumed in one listing): The S3 page still contains non-matching keys after the last matched file (e.g. test_csv_comma_header.csv.lz4 sitting after test_csv_comma_header.csv). currentMaxFile gets set to the .lz4 key, so hasMoreDataToConsume() returns true. The next scheduling calls startAfter("...csv"), S3 returns only .lz4 which doesn't match the glob → rfiles empty → exception. **Scenario 2** — reachLimit=true (batch limit hit mid-page): After the limit is hit, the remaining page objects are not inspected. The original code set currentMaxFile to the last raw key in the entire page (which may be a non-matching sibling), causing the same failure on the next scheduling attempt. #### Fix Track lastMatchedKey (the last S3 key that actually matched the glob) during the listing loop. When reachLimit=true, instead of breaking out of the for loop immediately, continue scanning the remaining objects already fetched in the current page to find the first next glob-matching key as currentMaxFile. No extra S3 API call is needed. When no next matching key is found in the remaining page objects, fall back to lastMatchedKey instead of the raw last S3 page key. #### Regression Test Added test_streaming_job_no_new_files_with_sibling. The pattern example_[0-0].csv only matches example_0.csv; since getLongestPrefix strips at [, the S3 listing prefix becomes regression/load/data/example_ and returns both example_0.csv and example_1.csv — example_1.csv acts as the non-matching sibling. The test verifies that after the first successful task no failed tasks appear.
1 parent aef4694 commit fbe4993

File tree

4 files changed

+145
-8
lines changed

4 files changed

+145
-8
lines changed

fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -645,11 +645,25 @@ private GlobListResult globListInternal(String remotePath, List<RemoteFile> resu
645645

646646
boolean isTruncated = false;
647647
boolean reachLimit = false;
648+
String lastMatchedKey = "";
648649
do {
649650
roundCnt++;
650651
ListObjectsV2Response response = listObjectsV2(request);
651652
for (S3Object obj : response.contents()) {
652653
elementCnt++;
654+
655+
// Limit already reached: scan remaining objects in this page to find
656+
// the next glob-matching key, so hasMoreDataToConsume() returns true
657+
// correctly without recording a non-matching raw S3 key as currentMaxFile.
658+
if (reachLimit) {
659+
java.nio.file.Path checkPath = Paths.get(obj.key());
660+
if (matcher.matches(checkPath)) {
661+
currentMaxFile = obj.key();
662+
break;
663+
}
664+
continue;
665+
}
666+
653667
java.nio.file.Path objPath = Paths.get(obj.key());
654668

655669
boolean isPrefix = false;
@@ -677,6 +691,7 @@ private GlobListResult globListInternal(String remotePath, List<RemoteFile> resu
677691
isPrefix ? 0 : obj.lastModified().toEpochMilli()
678692
);
679693
result.add(remoteFile);
694+
lastMatchedKey = obj.key();
680695

681696
if (hasLimits && reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
682697
reachLimit = true;
@@ -686,15 +701,13 @@ private GlobListResult globListInternal(String remotePath, List<RemoteFile> resu
686701
objPath = objPath.getParent();
687702
isPrefix = true;
688703
}
689-
if (reachLimit) {
690-
break;
691-
}
692704
}
693705

694-
// Record current max file for limit scenario
695-
if (!response.contents().isEmpty()) {
696-
S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
697-
currentMaxFile = lastS3Object.key();
706+
// If no next matching file was found after the limit in the current page,
707+
// fall back to lastMatchedKey to avoid a non-matching raw S3 key
708+
// (e.g. a sibling file like .lz4) being recorded as currentMaxFile.
709+
if (currentMaxFile.isEmpty()) {
710+
currentMaxFile = lastMatchedKey;
698711
}
699712

700713
isTruncated = response.isTruncated();

fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ public FailureReason(String msg) {
5050

5151
private static boolean isTooManyFailureRowsErr(String msg) {
5252
return msg.contains("Insert has filtered data in strict mode")
53-
|| msg.contains("too many filtered rows");
53+
|| msg.contains("too many filtered")
54+
|| msg.contains("Encountered unqualified data")
55+
|| msg.contains("parse number fail");
5456
}
5557

5658
public InternalErrorCode getCode() {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !select --
3+
1 Emily 25
4+
2 Benjamin 35
5+
3 Olivia 28
6+
4 Alexander 60
7+
5 Ava 17
8+
6 William 69
9+
7 Sophia 32
10+
8 James 64
11+
9 Emma 37
12+
10 Liam 64
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

// Regression test for: streaming job second scheduling fails with "No new files found"
// when S3 listing returns non-matching sibling keys (e.g. example_1.csv) after the last
// matched file, causing currentMaxFile to be set to a non-matching raw S3 key.
//
// Pattern example_[0-0].csv matches only example_0.csv, but getLongestPrefix strips
// the bracket so S3 lists both example_0.csv and example_1.csv in the same page.
// Without the fix, currentMaxFile = "example_1.csv" triggers a second scheduling
// that finds no matching files and errors. With the fix, currentMaxFile = "example_0.csv"
// and hasMoreDataToConsume() correctly returns false.
suite("test_streaming_job_no_new_files_with_sibling") {
    def tableName = "test_streaming_job_no_new_files_with_sibling_tbl"
    def jobName = "test_streaming_job_no_new_files_with_sibling_job"

    // Start from a clean slate: drop any leftover table/job from a previous run.
    sql """drop table if exists `${tableName}` force"""
    sql """
        DROP JOB IF EXISTS where jobname = '${jobName}'
    """

    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `c1` int NULL,
            `c2` string NULL,
            `c3` int NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`c1`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """

    // Use example_[0-0].csv: glob matches only example_0.csv, but S3 listing prefix
    // "example_" also returns example_1.csv, which does not match the pattern.
    // This reproduces the "non-matching sibling key" scenario.
    sql """
        CREATE JOB ${jobName}
        ON STREAMING DO INSERT INTO ${tableName}
        SELECT * FROM S3
        (
            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-0].csv",
            "format" = "csv",
            "provider" = "${getS3Provider()}",
            "column_separator" = ",",
            "s3.endpoint" = "${getS3Endpoint()}",
            "s3.region" = "${getS3Region()}",
            "s3.access_key" = "${getS3AK()}",
            "s3.secret_key" = "${getS3SK()}"
        );
    """

    try {
        // Wait for the first task to succeed
        Awaitility.await().atMost(120, SECONDS)
                .pollInterval(1, SECONDS).until(
                {
                    def res = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                    log.info("SucceedTaskCount: " + res)
                    res.size() == 1 && '1' <= res.get(0).get(0)
                }
        )

        // Wait extra time to allow a potential second (buggy) scheduling attempt
        Thread.sleep(10000)

        // Verify no failed tasks: the job should not have tried to re-schedule and
        // hit "No new files found" after all matched files are consumed.
        def jobStatus = sql """
            select Status, SucceedTaskCount, FailedTaskCount, ErrorMsg
            from jobs("type"="insert") where Name = '${jobName}'
        """
        log.info("jobStatus: " + jobStatus)
        assert jobStatus.get(0).get(2) == '0' : "Expected no failed tasks, but got: " + jobStatus
        assert jobStatus.get(0).get(0) != "STOPPED" : "Job should not be stopped, status: " + jobStatus

        qt_select """ SELECT * FROM ${tableName} order by c1 """

    } catch (Exception ex) {
        // Dump job/task state to the log before rethrowing, to aid debugging CI failures.
        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
        log.info("show job: " + showjob)
        log.info("show task: " + showtask)
        throw ex
    } finally {
        sql """
            DROP JOB IF EXISTS where jobname = '${jobName}'
        """
        sql """drop table if exists `${tableName}` force"""
    }
}

0 commit comments

Comments
 (0)