diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java index 593e68ef8d82ba..ae12107c3ddac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java @@ -511,6 +511,20 @@ public synchronized void run() { } public void initClients() throws Exception { + prepareClients(); + if (beToClient.isEmpty()) { + try { + for (Map.Entry entry : beToThriftAddress.entrySet()) { + initClient(entry.getKey(), entry.getValue()); + } + } catch (Exception e) { + releaseClients(); + throw new RuntimeException(e); + } + } + } + + private void prepareClients() { if (beToThriftAddress == null || beToThriftAddress.isEmpty()) { fetchBeToThriftAddress(); } @@ -518,25 +532,36 @@ public void initClients() throws Exception { beToClient = new HashMap<>(); beToAddr = new HashMap<>(); } + } + + private void initClient(long beId, String thriftAddress) throws Exception { + boolean ok = false; + TNetworkAddress address = null; + Client client = null; + try { + String[] ipPort = thriftAddress.split(":"); + address = new TNetworkAddress(ipPort[0], Integer.parseInt(ipPort[1])); + beToAddr.put(beId, address); + client = ClientPool.backendPool.borrowObject(address); + beToClient.put(beId, client); + ok = true; + } finally { + if (!ok) { + ClientPool.backendPool.invalidateObject(address, client); + beToAddr.remove(beId); + } + } + } + + private void initClientsForClearJob() { + prepareClients(); if (beToClient.isEmpty()) { for (Map.Entry entry : beToThriftAddress.entrySet()) { - boolean ok = false; - TNetworkAddress address = null; - Client client = null; try { - String[] ipPort = entry.getValue().split(":"); - address = new TNetworkAddress(ipPort[0], Integer.parseInt(ipPort[1])); - beToAddr.put(entry.getKey(), address); - client = ClientPool.backendPool.borrowObject(address); - beToClient.put(entry.getKey(), client); - ok = true; + initClient(entry.getKey(), entry.getValue()); } catch (Exception e) { - throw new RuntimeException(e); - } finally { - if (!ok) { - ClientPool.backendPool.invalidateObject(address, client); - releaseClients(); - } + LOG.warn("init client for BE {} ({}) failed when clearing warm up job {}: {}", + entry.getKey(), entry.getValue(), jobId, e.getMessage()); } } } @@ -575,7 +600,7 @@ private String getBackendEndpoint(long beId) { private final void clearJobOnBEs() { try { - initClients(); + initClientsForClearJob(); // Iterate with explicit iterator so we can remove invalidated clients during iteration. Iterator> iter = beToClient.entrySet().iterator(); while (iter.hasNext()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java index add89a0c51c87d..3da0946113872b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java @@ -18,23 +18,52 @@ package org.apache.doris.cloud; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.CloudWarmUpJob.JobState; import org.apache.doris.cloud.CloudWarmUpJob.JobType; import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent; import org.apache.doris.cloud.CloudWarmUpJob.SyncMode; import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.GenericPool; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TWarmUpTabletsRequest; +import org.apache.doris.thrift.TWarmUpTabletsRequestType; +import org.apache.doris.thrift.TWarmUpTabletsResponse; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import org.mockito.Mockito; +import java.lang.reflect.Method; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; public class CloudWarmUpJobTest { + private GenericPool originalBackendPool; + private GenericPool mockBackendPool; + + @SuppressWarnings("unchecked") + @Before + public void setUp() { + originalBackendPool = ClientPool.backendPool; + mockBackendPool = Mockito.mock(GenericPool.class); + ClientPool.backendPool = mockBackendPool; + } + + @After + public void tearDown() { + ClientPool.backendPool = originalBackendPool; + } + @Test public void testEventDrivenRefreshesSourceBackends() { CloudSystemInfoService cloudSystemInfoService = Mockito.mock(CloudSystemInfoService.class); @@ -70,4 +99,65 @@ public void testEventDrivenRefreshesSourceBackends() { Assert.assertEquals("host1:9060", warmUpJob.getBeToThriftAddress().get(1L)); Assert.assertEquals("host2:9061", warmUpJob.getBeToThriftAddress().get(2L)); } + + @Test + public void testInitClientsKeepsFailFastForWarmUpRpc() throws Exception { + long jobId = 12345L; + TNetworkAddress firstAddress = new TNetworkAddress("127.0.0.1", 9050); + TNetworkAddress secondAddress = new TNetworkAddress("127.0.0.2", 9050); + BackendService.Client firstClient = Mockito.mock(BackendService.Client.class); + CloudWarmUpJob job = createRunningJob(jobId, firstAddress, secondAddress); + + Mockito.when(mockBackendPool.borrowObject(firstAddress)).thenReturn(firstClient); + Mockito.when(mockBackendPool.borrowObject(secondAddress)).thenThrow(new RuntimeException("down")); + + Assert.assertThrows(RuntimeException.class, job::initClients); + Mockito.verify(mockBackendPool).returnObject(firstAddress, firstClient); + Mockito.verify(mockBackendPool).invalidateObject(secondAddress, null); + } + + @Test + public void testClearJobSkipsUnavailableBackendAndClearsAvailableBackend() throws Exception { + long jobId = 12346L; + TNetworkAddress unavailableAddress = new TNetworkAddress("127.0.0.1", 9050); + TNetworkAddress availableAddress = new TNetworkAddress("127.0.0.2", 9050); + BackendService.Client availableClient = Mockito.mock(BackendService.Client.class); + CloudWarmUpJob job = createRunningJob(jobId, unavailableAddress, availableAddress); + + Mockito.when(mockBackendPool.borrowObject(unavailableAddress)).thenThrow(new RuntimeException("down")); + Mockito.when(mockBackendPool.borrowObject(availableAddress)).thenReturn(availableClient); + Mockito.when(availableClient.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class))) + .thenReturn(new TWarmUpTabletsResponse()); + + invokeClearJobOnBEs(job); + + ArgumentCaptor captor = ArgumentCaptor.forClass(TWarmUpTabletsRequest.class); + Mockito.verify(availableClient).warmUpTablets(captor.capture()); + TWarmUpTabletsRequest request = captor.getValue(); + Assert.assertEquals(TWarmUpTabletsRequestType.CLEAR_JOB, request.getType()); + Assert.assertEquals(jobId, request.getJobId()); + Mockito.verify(mockBackendPool).returnObject(availableAddress, availableClient); + Mockito.verify(mockBackendPool).invalidateObject(unavailableAddress, null); + } + + private CloudWarmUpJob createRunningJob(long jobId, TNetworkAddress firstAddress, + TNetworkAddress secondAddress) { + CloudWarmUpJob job = new CloudWarmUpJob.Builder() + .setJobId(jobId) + .setSrcClusterName("source_cluster") + .setDstClusterName("target_cluster") + .build(); + job.setJobState(JobState.RUNNING); + Map beToThriftAddress = new LinkedHashMap<>(); + beToThriftAddress.put(1L, firstAddress.getHostname() + ":" + firstAddress.getPort()); + beToThriftAddress.put(2L, secondAddress.getHostname() + ":" + secondAddress.getPort()); + job.setBeToThriftAddress(beToThriftAddress); + return job; + } + + private void invokeClearJobOnBEs(CloudWarmUpJob job) throws Exception { + Method method = CloudWarmUpJob.class.getDeclaredMethod("clearJobOnBEs"); + method.setAccessible(true); + method.invoke(job); + } }