diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index 1eb309674056e..9c88bf3f8b7d0 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -151,8 +151,7 @@ public static void verifySuccessWithRedirectionForMultiDevices( } public static void verifySuccess(List statuses) throws BatchExecutionException { - StringBuilder errMsgs = - new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": "); + StringBuilder errMsgs = new StringBuilder(); for (TSStatus status : statuses) { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { @@ -160,7 +159,8 @@ public static void verifySuccess(List statuses) throws BatchExecutionE } } if (errMsgs.length() > 0) { - throw new BatchExecutionException(statuses, errMsgs.toString()); + throw new BatchExecutionException( + statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + errMsgs); } } @@ -181,9 +181,9 @@ public static TSStatus getStatus(List statusList) { for (TSStatus subStatus : statusList) { if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && subStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - if (!msgSet.contains(status)) { - errMsg.append(status).append("; "); - msgSet.add(status); + if (!msgSet.contains(subStatus)) { + errMsg.append(subStatus).append("; "); + msgSet.add(subStatus); } } } diff --git a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java index a698027607455..940c50453a26b 100644 --- a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java +++ 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java @@ -19,11 +19,15 @@ package org.apache.iotdb.rpc; +import org.apache.iotdb.common.rpc.thrift.TSStatus; + import org.junit.Assert; import org.junit.Test; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; public class RpcUtilsTest { @@ -74,4 +78,25 @@ public void testIsSetSqlDialect() { Assert.assertFalse(RpcUtils.isSetSqlDialect("setsql_dialect =table")); Assert.assertFalse(RpcUtils.isSetSqlDialect("set sql_dia")); } + + @Test + public void testVerifySuccessListAllowsSuccessfulStatuses() throws BatchExecutionException { + RpcUtils.verifySuccess( + Arrays.asList( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND))); + } + + @Test + public void testVerifySuccessListThrowsOnFailure() { + TSStatus failedStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed"); + + try { + RpcUtils.verifySuccess(Collections.singletonList(failedStatus)); + Assert.fail("Expected BatchExecutionException"); + } catch (BatchExecutionException e) { + Assert.assertEquals(Collections.singletonList(failedStatus), e.getStatusList()); + Assert.assertTrue(e.getMessage().contains("failed")); + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java index d15b45cbcd592..5c4b4c3175eff 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java @@ -54,8 +54,8 @@ public void getAINodeHeartBeat( client = clientManager.borrowClient(endPoint); client.getAIHeartbeat(req, handler); dispatched = true; - } catch 
(Exception ignore) { - // Just ignore + } catch (Exception e) { + handleError(handler, e); } finally { // After the async call is dispatched, the client's onComplete/onError callback is // responsible for returning the client. If the RPC was not dispatched (exception @@ -67,6 +67,14 @@ public void getAINodeHeartBeat( } } + private void handleError(final AINodeHeartbeatHandler handler, final Exception e) { + try { + handler.onError(e); + } catch (final Exception ignore) { + // Ignore handler failures in heartbeat best-effort path. + } + } + private static class AsyncAINodeHeartbeatClientPoolHolder { private static final AsyncAINodeHeartbeatClientPool INSTANCE = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java index 5cf716d0dd164..ab30935753b0a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java @@ -55,8 +55,8 @@ public void getConfigNodeHeartBeat( client = clientManager.borrowClient(endPoint); client.getConfigNodeHeartBeat(heartbeatReq, handler); dispatched = true; - } catch (Exception ignore) { - // Just ignore + } catch (Exception e) { + handleError(handler, e); } finally { // After the async call is dispatched, the client's onComplete/onError callback is // responsible for returning the client. If the RPC was not dispatched (exception @@ -68,6 +68,14 @@ public void getConfigNodeHeartBeat( } } + private void handleError(final ConfigNodeHeartbeatHandler handler, final Exception e) { + try { + handler.onError(e); + } catch (final Exception ignore) { + // Ignore handler failures in heartbeat best-effort path. 
+ } + } + private static class AsyncConfigNodeHeartbeatClientPoolHolder { private static final AsyncConfigNodeHeartbeatClientPool INSTANCE = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java index d31ec405e0c77..516596a61b8a9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java @@ -56,8 +56,8 @@ public void getDataNodeHeartBeat( client = clientManager.borrowClient(endPoint); client.getDataNodeHeartBeat(req, handler); dispatched = true; - } catch (Exception ignore) { - // Just ignore + } catch (Exception e) { + handleError(handler, e); } finally { returnClientIfNotDispatched(endPoint, client, dispatched); } @@ -72,7 +72,7 @@ public void writeAuditLog( client.writeAuditLog(req, handler); dispatched = true; } catch (Exception e) { - // Just ignore + handleError(handler, e); } finally { returnClientIfNotDispatched(endPoint, client, dispatched); } @@ -89,6 +89,22 @@ private void returnClientIfNotDispatched( } } + private void handleError(final DataNodeHeartbeatHandler handler, final Exception e) { + try { + handler.onError(e); + } catch (final Exception ignore) { + // Ignore handler failures in heartbeat best-effort path. + } + } + + private void handleError(final DataNodeWriteAuditLogHandler handler, final Exception e) { + try { + handler.onError(e); + } catch (final Exception ignore) { + // Ignore handler failures in audit-log best-effort path. 
+ } + } + private static class AsyncDataNodeHeartbeatClientPoolHolder { private static final AsyncDataNodeHeartbeatClientPool INSTANCE = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java index 3a1c47e174673..918d206fc3704 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java @@ -83,11 +83,11 @@ public void onError(Exception e) { + e.getMessage(); LOGGER.error(errorMsg); - countDownLatch.countDown(); TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp(); resp.setStatus( new TSStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg))); responseMap.put(requestId, resp); + countDownLatch.countDown(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java index 45574e131e70a..ea2b342f5976a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java @@ -83,11 +83,11 @@ public void onError(Exception e) { + e.getMessage(); LOGGER.error(errorMsg); - countDownLatch.countDown(); TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp(); resp.setStatus( new TSStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg))); responseMap.put(requestId, resp); + countDownLatch.countDown(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java index 44f8362585cf9..a0f8957bbb4cd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java @@ -83,11 +83,11 @@ public void onError(Exception e) { + e.getMessage(); LOGGER.error(errorMsg); - countDownLatch.countDown(); TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp(); resp.setStatus( new TSStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg))); responseMap.put(requestId, resp); + countDownLatch.countDown(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java index 84d848a484ec5..ea1de3b6274d8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java @@ -77,10 +77,10 @@ public void onError(Exception e) { + e.getMessage(); LOGGER.warn(errorMsg); - countDownLatch.countDown(); responseMap.put( requestId, new TSStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg))); + countDownLatch.countDown(); } } diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java index 6d3d1bf500746..e2b1d7bc83615 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java @@ -85,11 +85,11 @@ public void onError(Exception e) { + e.getMessage(); LOGGER.error(errorMsg); - countDownLatch.countDown(); TCheckSchemaRegionUsingTemplateResp resp = new TCheckSchemaRegionUsingTemplateResp(); resp.setStatus( new TSStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg))); responseMap.put(requestId, resp); + countDownLatch.countDown(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 6d334521deda3..b99cbd000173a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.ratis.util.AwaitForSignal; import org.apache.tsfile.utils.Pair; @@ -72,6 +73,7 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class); private static final int SAMPLING_WINDOW_SIZE = 100; + private static final int TOPOLOGY_PROBING_RETRY_NUM = 1; private final ExecutorService topologyThread = IoTDBThreadPoolFactory.newSingleThreadExecutor( @@ -222,7 +224,7 @@ private synchronized void topologyProbing() { nodeLocations, proberLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout); + .sendAsyncRequest(dataNodeAsyncRequestContext, TOPOLOGY_PROBING_RETRY_NUM, timeout, true); final List results = new ArrayList<>(); dataNodeAsyncRequestContext .getResponseMap() @@ -360,15 +362,18 @@ private void pushTopologyToDataNodes( } CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(context, CONF.getTopologyProbingBaseIntervalInMs()); + .sendAsyncRequest( + context, TOPOLOGY_PROBING_RETRY_NUM, CONF.getTopologyProbingBaseIntervalInMs(), true); context .getResponseMap() .forEach( (nodeId, resp) -> { - Set reachableSet = - computedTopology.getOrDefault(nodeId, Collections.emptySet()); - lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); + if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + Set reachableSet = + computedTopology.getOrDefault(nodeId, Collections.emptySet()); + lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); + } }); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java index bbdaee8e12c3e..3dd1d0f6a13f3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java @@ -51,6 +51,7 @@ public class PipeHeartbeatScheduler { PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled(); private static final long HEARTBEAT_INTERVAL_SECONDS = PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta(); + private static final int PIPE_HEARTBEAT_RETRY_NUM = 1; private static final ScheduledExecutorService HEARTBEAT_EXECUTOR = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( @@ -100,12 +101,8 @@ private synchronized void heartbeat() { new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PIPE_HEARTBEAT, request, dataNodeLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() - * 1000L - * 2 - / 3); + .sendAsyncRequest( + clientHandler, PIPE_HEARTBEAT_RETRY_NUM, getPipeHeartbeatRequestTimeoutInMs(), true); clientHandler .getResponseMap() .forEach( @@ -134,6 +131,10 @@ private synchronized void heartbeat() { } } + private static long getPipeHeartbeatRequestTimeoutInMs() { + return TimeUnit.SECONDS.toMillis(HEARTBEAT_INTERVAL_SECONDS) * 2 / 3; + } + public synchronized void stop() { if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) { heartbeatFuture.cancel(false); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 4105b8ecf5c2c..ab4f367df9242 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.confignode.client.CnToCnNodeRequestType; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; @@ -110,6 +111,8 @@ public class ConfigNodeProcedureEnv { private static final Logger LOG = LoggerFactory.getLogger(ConfigNodeProcedureEnv.class); + private static final int RUNTIME_META_PUSH_RETRY_NUM = 1; + /** Add or remove node lock. */ private final LockQueue nodeLock = new LockQueue(); @@ -655,10 +658,23 @@ public Map pushAllPipeMetaToDataNodes( final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingPipePushMetaResponses(clientHandler, timeoutInMs); + return clientHandler.getResponseMap(); + } + + public Map pushAllPipeMetaToDataNodesBestEffort( + List pipeMetaBinaryList) { + final Map dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + final TPushPipeMetaReq request = new TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList); + + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request, dataNodeLocationMap); + final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler); + fillMissingPipePushMetaResponses(clientHandler, timeoutInMs); return 
clientHandler.getResponseMap(); } @@ -670,10 +686,9 @@ public Map pushSinglePipeMetaToDataNodes(ByteBuffer final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingPipePushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -686,10 +701,9 @@ public Map dropSinglePipeOnDataNodes(String pipeName final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingPipePushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -703,10 +717,9 @@ public Map pushMultiPipeMetaToDataNodes( final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + 
fillMissingPipePushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -719,10 +732,9 @@ public Map dropMultiPipeOnDataNodes(List pip final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingPipePushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -735,10 +747,23 @@ public Map pushAllTopicMetaToDataNodes( final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs); + return clientHandler.getResponseMap(); + } + + public Map pushAllTopicMetaToDataNodesBestEffort( + List topicMetaBinaryList) { + final Map dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + final TPushTopicMetaReq request = new TPushTopicMetaReq().setTopicMetas(topicMetaBinaryList); + + final DataNodeAsyncRequestContext clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request, dataNodeLocationMap); + final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler); + 
fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -750,7 +775,9 @@ public List pushSingleTopicOnDataNode(ByteBuffer topicMeta) { final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseList().stream() .map(TPushTopicMetaResp::getStatus) .collect(Collectors.toList()); @@ -765,7 +792,9 @@ public List dropSingleTopicOnDataNode(String topicNameToDrop) { final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseList().stream() .map(TPushTopicMetaResp::getStatus) .collect(Collectors.toList()); @@ -781,10 +810,9 @@ public Map pushMultiTopicMetaToDataNodes( final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, 
timeoutInMs); + fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -797,10 +825,9 @@ public Map dropMultiTopicOnDataNodes(List t final DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -815,10 +842,25 @@ public Map pushAllConsumerGroupMetaToDataNo clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs); + return clientHandler.getResponseMap(); + } + + public Map pushAllConsumerGroupMetaToDataNodesBestEffort( + List consumerGroupMetaBinaryList) { + final Map dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + final TPushConsumerGroupMetaReq request = + new TPushConsumerGroupMetaReq().setConsumerGroupMetas(consumerGroupMetaBinaryList); + + final DataNodeAsyncRequestContext + clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request, dataNodeLocationMap); + final long 
timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler); + fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -834,7 +876,9 @@ public List pushSingleConsumerGroupOnDataNode(ByteBuffer consumerGroup CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseList().stream() .map(TPushConsumerGroupMetaResp::getStatus) .collect(Collectors.toList()); @@ -852,7 +896,9 @@ public List dropSingleConsumerGroupOnDataNode(String consumerGroupName CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs); return clientHandler.getResponseList().stream() .map(TPushConsumerGroupMetaResp::getStatus) .collect(Collectors.toList()); @@ -867,10 +913,23 @@ public Map pullCommitProgressFromDataNodes() { clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS, request, dataNodeLocationMap); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); + final long timeoutInMs = getRequiredSubscriptionMetadataRequestTimeoutInMs(); + sendRequiredMetadataRequest(clientHandler, timeoutInMs); + 
fillMissingPullCommitProgressResponses(clientHandler, timeoutInMs); + return clientHandler.getResponseMap(); + } + + public Map pullCommitProgressFromDataNodesBestEffort() { + final Map dataNodeLocationMap = + configManager.getNodeManager().getRegisteredDataNodeLocations(); + final TPullCommitProgressReq request = new TPullCommitProgressReq(); + + final DataNodeAsyncRequestContext + clientHandler = + new DataNodeAsyncRequestContext<>( + CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS, request, dataNodeLocationMap); + final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler); + fillMissingPullCommitProgressResponses(clientHandler, timeoutInMs); return clientHandler.getResponseMap(); } @@ -884,8 +943,12 @@ public Map pushSubscriptionRuntimeStatesToDataNodes( final Set readableDataNodeIds = getLoadManager().filterDataNodeThroughStatus(NodeStatus::isReadable).stream() .collect(Collectors.toSet()); - final DataNodeAsyncRequestContext clientHandler = - new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME); + final DataNodeAsyncRequestContext + readableDataNodeClientHandler = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME); + final DataNodeAsyncRequestContext + unreadableDataNodeClientHandler = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME); dataNodeLocationMap.forEach( (dataNodeId, dataNodeLocation) -> { @@ -919,16 +982,25 @@ && isRuntimeActiveWriterNode(oldLeaderNodeId)) { preferredWriterNodeId == dataNodeId, new ArrayList<>(activeWriterNodeIds))); }); + final DataNodeAsyncRequestContext clientHandler = + readableDataNodeIds.contains(dataNodeId) + ? 
readableDataNodeClientHandler + : unreadableDataNodeClientHandler; clientHandler.putNodeLocation(dataNodeId, dataNodeLocation); clientHandler.putRequest( dataNodeId, new TPushSubscriptionRuntimeReq().setRuntimeStates(runtimeStates)); }); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestToNodeWithRetryAndTimeoutInMs( - clientHandler, - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 * 2 / 3); - return clientHandler.getResponseMap(); + final long readableTimeoutInMs = sendRequiredRuntimeMetaRequest(readableDataNodeClientHandler); + fillMissingSubscriptionRuntimeResponses(readableDataNodeClientHandler, readableTimeoutInMs); + final long unreadableTimeoutInMs = + sendBestEffortRuntimeMetaRequest(unreadableDataNodeClientHandler); + fillMissingSubscriptionRuntimeResponses(unreadableDataNodeClientHandler, unreadableTimeoutInMs); + + final Map responseMap = + new HashMap<>(readableDataNodeClientHandler.getResponseMap()); + responseMap.putAll(unreadableDataNodeClientHandler.getResponseMap()); + return responseMap; } private boolean isRuntimeActiveWriterNode(final int dataNodeId) { @@ -937,6 +1009,164 @@ && getLoadManager().getNodeStatus(dataNodeId) != NodeStatus.Unknown && getLoadManager().getNodeStatus(dataNodeId) != NodeStatus.Removing; } + private static long sendBestEffortRuntimeMetaRequest( + final DataNodeAsyncRequestContext clientHandler) { + final long timeoutInMs = getRuntimeMetaPushTimeoutInMs(); + sendRuntimeMetaRequest(clientHandler, true, timeoutInMs); + return timeoutInMs; + } + + private static long sendRequiredRuntimeMetaRequest( + final DataNodeAsyncRequestContext clientHandler) { + final long timeoutInMs = getRuntimeMetaPushTimeoutInMs(); + sendRuntimeMetaRequest(clientHandler, false, timeoutInMs); + return timeoutInMs; + } + + private static void sendRequiredMetadataRequest( + final DataNodeAsyncRequestContext clientHandler, final long timeoutInMs) { + sendRuntimeMetaRequest(clientHandler, false, 
timeoutInMs); + } + + private static void sendRuntimeMetaRequest( + final DataNodeAsyncRequestContext clientHandler, final boolean keepSilent) { + sendRuntimeMetaRequest(clientHandler, keepSilent, getRuntimeMetaPushTimeoutInMs()); + } + + private static void sendRuntimeMetaRequest( + final DataNodeAsyncRequestContext clientHandler, + final boolean keepSilent, + final long timeoutInMs) { + CnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequest(clientHandler, RUNTIME_META_PUSH_RETRY_NUM, timeoutInMs, keepSilent); + } + + private static void fillMissingPipePushMetaResponses( + final DataNodeAsyncRequestContext clientHandler, + final long timeoutInMs) { + clientHandler + .getRequestIndices() + .forEach( + dataNodeId -> + clientHandler + .getResponseMap() + .putIfAbsent( + dataNodeId, + new TPushPipeMetaResp( + createMetadataTimeoutStatus( + clientHandler.getRequestType(), + dataNodeId, + TSStatusCode.PIPE_PUSH_META_ERROR, + timeoutInMs)))); + } + + private static void fillMissingTopicPushMetaResponses( + final DataNodeAsyncRequestContext clientHandler, + final long timeoutInMs) { + clientHandler + .getRequestIndices() + .forEach( + dataNodeId -> + clientHandler + .getResponseMap() + .putIfAbsent( + dataNodeId, + new TPushTopicMetaResp( + createMetadataTimeoutStatus( + clientHandler.getRequestType(), + dataNodeId, + TSStatusCode.TOPIC_PUSH_META_ERROR, + timeoutInMs)))); + } + + private static void fillMissingConsumerGroupPushMetaResponses( + final DataNodeAsyncRequestContext clientHandler, + final long timeoutInMs) { + clientHandler + .getRequestIndices() + .forEach( + dataNodeId -> + clientHandler + .getResponseMap() + .putIfAbsent( + dataNodeId, + new TPushConsumerGroupMetaResp( + createMetadataTimeoutStatus( + clientHandler.getRequestType(), + dataNodeId, + TSStatusCode.CONSUMER_PUSH_META_ERROR, + timeoutInMs)))); + } + + private static void fillMissingPullCommitProgressResponses( + final DataNodeAsyncRequestContext clientHandler, + final long 
timeoutInMs) { + clientHandler + .getRequestIndices() + .forEach( + dataNodeId -> + clientHandler + .getResponseMap() + .putIfAbsent( + dataNodeId, + new TPullCommitProgressResp( + createMetadataTimeoutStatus( + clientHandler.getRequestType(), + dataNodeId, + TSStatusCode.EXECUTE_STATEMENT_ERROR, + timeoutInMs)))); + } + + private static void fillMissingSubscriptionRuntimeResponses( + final DataNodeAsyncRequestContext clientHandler, final long timeoutInMs) { + clientHandler + .getRequestIndices() + .forEach( + dataNodeId -> + clientHandler + .getResponseMap() + .putIfAbsent( + dataNodeId, + createMetadataTimeoutStatus( + clientHandler.getRequestType(), + dataNodeId, + TSStatusCode.EXECUTE_STATEMENT_ERROR, + timeoutInMs))); + } + + private static TSStatus createMetadataTimeoutStatus( + final CnToDnAsyncRequestType requestType, + final int dataNodeId, + final TSStatusCode statusCode, + final long timeoutInMs) { + return new TSStatus(statusCode.getStatusCode()) + .setMessage( + String.format( + "Failed to %s on DataNode %s before metadata request timeout %sms", + requestType, dataNodeId, timeoutInMs)); + } + + private static long getRuntimeMetaPushTimeoutInMs() { + return TimeUnit.SECONDS.toMillis( + PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()) + * 2 + / 3; + } + + private static long getRequiredPipeMetadataRequestTimeoutInMs() { + return TimeUnit.MINUTES.toMillis( + PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes()) + * 2 + / 3; + } + + private static long getRequiredSubscriptionMetadataRequestTimeoutInMs() { + return TimeUnit.MINUTES.toMillis( + SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes()) + * 2 + / 3; + } + public LockQueue getNodeLock() { return nodeLock; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 5679b36ffc506..0e72737f2650c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -487,10 +487,11 @@ public static String parsePushPipeMetaExceptionForPipe( if (resp.getStatus().getCode() == TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) { if (!resp.isSetExceptionMessages()) { + final String statusMessage = resp.getStatus().getMessage(); exceptionMessageBuilder.append( String.format( - "DataNodeId: %s, Message: Internal error while processing pushPipeMeta on dataNodes.", - dataNodeId)); + "DataNodeId: %s, Message: Internal error while processing pushPipeMeta on dataNodes.%s", + dataNodeId, statusMessage == null ? "" : " " + statusMessage)); continue; } @@ -538,6 +539,23 @@ protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env } } + protected Map pushPipeMetaToDataNodesBestEffortAndGetResponse( + ConfigNodeProcedureEnv env) throws IOException { + final List pipeMetaBinaryList = new ArrayList<>(); + for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { + pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize()); + } + return env.pushAllPipeMetaToDataNodesBestEffort(pipeMetaBinaryList); + } + + protected void pushPipeMetaToDataNodesBestEffort(ConfigNodeProcedureEnv env) { + try { + pushPipeMetaToDataNodesBestEffortAndGetResponse(env); + } catch (Exception e) { + LOGGER.info(ProcedureMessages.FAILED_TO_PUSH_PIPE_META_LIST_TO_DATA_NODES_WILL, e); + } + } + /** * Pushing one pipeMeta to all the dataNodes, forcing an update to the pipe's runtime state. 
* diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 2c5b9566fce61..f4b4324b6bb5f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -113,7 +113,7 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES); - pushPipeMetaToDataNodesIgnoreException(env); + pushPipeMetaToDataNodesBestEffort(env); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index a0dec36735265..f37ba237bdf70 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -131,7 +131,7 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { return; } - pushPipeMetaToDataNodesIgnoreException(env); + pushPipeMetaToDataNodesBestEffort(env); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index 
33b909eae274e..bddcaf1c987e5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -64,7 +65,8 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { private static final Logger LOGGER = LoggerFactory.getLogger(PipeMetaSyncProcedure.class); private static final long MIN_EXECUTION_INTERVAL_MS = - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 / 2; + TimeUnit.MINUTES.toMillis(PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes()) + / 2; // No need to serialize this field private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0); @@ -201,7 +203,7 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeException, IOException { LOGGER.debug(ProcedureMessages.PIPEMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES); - Map respMap = pushPipeMetaToDataNodes(env); + Map respMap = pushPipeMetaToDataNodesBestEffortAndGetResponse(env); if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) { throw new PipeException( String.format( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java index 9b0bd21750d8d..d6fa0de5d20f4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java @@ -350,6 +350,16 @@ protected Map pushTopicMetaToDataNodes(ConfigNodePr return env.pushAllTopicMetaToDataNodes(topicMetaBinaryList); } + protected Map pushTopicMetaToDataNodesBestEffort( + ConfigNodeProcedureEnv env) throws IOException { + final List topicMetaBinaryList = new ArrayList<>(); + for (TopicMeta topicMeta : subscriptionInfo.get().getAllTopicMeta()) { + topicMetaBinaryList.add(topicMeta.serialize()); + } + + return env.pushAllTopicMetaToDataNodesBestEffort(topicMetaBinaryList); + } + public static boolean pushTopicMetaHasException(Map respMap) { for (TPushTopicMetaResp resp : respMap.values()) { if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -376,6 +386,16 @@ protected Map pushConsumerGroupMetaToDataNo return env.pushAllConsumerGroupMetaToDataNodes(consumerGroupMetaBinaryList); } + protected Map pushConsumerGroupMetaToDataNodesBestEffort( + ConfigNodeProcedureEnv env) throws IOException { + final List consumerGroupMetaBinaryList = new ArrayList<>(); + for (ConsumerGroupMeta consumerGroupMeta : subscriptionInfo.get().getAllConsumerGroupMeta()) { + consumerGroupMetaBinaryList.add(consumerGroupMeta.serialize()); + } + + return env.pushAllConsumerGroupMetaToDataNodesBestEffort(consumerGroupMetaBinaryList); + } + public static boolean pushConsumerGroupMetaHasException( Map respMap) { for (TPushConsumerGroupMetaResp resp : respMap.values()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java index e9b3056e66211..348412af1069e 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java @@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.CommitProgressHandleMetaChangePlan; import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -47,6 +47,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -59,7 +60,9 @@ public class CommitProgressSyncProcedure extends AbstractOperateSubscriptionProc private static final Logger LOGGER = LoggerFactory.getLogger(CommitProgressSyncProcedure.class); private static final long MIN_EXECUTION_INTERVAL_MS = - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 / 2; + TimeUnit.MINUTES.toMillis( + SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes()) + / 2; private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0); public CommitProgressSyncProcedure() { @@ -106,7 +109,8 @@ public void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) LOGGER.info("CommitProgressSyncProcedure: executeFromOperateOnConfigNodes"); // 1. 
Pull commit progress from all DataNodes - final Map respMap = env.pullCommitProgressFromDataNodes(); + final Map respMap = + env.pullCommitProgressFromDataNodesBestEffort(); // 2. Merge all DataNode responses with existing progress using Math::max final Map mergedRegionProgress = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java index 20a1173ac60bb..8e14b26e260a7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java @@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta; import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -53,7 +54,9 @@ public class ConsumerGroupMetaSyncProcedure extends AbstractOperateSubscriptionP LoggerFactory.getLogger(ConsumerGroupMetaSyncProcedure.class); private static final long MIN_EXECUTION_INTERVAL_MS = - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 / 
2; + TimeUnit.MINUTES.toMillis( + SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes()) + / 2; // No need to serialize this field private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0); @@ -129,7 +132,8 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws SubscriptionException, IOException { LOGGER.info(ProcedureMessages.CONSUMERGROUPMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES); - Map respMap = pushConsumerGroupMetaToDataNodes(env); + Map respMap = + pushConsumerGroupMetaToDataNodesBestEffort(env); if (pushConsumerGroupMetaHasException(respMap)) { throw new SubscriptionException( String.format( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java index 5337b719d207f..09414b0e3a8ad 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java @@ -104,7 +104,8 @@ public void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("SubscriptionHandleLeaderChangeProcedure: executeFromOperateOnConfigNodes"); - final Map respMap = env.pullCommitProgressFromDataNodes(); + final Map respMap = + env.pullCommitProgressFromDataNodesBestEffort(); final Map mergedRegionProgress = deserializeRegionProgressMap( subscriptionInfo.get().getCommitProgressKeeper().getAllRegionProgress()); @@ -158,7 +159,7 @@ public void executeFromOperateOnDataNodes(final ConfigNodeProcedureEnv env) throws SubscriptionException, IOException { 
LOGGER.info("SubscriptionHandleLeaderChangeProcedure: executeFromOperateOnDataNodes"); - final Map topicRespMap = pushTopicMetaToDataNodes(env); + final Map topicRespMap = pushTopicMetaToDataNodesBestEffort(env); topicRespMap.forEach( (dataNodeId, resp) -> { if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -170,7 +171,7 @@ public void executeFromOperateOnDataNodes(final ConfigNodeProcedureEnv env) }); final Map consumerGroupRespMap = - pushConsumerGroupMetaToDataNodes(env); + pushConsumerGroupMetaToDataNodesBestEffort(env); consumerGroupRespMap.forEach( (dataNodeId, resp) -> { if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java index b7432a2c588bf..d1ba988db63e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java @@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -52,7 +53,9 @@ public class TopicMetaSyncProcedure extends AbstractOperateSubscriptionProcedure private static final Logger LOGGER = LoggerFactory.getLogger(TopicMetaSyncProcedure.class); private static final long MIN_EXECUTION_INTERVAL_MS = - PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 1000 / 2; + TimeUnit.MINUTES.toMillis( + SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes()) + / 2; // No need to serialize this field private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0); @@ -129,7 +132,7 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws SubscriptionException, IOException { LOGGER.info(ProcedureMessages.TOPICMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES); - Map respMap = pushTopicMetaToDataNodes(env); + Map respMap = pushTopicMetaToDataNodesBestEffort(env); if (pushTopicMetaHasException(respMap)) { throw new SubscriptionException( String.format(ProcedureMessages.FAILED_TO_PUSH_TOPIC_META_TO_DATANODES_DETAILS, respMap)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 18c755bf923ab..46f2823b3fc69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -434,6 +434,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private static final long TEST_CONNECTION_TIMEOUT_MS = CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); + private static final int TEST_CONNECTION_RETRY_NUM = 1; private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR 
= IoTDBThreadPoolFactory.newFixedThreadPool( @@ -2207,7 +2208,8 @@ private List testAllConfigNodeConnection( DnToCnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> DnToCnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); + .sendAsyncRequest( + handler, TEST_CONNECTION_RETRY_NUM, TEST_CONNECTION_TIMEOUT_MS, true)); } private List testAllDataNodeConnectionInHeartbeatChannel( @@ -2233,7 +2235,8 @@ private List testAllDataNodeInternalServiceConnection( DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> DnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); + .sendAsyncRequest( + handler, TEST_CONNECTION_RETRY_NUM, TEST_CONNECTION_TIMEOUT_MS, true)); } private List testAllDataNodeMPPServiceConnection( @@ -2246,7 +2249,8 @@ private List testAllDataNodeMPPServiceConnection( DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> DataNodeMPPServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); + .sendAsyncRequest( + handler, TEST_CONNECTION_RETRY_NUM, TEST_CONNECTION_TIMEOUT_MS, true)); } private List testAllDataNodeExternalServiceConnection( @@ -2259,7 +2263,8 @@ private List testAllDataNodeExternalServiceConnection( DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> DataNodeExternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); + .sendAsyncRequest( + handler, TEST_CONNECTION_RETRY_NUM, TEST_CONNECTION_TIMEOUT_MS, true)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java index 
db23436e520b9..4f668a1907567 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java @@ -67,6 +67,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -77,6 +78,7 @@ public class GeneralRegionAttributeSecurityService extends AbstractPeriodicalSer LoggerFactory.getLogger(GeneralRegionAttributeSecurityService.class); private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); + private static final int ATTRIBUTE_UPDATE_RETRY_NUM = 1; private final Map> dataNodeId2FailureDurationAndTimesMap = new HashMap<>(); private final Set regionLeaders = @@ -214,46 +216,42 @@ protected void executeTask() { ByteBuffer.wrap(bytes))); })); - DnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs( - clientHandler, + final long timeoutInMs = + TimeUnit.SECONDS.toMillis( IoTDBDescriptor.getInstance() .getConfig() .getGeneralRegionAttributeSecurityServiceTimeoutSeconds()); + DnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequest(clientHandler, ATTRIBUTE_UPDATE_RETRY_NUM, timeoutInMs); final AtomicBoolean needFetch = new AtomicBoolean(false); + final Set failedDataNodes = new HashSet<>(clientHandler.getRequestIndices()); + final long failureDurationToFetchInMs = + TimeUnit.SECONDS.toMillis( + iotdbConfig.getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch()); - final Set failedDataNodes = - clientHandler.getResponseMap().entrySet().stream() - .filter( - entry -> { - final boolean failed = - entry.getValue().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode(); - if (failed) { - dataNodeId2FailureDurationAndTimesMap.compute( - entry.getKey(), - (k, v) -> { - if (Objects.isNull(v)) { - return new Pair<>(System.currentTimeMillis(), 1); - } - v.setRight(v.getRight() + 1); - if (System.currentTimeMillis() - v.getLeft() - >= iotdbConfig - .getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch() - || v.getRight() - >= iotdbConfig - .getGeneralRegionAttributeSecurityServiceFailureTimesToFetch()) { - needFetch.set(true); - } - return v; - }); - } else { - dataNodeId2FailureDurationAndTimesMap.remove(entry.getKey()); + clientHandler.getResponseMap().entrySet().stream() + .filter(entry -> entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .map(Map.Entry::getKey) + .forEach(dataNodeId2FailureDurationAndTimesMap::remove); + + failedDataNodes.forEach( + dataNodeId -> + dataNodeId2FailureDurationAndTimesMap.compute( + dataNodeId, + (k, v) -> { + if (Objects.isNull(v)) { + return new Pair<>(System.currentTimeMillis(), 1); + } + v.setRight(v.getRight() + 1); + if (System.currentTimeMillis() - v.getLeft() >= failureDurationToFetchInMs + || v.getRight() + >= iotdbConfig + .getGeneralRegionAttributeSecurityServiceFailureTimesToFetch()) { + needFetch.set(true); } - return failed; - }) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); + return v; + })); // Compute node shrinkage before failure removal in commit final Map> result = @@ -301,7 +299,8 @@ private GeneralRegionAttributeSecurityService() { super( IoTDBThreadPoolFactory.newSingleThreadExecutor( ThreadName.GENERAL_REGION_ATTRIBUTE_SECURITY_SERVICE.getName()), - iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds() * 1000L); + TimeUnit.SECONDS.toMillis( + iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds())); } private static final class GeneralRegionAttributeSecurityServiceHolder { diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java index 1f147070acb90..a00b653d28af9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java @@ -182,6 +182,7 @@ protected void sendAsyncRequest( final TEndPoint endPoint = nodeLocationToEndPoint(targetNode); Client client = null; boolean dispatched = false; + AsyncRequestRPCHandler handler = null; try { if (!actionMap.containsKey(requestContext.getRequestType())) { throw new UnsupportedOperationException( @@ -189,11 +190,10 @@ protected void sendAsyncRequest( + requestContext.getRequestType() + ", please set it in AsyncRequestManager::initActionMapBuilder()"); } + handler = buildHandler(requestContext, requestId, targetNode); client = clientManager.borrowClient(endPoint); adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client); Object req = requestContext.getRequest(requestId); - AsyncRequestRPCHandler handler = - buildHandler(requestContext, requestId, targetNode); Objects.requireNonNull(actionMap.get(requestContext.getRequestType())) .accept(req, client, handler); // After accept() returns, the async callback (onComplete/onError) takes over the @@ -207,6 +207,21 @@ protected void sendAsyncRequest( endPoint, e.getMessage(), retryCount); + if (handler != null) { + try { + handler.onError(e); + } catch (final Exception handlerException) { + LOGGER.warn( + "Failed to handle async request error for request type {} on node {}: {}", + requestContext.getRequestType(), + endPoint, + handlerException.getMessage(), + handlerException); + requestContext.getCountDownLatch().countDown(); + } + } else { + requestContext.getCountDownLatch().countDown(); + } } finally { if (!dispatched && 
client != null && clientManager instanceof ClientManager) { ((ClientManager) clientManager).returnClient(endPoint, client); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java new file mode 100644 index 0000000000000..40b2bf9730b7e --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Exercises {@code AsyncRequestManager#sendAsyncRequest} through a stub manager whose client
+ * borrowing, request dispatch and error callback can each be switched to fail.
+ */
+public class AsyncRequestManagerTest {
+
+  /** Upper bound, in seconds, that a test waits for {@code sendAsyncRequest} to return. */
+  private static final long DISPATCH_DEADLINE_SECONDS = 3L;
+
+  /**
+   * Uses a manager whose {@code borrowClient} always throws and sends with a retry count of 2 and
+   * a {@code null} third argument (presumably the timeout, going by the test name).
+   *
+   * <p>The call has to return within the deadline, the client must have been borrowed twice, the
+   * failure message must be stored under request id 1, and id 1 must still be listed by {@code
+   * getRequestIndices()}.
+   */
+  @Test
+  public void dispatchFailureShouldNotBlockRetryWithoutTimeout() throws Exception {
+    final TestAsyncRequestManager requestManager = new TestAsyncRequestManager();
+    final AsyncRequestContext requestContext =
+        new AsyncRequestContext<>(
+            TestRequestType.TEST, "request", Collections.singletonMap(1, localNode()));
+
+    // Run on a separate thread so that a blocked send shows up as a timeout instead of a hang.
+    final ExecutorService worker = Executors.newSingleThreadExecutor();
+    try {
+      final Future pending =
+          worker.submit(() -> requestManager.sendAsyncRequest(requestContext, 2, null, true));
+      pending.get(DISPATCH_DEADLINE_SECONDS, TimeUnit.SECONDS);
+    } finally {
+      worker.shutdownNow();
+    }
+
+    Assert.assertEquals(2, requestManager.getBorrowAttempts());
+    Assert.assertEquals("borrow failed", requestContext.getResponseMap().get(1));
+    Assert.assertEquals(Collections.singletonList(1), requestContext.getRequestIndices());
+  }
+
+  /**
+   * Uses a manager whose borrow succeeds but whose dispatch action throws, paired with a handler
+   * whose {@code onError} throws as well, and sends with a retry count of 1.
+   *
+   * <p>The call has to return within the deadline without surfacing either exception, the client
+   * must have been borrowed once, no response may be recorded, and id 1 must still be listed by
+   * {@code getRequestIndices()}.
+   */
+  @Test
+  public void handlerErrorFailureShouldNotEscapeDispatchFailure() throws Exception {
+    final TestAsyncRequestManager requestManager = new TestAsyncRequestManager(false, true, true);
+    final AsyncRequestContext requestContext =
+        new AsyncRequestContext<>(
+            TestRequestType.TEST, "request", Collections.singletonMap(1, localNode()));
+
+    // Run on a separate thread so that a blocked send shows up as a timeout instead of a hang.
+    final ExecutorService worker = Executors.newSingleThreadExecutor();
+    try {
+      final Future pending =
+          worker.submit(() -> requestManager.sendAsyncRequest(requestContext, 1, null, true));
+      pending.get(DISPATCH_DEADLINE_SECONDS, TimeUnit.SECONDS);
+    } finally {
+      worker.shutdownNow();
+    }
+
+    Assert.assertEquals(1, requestManager.getBorrowAttempts());
+    Assert.assertTrue(requestContext.getResponseMap().isEmpty());
+    Assert.assertEquals(Collections.singletonList(1), requestContext.getRequestIndices());
+  }
+
+  /** Builds the single target node (localhost:6667) that both tests address. */
+  private static TestNodeLocation localNode() {
+    return new TestNodeLocation(new TEndPoint("localhost", 6667));
+  }
+
+  /** The only request type the stub manager registers an action for. */
+  private enum TestRequestType {
+    TEST
+  }
+
+  /** Minimal node location that just wraps the endpoint to connect to. */
+  private static class TestNodeLocation {
+
+    private final TEndPoint endPoint;
+
+    private TestNodeLocation(final TEndPoint endPoint) {
+      this.endPoint = endPoint;
+    }
+  }
+
+  /**
+   * Handler stub: {@code onComplete} stores the response and drops the node from the pending map;
+   * {@code onError} either stores the exception message or, when configured, throws.
+   */
+  private static class TestAsyncRequestRPCHandler
+      extends AsyncRequestRPCHandler {
+
+    private final boolean failOnError;
+
+    private TestAsyncRequestRPCHandler(
+        final TestRequestType requestType,
+        final int requestId,
+        final TestNodeLocation targetNode,
+        final Map nodeLocationMap,
+        final Map responseMap,
+        final CountDownLatch countDownLatch,
+        final boolean failOnError) {
+      super(requestType, requestId, targetNode, nodeLocationMap, responseMap, countDownLatch);
+      this.failOnError = failOnError;
+    }
+
+    @Override
+    protected String generateFormattedTargetLocation(final TestNodeLocation location) {
+      return location.endPoint.toString();
+    }
+
+    @Override
+    public void onComplete(final String response) {
+      responseMap.put(requestId, response);
+      nodeLocationMap.remove(requestId);
+      countDownLatch.countDown();
+    }
+
+    @Override
+    public void onError(final Exception exception) {
+      if (!failOnError) {
+        // The node is left in nodeLocationMap; only the message is recorded.
+        responseMap.put(requestId, exception.getMessage());
+        countDownLatch.countDown();
+        return;
+      }
+      throw new RuntimeException("handler failed");
+    }
+  }
+
+  /**
+   * Manager stub with three independent failure switches and a counter of {@code borrowClient}
+   * invocations. The switches are read when the stubbed client manager, action and handler run,
+   * not when they are created.
+   */
+  private static class TestAsyncRequestManager
+      extends AsyncRequestManager {
+
+    private final AtomicInteger borrowAttempts = new AtomicInteger();
+    private boolean failOnBorrow;
+    private boolean failOnDispatch;
+    private boolean failOnError;
+
+    /** Default configuration: only borrowing a client fails. */
+    private TestAsyncRequestManager() {
+      this(true, false, false);
+    }
+
+    private TestAsyncRequestManager(
+        final boolean failOnBorrow, final boolean failOnDispatch, final boolean failOnError) {
+      super(1);
+      this.failOnBorrow = failOnBorrow;
+      this.failOnDispatch = failOnDispatch;
+      this.failOnError = failOnError;
+    }
+
+    @Override
+    protected void initClientManager(final int selectorNumOfAsyncClientManager) {
+      clientManager =
+          new IClientManager() {
+
+            @Override
+            public Object borrowClient(final TEndPoint node) throws ClientManagerException {
+              // Count every attempt, including the ones that fail.
+              borrowAttempts.incrementAndGet();
+              if (!failOnBorrow) {
+                return new Object();
+              }
+              throw new ClientManagerException("borrow failed");
+            }
+
+            @Override
+            public void clear(final TEndPoint node) {
+              // Nothing to release in this stub
+            }
+
+            @Override
+            public void clearAll() {
+              // Nothing to release in this stub
+            }
+
+            @Override
+            public void close() {
+              // Nothing to release in this stub
+            }
+          };
+    }
+
+    @Override
+    protected void initActionMapBuilder() {
+      actionMapBuilder.put(
+          TestRequestType.TEST,
+          (request, client, handler) -> {
+            if (!failOnDispatch) {
+              // Reaching the action is only expected in the dispatch-failure scenario.
+              Assert.fail("The test client manager should fail before dispatch.");
+            }
+            throw new RuntimeException("dispatch failed");
+          });
+    }
+
+    @Override
+    protected TEndPoint nodeLocationToEndPoint(final TestNodeLocation location) {
+      return location.endPoint;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected AsyncRequestRPCHandler buildHandler(
+        final AsyncRequestContext requestContext,
+        final int requestId,
+        final TestNodeLocation targetNode) {
+      return new TestAsyncRequestRPCHandler(
+          requestContext.getRequestType(),
+          requestId,
+          targetNode,
+          requestContext.getNodeLocationMap(),
+          (Map) requestContext.getResponseMap(),
+          requestContext.getCountDownLatch(),
+          failOnError);
+    }
+
+    /** Number of times {@code borrowClient} has been invoked on the stub client manager. */
+    private int getBorrowAttempts() {
+      return borrowAttempts.get();
+    }
+  }
+}