diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java index f3c0dd29f461e..c71a7b2319bb1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionTopicIT.java @@ -56,6 +56,7 @@ import java.util.Objects; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; @@ -93,6 +94,8 @@ protected void setUpConfig() { .getConfig() .getCommonConfig() .setPipeMetaSyncerSyncIntervalMinutes(1) + .setSubscriptionPrefetchTsFileBatchMaxDelayInMs(600_000) + .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(64 * 1024 * 1024) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); } @@ -382,6 +385,89 @@ private void testTopicWithSnapshotModeTemplate(final String topicFormat) throws } } + @Test + public void testTsFileSnapshotDrainsPendingBatchBeforeTermination() throws Exception { + TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1"); + TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1"); + + // Insert historical data and create a closed TsFile before snapshot subscription starts. + TableModelUtils.insertData("test1", "test1", 0, 10, senderEnv); + + final String topicName = "topic_drain_tsfile_batch_before_termination"; + final String host = senderEnv.getIP(); + final int port = Integer.parseInt(senderEnv.getPort()); + try (final ISubscriptionTableSession session = + new SubscriptionTableSessionBuilder().host(host).port(port).build()) { + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE); + config.put(TopicConstant.DATABASE_KEY, "test1"); + config.put(TopicConstant.TABLE_KEY, "test1"); + // Force TsFile parsing so the snapshot data is buffered in SubscriptionPipeTsFileEventBatch. + config.put(TopicConstant.START_TIME_KEY, 1); + session.createTopic(topicName, config); + } + assertTopicCount(1); + + final AtomicBoolean isClosed = new AtomicBoolean(false); + final AtomicReference consumerFailure = new AtomicReference<>(); + final Thread thread = + new Thread( + () -> { + try (final ISubscriptionTablePullConsumer consumer = + new SubscriptionTablePullConsumerBuilder() + .host(host) + .port(port) + .consumerId("c_drain") + .consumerGroupId("cg_drain") + .autoCommit(false) + .build(); + final ITableSession session = receiverEnv.getTableSessionConnection()) { + consumer.open(); + consumer.subscribe(topicName); + while (!isClosed.get()) { + final List messages = + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); + insertData(messages, session); + consumer.commitSync(messages); + } + } catch (final Throwable e) { + consumerFailure.set(e); + } finally { + LOGGER.info("draining consumer exiting..."); + } + }, + String.format("%s - draining consumer", testName.getDisplayName())); + thread.start(); + + try { + final Consumer handleFailure = + o -> { + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + }; + AWAIT.untilAsserted( + () -> { + Assert.assertNull(consumerFailure.get()); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final TShowSubscriptionResp showSubscriptionResp = + client.showSubscription(new TShowSubscriptionReq()); + Assert.assertEquals( + RpcUtils.SUCCESS_STATUS.getCode(), showSubscriptionResp.status.getCode()); + Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList); + Assert.assertEquals(0, showSubscriptionResp.subscriptionInfoList.size()); + } + + TableModelUtils.assertData("test1", "test1", 1, 10, receiverEnv, handleFailure); + }); + } finally { + isClosed.set(true); + thread.join(); + } + } + /////////////////////////////// utility /////////////////////////////// private void assertTopicCount(final int count) throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 086fde0fbcc9e..29b3a91a85ba0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -117,6 +117,7 @@ public abstract class SubscriptionPrefetchingQueue { private volatile TsFileInsertionEvent currentTsFileInsertionEvent; private volatile RetryableEvent currentTabletInsertionEvent; private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator; + private PipeTerminateEvent currentTerminateEvent; public SubscriptionPrefetchingQueue( final String brokerId, @@ -175,6 +176,10 @@ protected void cleanUpInternal() { .clearReferenceCount(this.getClass().getName()); currentTabletInsertionEvent = null; } + if (Objects.nonNull(currentTerminateEvent)) { + currentTerminateEvent.clearReferenceCount(this.getClass().getName()); + currentTerminateEvent = null; + } } ///////////////////////////////// lock ///////////////////////////////// @@ -236,44 +241,8 @@ private SubscriptionEvent pollInternal(final String consumerId) { onEvent(); } - final long size = prefetchingQueue.size(); - long count = 0; - - SubscriptionEvent event; try { - while (count++ < size // limit control - && Objects.nonNull( - event = - prefetchingQueue.poll( - SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(), - TimeUnit.MILLISECONDS))) { - if (event.isCommitted()) { - LOGGER.warn( - "Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it", - this, - event); - // no need to update inFlightEvents - continue; - } - - if (!event.pollable()) { - LOGGER.warn( - "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it", - this, - event); - event.nack(); // now pollable - // no need to update inFlightEvents and prefetchingQueue - continue; - } - - // This operation should be performed before updating inFlightEvents to prevent multiple - // consumers from consuming the same event. - event.recordLastPolledTimestamp(); // now non-pollable - - inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event); - event.recordLastPolledConsumerId(consumerId); - return event; - } + return pollPrefetchedEvent(consumerId); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn( @@ -307,40 +276,8 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime onEvent(); } - final long size = prefetchingQueue.size(); - long count = 0; - - while (count++ < size // limit control - && Objects.nonNull( - event = - prefetchingQueue.poll( - SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(), - TimeUnit.MILLISECONDS))) { - if (event.isCommitted()) { - LOGGER.warn( - "Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it", - this, - event); - // no need to update inFlightEvents - continue; - } - - if (!event.pollable()) { - LOGGER.warn( - "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it", - this, - event); - event.nack(); // now pollable - // no need to update inFlightEvents and prefetchingQueue - continue; - } - - // This operation should be performed before updating inFlightEvents to prevent multiple - // consumers from consuming the same event. - event.recordLastPolledTimestamp(); // now non-pollable - - inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event); - event.recordLastPolledConsumerId(consumerId); + event = pollPrefetchedEvent(consumerId); + if (Objects.nonNull(event)) { return event; } } catch (final InterruptedException e) { @@ -356,6 +293,49 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime return null; } + private synchronized SubscriptionEvent pollPrefetchedEvent(final String consumerId) + throws InterruptedException { + final long size = prefetchingQueue.size(); + long count = 0; + + SubscriptionEvent event; + while (count++ < size // limit control + && Objects.nonNull( + event = + prefetchingQueue.poll( + SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(), + TimeUnit.MILLISECONDS))) { + if (event.isCommitted()) { + LOGGER.warn( + "Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it", + this, + event); + // no need to update inFlightEvents + continue; + } + + if (!event.pollable()) { + LOGGER.warn( + "Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it", + this, + event); + event.nack(); // now pollable + // no need to update inFlightEvents and prefetchingQueue + continue; + } + + // This operation should be performed before updating inFlightEvents to prevent multiple + // consumers from consuming the same event. + event.recordLastPolledTimestamp(); // now non-pollable + + inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event); + event.recordLastPolledConsumerId(consumerId); + return event; + } + + return null; + } + /////////////////////////////// prefetch /////////////////////////////// public boolean executePrefetch() { @@ -366,7 +346,11 @@ public boolean executePrefetch() { } reportStateIfNeeded(); // TODO: more refined behavior (prefetch/serialize/...) control - if (states.shouldPrefetch()) { + if (tryCommitCurrentTerminateEventIfPresent()) { + remapInFlightEventsSnapshot( + committedCleaner, pollableNacker, responsePrefetcher, responseSerializer); + return true; + } else if (states.shouldPrefetch()) { tryPrefetch(); remapInFlightEventsSnapshot( committedCleaner, pollableNacker, responsePrefetcher, responseSerializer); @@ -467,6 +451,10 @@ private synchronized void peekOnce() { * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty. */ private synchronized void tryPrefetch() { + if (Objects.nonNull(currentTerminateEvent) && !tryCommitCurrentTerminateEvent()) { + return; + } + while (!inputPendingQueue.isEmpty() || Objects.nonNull(currentTabletInsertionEvent)) { if (Objects.nonNull(currentTabletInsertionEvent)) { final RetryableState state = onRetryableTabletInsertionEvent(currentTabletInsertionEvent); @@ -497,16 +485,10 @@ private synchronized void tryPrefetch() { } if (event instanceof PipeTerminateEvent) { - final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event; - // add mark completed hook - terminateEvent.addOnCommittedHook(this::markCompleted); - // commit directly - ((PipeTerminateEvent) event) - .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", - this, - terminateEvent); + currentTerminateEvent = (PipeTerminateEvent) event; + if (!tryCommitCurrentTerminateEvent()) { + return; + } continue; } @@ -549,6 +531,11 @@ private synchronized void tryPrefetch() { } private synchronized void tryPrefetchV2() { + if (Objects.nonNull(currentTerminateEvent)) { + tryCommitCurrentTerminateEvent(); + return; + } + if (!prefetchingQueue.isEmpty()) { return; } @@ -613,16 +600,8 @@ private synchronized void tryPrefetchV2() { } if (event instanceof PipeTerminateEvent) { - final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event; - // add mark completed hook - terminateEvent.addOnCommittedHook(this::markCompleted); - // commit directly - ((PipeTerminateEvent) event) - .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", - this, - terminateEvent); + currentTerminateEvent = (PipeTerminateEvent) event; + tryCommitCurrentTerminateEvent(); return; } @@ -731,6 +710,43 @@ protected boolean onEvent() { return batches.onEvent(this::prefetchEvent); } + private synchronized boolean tryCommitCurrentTerminateEventIfPresent() { + if (Objects.isNull(currentTerminateEvent)) { + return false; + } + tryCommitCurrentTerminateEvent(); + return true; + } + + private synchronized boolean tryCommitCurrentTerminateEvent() { + try { + batches.emitAll(this::prefetchEvent); + } catch (final Exception e) { + LOGGER.warn( + "Subscription: SubscriptionPrefetchingQueue {} failed to emit remaining events before " + + "committing PipeTerminateEvent {}.", + this, + currentTerminateEvent, + e); + return false; + } + + if (!prefetchingQueue.isEmpty() || !inFlightEvents.isEmpty()) { + return false; + } + + // Add mark completed hook only when all subscription events have been consumed. + currentTerminateEvent.addOnCommittedHook(this::markCompleted); + currentTerminateEvent.decreaseReferenceCount( + SubscriptionPrefetchingQueue.class.getName(), true); + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", + this, + currentTerminateEvent); + currentTerminateEvent = null; + return true; + } + /////////////////////////////// commit /////////////////////////////// /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java index d25573add6b38..0707d979d6a26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatch.java @@ -71,14 +71,7 @@ protected SubscriptionPipeEventBatch( protected synchronized boolean onEvent(final Consumer consumer) throws Exception { if (shouldEmit() && !enrichedEvents.isEmpty()) { - if (Objects.isNull(events)) { - events = generateSubscriptionEvents(); - } - if (Objects.nonNull(events)) { - events.forEach(consumer); - return true; - } - return false; + return emit(consumer); } return false; } @@ -101,6 +94,20 @@ protected synchronized boolean onEvent( return onEvent(consumer); } + protected synchronized boolean emit(final Consumer consumer) throws Exception { + if (enrichedEvents.isEmpty()) { + return false; + } + if (Objects.isNull(events)) { + events = generateSubscriptionEvents(); + } + if (Objects.nonNull(events)) { + events.forEach(consumer); + return true; + } + return false; + } + /////////////////////////////// utility /////////////////////////////// protected abstract void onTabletInsertionEvent(final TabletInsertionEvent event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java index 467f788f7979b..0cb81ca43e699 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatches.java @@ -142,6 +142,35 @@ public boolean onEvent(final EnrichedEvent event, final Consumer consumer) throws Exception { + final AtomicBoolean hasNew = new AtomicBoolean(false); + Exception exception = null; + for (final int regionId : ImmutableList.copyOf(regionIdToBatch.keySet())) { + try { + segmentLock.lock(regionId); + final SubscriptionPipeEventBatch batch = regionIdToBatch.get(regionId); + if (Objects.isNull(batch)) { + continue; + } + if (batch.emit(consumer)) { + hasNew.set(true); + regionIdToBatch.remove(regionId); + } + } catch (final Exception e) { + LOGGER.warn( + DataNodeMiscMessages.EXCEPTION_SEALING_EVENTS, regionIdToBatch.get(regionId), e); + exception = e; + } finally { + segmentLock.unlock(regionId); + } + } + + if (Objects.nonNull(exception)) { + throw exception; + } + return hasNew.get(); + } + public void cleanUp() { regionIdToBatch.values().forEach(batch -> batch.cleanUp(true)); regionIdToBatch.clear();