Skip to content

Commit bd6fb72

Browse files
morningman authored and
Your Name committed
[fix](fe) Fix INSERT INTO local TVF ignoring backend_id during scheduling (#61732)
### What problem does this PR solve?

Follow-up to #60719

Problem Summary:

When using `INSERT INTO local("backend_id" = "X" ...)`, the data should only be written to the BE node specified by `backend_id`. However, the Coordinator schedules the sink fragment to an arbitrary backend because the fragment uses the `UNPARTITIONED` data partition, which causes `SimpleScheduler.getHost()` to pick any available BE. This results in file creation failures when the target directory only exists on the intended BE.

**Root Cause:**

- The read path (`SELECT FROM local(...)`) correctly handles this via `TVFScanNode.initBackendPolicy()`, restricting the scan to the specified backend.
- The write path (`INSERT INTO local(...)`) had no equivalent logic. `PhysicalPlanTranslator.visitPhysicalTVFTableSink()` creates the fragment as `UNPARTITIONED`, and `Coordinator.computeFragmentHosts()` assigns it to a random BE.

**Fix:**

Added backend_id-aware scheduling in `Coordinator.computeFragmentHosts()` for the local `TVFTableSink`, forcing the sink fragment to execute on the designated backend. This is consistent with the existing `DictionarySink` pattern, which also overrides fragment scheduling for specific sink types.

**Changes:**

1. `TVFTableSink.java` - Added the `getTvfName()` and `getBackendId()` accessor methods.
2. `Coordinator.java` - Added a check before UNPARTITIONED scheduling: if the sink is a local `TVFTableSink` with a specific backend_id, force the fragment onto that backend.
1 parent 0dd1473 commit bd6fb72

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@ public TVFTableSink(PlanNodeId exchNodeId, String tvfName, Map<String, String> p
6060
this.cols = cols;
6161
}
6262

63+
public String getTvfName() {
64+
return tvfName;
65+
}
66+
67+
/**
68+
* Returns the backend_id specified in properties, or -1 if not set.
69+
* For local TVF, this indicates the specific BE node where data should be written.
70+
*/
71+
public long getBackendId() {
72+
String backendIdStr = properties.get("backend_id");
73+
if (backendIdStr != null) {
74+
return Long.parseLong(backendIdStr);
75+
}
76+
return -1;
77+
}
78+
6379
public void bindDataSink() throws AnalysisException {
6480
TTVFTableSink tSink = new TTVFTableSink();
6581
tSink.setTvfName(tvfName);

fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.doris.planner.SchemaScanNode;
7373
import org.apache.doris.planner.SetOperationNode;
7474
import org.apache.doris.planner.SortNode;
75+
import org.apache.doris.planner.TVFTableSink;
7576
import org.apache.doris.planner.UnionNode;
7677
import org.apache.doris.proto.InternalService;
7778
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -1767,6 +1768,24 @@ protected void computeFragmentHosts() throws Exception {
17671768
// TODO: rethink the whole function logic. could All BE sink naturally merged into other judgements?
17681769
return;
17691770
}
1771+
// For local TVF sink with a specific backend_id, we must execute the sink fragment
1772+
// on the designated backend. Otherwise, data would be written to the wrong node's local disk.
1773+
if (fragment.getSink() instanceof TVFTableSink) {
1774+
TVFTableSink tvfSink = (TVFTableSink) fragment.getSink();
1775+
if ("local".equals(tvfSink.getTvfName()) && tvfSink.getBackendId() != -1) {
1776+
Backend targetBackend = Env.getCurrentSystemInfo().getBackend(tvfSink.getBackendId());
1777+
if (targetBackend == null || !targetBackend.isAlive()) {
1778+
throw new UserException("Backend " + tvfSink.getBackendId()
1779+
+ " is not available for local TVF sink");
1780+
}
1781+
TNetworkAddress execHostport = new TNetworkAddress(
1782+
targetBackend.getHost(), targetBackend.getBePort());
1783+
this.addressToBackendID.put(execHostport, targetBackend.getId());
1784+
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, params);
1785+
params.instanceExecParams.add(instanceParam);
1786+
continue;
1787+
}
1788+
}
17701789

17711790
if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
17721791
Reference<Long> backendIdRef = new Reference<Long>();

0 commit comments

Comments
 (0)