Skip to content

Commit 70734be

Browse files
committed
update
1 parent b4e54ad commit 70734be

3 files changed

Lines changed: 120 additions & 2 deletions

File tree

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,6 +1972,16 @@ public InlineElement getDescription() {
19721972
"When a batch job queries from a table, if a partition does not exist in the current branch, "
19731973
+ "the reader will try to get this partition from this fallback branch.");
19741974

1975+
public static final ConfigOption<Boolean> SCAN_FALLBACK_BRANCH_READ_FAIL_FAST =
1976+
key("scan.fallback-branch.read-fail-fast")
1977+
.booleanType()
1978+
.defaultValue(false)
1979+
.withDescription(
1980+
"Whether to fail the read immediately when reading from a fallback branch throws. "
1981+
+ "By default the failure is logged with the full stack trace and the reader "
1982+
+ "falls through to the current branch, which can mask data issues. "
1983+
+ "Set this to true to surface fallback branch errors to the caller instead.");
1984+
19751985
public static final ConfigOption<String> SCAN_PRIMARY_BRANCH =
19761986
key("scan.primary-branch")
19771987
.stringType()

paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -573,12 +573,17 @@ private class Read implements InnerTableRead {
573573

574574
private final InnerTableRead mainRead;
575575
private final InnerTableRead fallbackRead;
576+
private final boolean fallbackReadFailFast;
576577

577578
private Read() {
578579
FileStoreTable first = wrappedFirst ? wrapped : other;
579580
FileStoreTable second = wrappedFirst ? other : wrapped;
580581
this.mainRead = first.newRead();
581582
this.fallbackRead = second.newRead();
583+
this.fallbackReadFailFast =
584+
wrapped.coreOptions()
585+
.toConfiguration()
586+
.get(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST);
582587
}
583588

584589
@Override
@@ -623,10 +628,23 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
623628
if (fallbackSplit.isFallback()) {
624629
try {
625630
return fallbackRead.createReader(fallbackSplit.wrapped());
626-
} catch (Exception ignored) {
631+
} catch (Exception e) {
632+
if (fallbackReadFailFast) {
633+
if (e instanceof IOException) {
634+
throw (IOException) e;
635+
}
636+
if (e instanceof RuntimeException) {
637+
throw (RuntimeException) e;
638+
}
639+
throw new IOException(
640+
"Failed to read fallback branch split: "
641+
+ fallbackSplit.wrapped(),
642+
e);
643+
}
627644
LOG.error(
628645
"Reading from supplemental branch has problems: {}",
629-
fallbackSplit.wrapped());
646+
fallbackSplit.wrapped(),
647+
e);
630648
}
631649
}
632650
}

paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.paimon.fs.local.LocalFileIO;
2727
import org.apache.paimon.manifest.PartitionEntry;
2828
import org.apache.paimon.options.Options;
29+
import org.apache.paimon.predicate.Predicate;
2930
import org.apache.paimon.predicate.PredicateBuilder;
3031
import org.apache.paimon.schema.Schema;
3132
import org.apache.paimon.schema.SchemaManager;
@@ -34,25 +35,31 @@
3435
import org.apache.paimon.table.sink.StreamTableCommit;
3536
import org.apache.paimon.table.sink.StreamTableWrite;
3637
import org.apache.paimon.table.source.DataTableScan;
38+
import org.apache.paimon.table.source.InnerTableRead;
3739
import org.apache.paimon.table.source.Split;
40+
import org.apache.paimon.table.source.TableRead;
3841
import org.apache.paimon.types.DataType;
3942
import org.apache.paimon.types.DataTypes;
4043
import org.apache.paimon.types.RowType;
4144
import org.apache.paimon.utils.Pair;
4245
import org.apache.paimon.utils.TraceableFileIO;
4346

4447
import org.junit.jupiter.api.BeforeEach;
48+
import org.junit.jupiter.api.Test;
4549
import org.junit.jupiter.api.io.TempDir;
4650
import org.junit.jupiter.params.ParameterizedTest;
4751
import org.junit.jupiter.params.provider.ValueSource;
52+
import org.mockito.Mockito;
4853

54+
import java.io.IOException;
4955
import java.util.Collections;
5056
import java.util.List;
5157
import java.util.UUID;
5258
import java.util.stream.Collectors;
5359

5460
import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData;
5561
import static org.assertj.core.api.Assertions.assertThat;
62+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5663

5764
/** Tests for {@link FallbackReadFileStoreTable}. */
5865
public class FallbackReadFileStoreTableTest {
@@ -242,6 +249,89 @@ public void testWriteGoesToWrapped(boolean wrappedFirst) throws Exception {
242249
assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3);
243250
}
244251

252+
@Test
253+
public void testFallbackReadFailFastDefaultSwallowsException() throws Exception {
254+
FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(false);
255+
Split split = onlyFallbackSplit(table);
256+
257+
// Default behavior: the failing fallback read is swallowed and the reader
258+
// falls through to the main branch, which has no data for partition 3 and
259+
// either returns an empty reader or throws something other than the
260+
// injected fallback exception.
261+
try {
262+
table.newRead().createReader(split);
263+
} catch (Exception e) {
264+
assertThat(e.getMessage())
265+
.as("fallback exception must not propagate when fail-fast is disabled")
266+
.doesNotContain("injected fallback failure");
267+
}
268+
}
269+
270+
@Test
271+
public void testFallbackReadFailFastPropagatesException() throws Exception {
272+
FallbackReadFileStoreTable table = setUpTableWithThrowingFallback(true);
273+
Split split = onlyFallbackSplit(table);
274+
275+
assertThatThrownBy(() -> table.newRead().createReader(split))
276+
.hasMessageContaining("injected fallback failure");
277+
}
278+
279+
private FallbackReadFileStoreTable setUpTableWithThrowingFallback(boolean failFast)
280+
throws Exception {
281+
String branchName = "bc";
282+
FileStoreTable mainTable = createTable();
283+
writeDataIntoTable(mainTable, 0, rowData(1, 10));
284+
mainTable.createBranch(branchName);
285+
FileStoreTable branchTable = createTableFromBranch(mainTable, branchName);
286+
writeDataIntoTable(branchTable, 0, rowData(3, 60));
287+
288+
Options overrides = new Options();
289+
overrides.set(CoreOptions.SCAN_FALLBACK_BRANCH_READ_FAIL_FAST, failFast);
290+
FileStoreTable mainWithOption = mainTable.copy(overrides.toMap());
291+
292+
FileStoreTable spyBranch = Mockito.spy(branchTable);
293+
InnerTableRead throwing = throwingInnerTableRead();
294+
Mockito.doReturn(throwing).when(spyBranch).newRead();
295+
296+
return new FallbackReadFileStoreTable(mainWithOption, spyBranch, true);
297+
}
298+
299+
private static Split onlyFallbackSplit(FallbackReadFileStoreTable table) {
300+
DataTableScan scan = table.newScan();
301+
scan.withFilter(new PredicateBuilder(ROW_TYPE).equal(0, 3));
302+
List<Split> splits = scan.plan().splits();
303+
assertThat(splits).hasSize(1);
304+
FallbackReadFileStoreTable.FallbackSplit fs =
305+
(FallbackReadFileStoreTable.FallbackSplit) splits.get(0);
306+
assertThat(fs.isFallback()).isTrue();
307+
return splits.get(0);
308+
}
309+
310+
private static InnerTableRead throwingInnerTableRead() {
311+
return new InnerTableRead() {
312+
@Override
313+
public InnerTableRead withFilter(Predicate predicate) {
314+
return this;
315+
}
316+
317+
@Override
318+
public InnerTableRead withReadType(RowType readType) {
319+
return this;
320+
}
321+
322+
@Override
323+
public TableRead withIOManager(org.apache.paimon.disk.IOManager ioManager) {
324+
return this;
325+
}
326+
327+
@Override
328+
public org.apache.paimon.reader.RecordReader<InternalRow> createReader(Split split)
329+
throws IOException {
330+
throw new IOException("injected fallback failure");
331+
}
332+
};
333+
}
334+
245335
private void writeDataIntoTable(
246336
FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception {
247337
StreamTableWrite write = table.newWrite(commitUser);

0 commit comments

Comments
 (0)