Skip to content

Commit 663312c

Browse files
branch-4.0: [fix](job) fix streaming job stuck when S3 auth error is silently ignored in fetchRemoteMeta #61284 (#61296)
Cherry-picked from #61284
Co-authored-by: wudi <[email protected]>
1 parent bd2b354 commit 663312c

File tree

2 files changed

+114
-3
lines changed

2 files changed

+114
-3
lines changed

fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.doris.job.offset.s3;
1919

20+
import org.apache.doris.backup.Status;
21+
import org.apache.doris.common.util.DebugPointUtil;
2022
import org.apache.doris.datasource.property.storage.StorageProperties;
2123
import org.apache.doris.fs.FileSystemFactory;
2224
import org.apache.doris.fs.GlobListResult;
@@ -69,6 +71,11 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, Strin
6971
filePath = storageProperties.validateAndNormalizeUri(uri);
7072
GlobListResult globListResult = fileSystem.globListWithLimit(filePath, rfiles, startFile,
7173
jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
74+
if (globListResult == null || !globListResult.getStatus().ok()) {
75+
String errMsg = globListResult != null
76+
? globListResult.getStatus().getErrMsg() : "null result";
77+
throw new RuntimeException("Failed to list S3 files: " + errMsg);
78+
}
7279

7380
if (!rfiles.isEmpty()) {
7481
String bucket = globListResult.getBucket();
@@ -162,11 +169,19 @@ public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
162169
String filePath = storageProperties.validateAndNormalizeUri(uri);
163170
List<RemoteFile> objects = new ArrayList<>();
164171
GlobListResult globListResult = fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
165-
if (globListResult != null && !objects.isEmpty() && StringUtils.isNotEmpty(globListResult.getMaxFile())) {
172+
// debug point: simulate globListWithLimit returning a failed status (e.g. S3 auth error)
173+
if (DebugPointUtil.isEnable("S3SourceOffsetProvider.fetchRemoteMeta.error")) {
174+
globListResult = new GlobListResult(new Status(Status.ErrCode.COMMON_ERROR,
175+
"debug point: simulated S3 auth error"));
176+
}
177+
if (globListResult == null || !globListResult.getStatus().ok()) {
178+
String errMsg = globListResult != null
179+
? globListResult.getStatus().getErrMsg() : "null result";
180+
throw new Exception("Failed to list S3 files: " + errMsg);
181+
}
182+
if (!objects.isEmpty() && StringUtils.isNotEmpty(globListResult.getMaxFile())) {
166183
maxEndFile = globListResult.getMaxFile();
167184
}
168-
} catch (Exception e) {
169-
throw e;
170185
}
171186
}
172187

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
19+
import org.awaitility.Awaitility
20+
21+
import static java.util.concurrent.TimeUnit.SECONDS
22+
23+
// Verify that when fetchRemoteMeta receives a failed GlobListResult (e.g. S3 auth error),
24+
// the streaming job is correctly PAUSED rather than hanging indefinitely.
25+
// Regression suite: with the debug point "S3SourceOffsetProvider.fetchRemoteMeta.error"
// enabled on all FEs, create a streaming INSERT job that reads from S3, wait for the job
// to report status PAUSED, check that its ErrorMsg mentions the S3 listing failure, and
// finally drop the job and disable the debug point.
suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
26+
def tableName = "test_streaming_insert_job_fetch_meta_error_tbl"
27+
def jobName = "test_streaming_insert_job_fetch_meta_error_job"
28+
29+
// Start from a clean state: drop the table and job if they already exist.
sql """drop table if exists `${tableName}` force"""
30+
sql """
31+
DROP JOB IF EXISTS where jobname = '${jobName}'
32+
"""
33+
34+
// Target table for the streaming insert job.
sql """
35+
CREATE TABLE IF NOT EXISTS ${tableName} (
36+
`c1` int NULL,
37+
`c2` string NULL,
38+
`c3` int NULL,
39+
) ENGINE=OLAP
40+
DUPLICATE KEY(`c1`)
41+
COMMENT 'OLAP'
42+
DISTRIBUTED BY HASH(`c1`) BUCKETS 3
43+
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
44+
"""
45+
46+
// Enable the debug point before creating the job; it is disabled again in the
// finally block at the end of the suite.
GetDebugPoint().enableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
47+
try {
48+
// Create the streaming job that selects from the S3 table-valued function.
sql """
49+
CREATE JOB ${jobName}
50+
ON STREAMING DO INSERT INTO ${tableName}
51+
SELECT * FROM S3
52+
(
53+
"uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
54+
"format" = "csv",
55+
"provider" = "${getS3Provider()}",
56+
"column_separator" = ",",
57+
"s3.endpoint" = "${getS3Endpoint()}",
58+
"s3.region" = "${getS3Region()}",
59+
"s3.access_key" = "${getS3AK()}",
60+
"s3.secret_key" = "${getS3SK()}"
61+
);
62+
"""
63+
64+
// Poll every 2 seconds, for at most 120 seconds, until exactly one matching
// STREAMING job row exists and its Status is PAUSED.
try {
65+
Awaitility.await().atMost(120, SECONDS)
66+
.pollInterval(2, SECONDS).until(
67+
{
68+
def jobRes = sql """ select Status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
69+
log.info("jobRes: " + jobRes)
70+
jobRes.size() == 1 && 'PAUSED'.equals(jobRes.get(0).get(0))
71+
}
72+
)
73+
} catch (Exception ex) {
74+
// The wait failed: log the job and task rows for diagnosis, then rethrow so the
// suite still fails.
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
75+
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
76+
log.info("show job: " + showjob)
77+
log.info("show task: " + showtask)
78+
throw ex
79+
}
80+
81+
// Re-read the job row and check both the status and that the error message
// contains the "Failed to list S3 files" text.
def jobStatus = sql """select Status, ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
82+
assert jobStatus.get(0).get(0) == "PAUSED"
83+
assert jobStatus.get(0).get(1).contains("Failed to list S3 files")
84+
85+
// Drop the job and confirm that no row for it remains.
sql """
86+
DROP JOB IF EXISTS where jobname = '${jobName}'
87+
"""
88+
89+
def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name='${jobName}'"""
90+
assert jobCountRsp.get(0).get(0) == 0
91+
92+
} finally {
93+
// Always disable the debug point and drop the job, even if a step above failed.
GetDebugPoint().disableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
94+
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
95+
}
96+
}

0 commit comments

Comments
 (0)