Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

Expand Down Expand Up @@ -93,6 +94,8 @@ protected void setUpConfig() {
.getConfig()
.getCommonConfig()
.setPipeMetaSyncerSyncIntervalMinutes(1)
.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(600_000)
.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(64 * 1024 * 1024)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
}
Expand Down Expand Up @@ -382,6 +385,89 @@ private void testTopicWithSnapshotModeTemplate(final String topicFormat) throws
}
}

@Test
public void testTsFileSnapshotDrainsPendingBatchBeforeTermination() throws Exception {
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
TableModelUtils.createDataBaseAndTable(receiverEnv, "test1", "test1");

// Insert historical data and create a closed TsFile before snapshot subscription starts.
TableModelUtils.insertData("test1", "test1", 0, 10, senderEnv);

final String topicName = "topic_drain_tsfile_batch_before_termination";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final ISubscriptionTableSession session =
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
config.put(TopicConstant.DATABASE_KEY, "test1");
config.put(TopicConstant.TABLE_KEY, "test1");
// Force TsFile parsing so the snapshot data is buffered in SubscriptionPipeTsFileEventBatch.
config.put(TopicConstant.START_TIME_KEY, 1);
session.createTopic(topicName, config);
}
assertTopicCount(1);

final AtomicBoolean isClosed = new AtomicBoolean(false);
final AtomicReference<Throwable> consumerFailure = new AtomicReference<>();
final Thread thread =
new Thread(
() -> {
try (final ISubscriptionTablePullConsumer consumer =
new SubscriptionTablePullConsumerBuilder()
.host(host)
.port(port)
.consumerId("c_drain")
.consumerGroupId("cg_drain")
.autoCommit(false)
.build();
final ITableSession session = receiverEnv.getTableSessionConnection()) {
consumer.open();
consumer.subscribe(topicName);
while (!isClosed.get()) {
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
insertData(messages, session);
consumer.commitSync(messages);
}
} catch (final Throwable e) {
consumerFailure.set(e);
} finally {
LOGGER.info("draining consumer exiting...");
}
},
String.format("%s - draining consumer", testName.getDisplayName()));
thread.start();

try {
final Consumer<String> handleFailure =
o -> {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};
AWAIT.untilAsserted(
() -> {
Assert.assertNull(consumerFailure.get());

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final TShowSubscriptionResp showSubscriptionResp =
client.showSubscription(new TShowSubscriptionReq());
Assert.assertEquals(
RpcUtils.SUCCESS_STATUS.getCode(), showSubscriptionResp.status.getCode());
Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
Assert.assertEquals(0, showSubscriptionResp.subscriptionInfoList.size());
}

TableModelUtils.assertData("test1", "test1", 1, 10, receiverEnv, handleFailure);
});
} finally {
isClosed.set(true);
thread.join();
}
}

/////////////////////////////// utility ///////////////////////////////

private void assertTopicCount(final int count) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public abstract class SubscriptionPrefetchingQueue {
private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
private volatile RetryableEvent<TabletInsertionEvent> currentTabletInsertionEvent;
private volatile SubscriptionTsFileToTabletIterator currentToTabletIterator;
private PipeTerminateEvent currentTerminateEvent;

public SubscriptionPrefetchingQueue(
final String brokerId,
Expand Down Expand Up @@ -175,6 +176,10 @@ protected void cleanUpInternal() {
.clearReferenceCount(this.getClass().getName());
currentTabletInsertionEvent = null;
}
if (Objects.nonNull(currentTerminateEvent)) {
currentTerminateEvent.clearReferenceCount(this.getClass().getName());
currentTerminateEvent = null;
}
}

///////////////////////////////// lock /////////////////////////////////
Expand Down Expand Up @@ -236,44 +241,8 @@ private SubscriptionEvent pollInternal(final String consumerId) {
onEvent();
}

final long size = prefetchingQueue.size();
long count = 0;

SubscriptionEvent event;
try {
while (count++ < size // limit control
&& Objects.nonNull(
event =
prefetchingQueue.poll(
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
TimeUnit.MILLISECONDS))) {
if (event.isCommitted()) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it",
this,
event);
// no need to update inFlightEvents
continue;
}

if (!event.pollable()) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it",
this,
event);
event.nack(); // now pollable
// no need to update inFlightEvents and prefetchingQueue
continue;
}

// This operation should be performed before updating inFlightEvents to prevent multiple
// consumers from consuming the same event.
event.recordLastPolledTimestamp(); // now non-pollable

inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event);
event.recordLastPolledConsumerId(consumerId);
return event;
}
return pollPrefetchedEvent(consumerId);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
Expand Down Expand Up @@ -307,40 +276,8 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime
onEvent();
}

final long size = prefetchingQueue.size();
long count = 0;

while (count++ < size // limit control
&& Objects.nonNull(
event =
prefetchingQueue.poll(
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
TimeUnit.MILLISECONDS))) {
if (event.isCommitted()) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it",
this,
event);
// no need to update inFlightEvents
continue;
}

if (!event.pollable()) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it",
this,
event);
event.nack(); // now pollable
// no need to update inFlightEvents and prefetchingQueue
continue;
}

// This operation should be performed before updating inFlightEvents to prevent multiple
// consumers from consuming the same event.
event.recordLastPolledTimestamp(); // now non-pollable

inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event);
event.recordLastPolledConsumerId(consumerId);
event = pollPrefetchedEvent(consumerId);
if (Objects.nonNull(event)) {
return event;
}
} catch (final InterruptedException e) {
Expand All @@ -356,6 +293,49 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime
return null;
}

private synchronized SubscriptionEvent pollPrefetchedEvent(final String consumerId)
throws InterruptedException {
final long size = prefetchingQueue.size();
long count = 0;

SubscriptionEvent event;
while (count++ < size // limit control
&& Objects.nonNull(
event =
prefetchingQueue.poll(
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
TimeUnit.MILLISECONDS))) {
if (event.isCommitted()) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} poll committed event {} from prefetching queue (broken invariant), remove it",
this,
event);
// no need to update inFlightEvents
continue;
}

if (!event.pollable()) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} poll non-pollable event {} from prefetching queue (broken invariant), nack and remove it",
this,
event);
event.nack(); // now pollable
// no need to update inFlightEvents and prefetchingQueue
continue;
}

// This operation should be performed before updating inFlightEvents to prevent multiple
// consumers from consuming the same event.
event.recordLastPolledTimestamp(); // now non-pollable

inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), event);
event.recordLastPolledConsumerId(consumerId);
return event;
}

return null;
}

/////////////////////////////// prefetch ///////////////////////////////

public boolean executePrefetch() {
Expand All @@ -366,7 +346,11 @@ public boolean executePrefetch() {
}
reportStateIfNeeded();
// TODO: more refined behavior (prefetch/serialize/...) control
if (states.shouldPrefetch()) {
if (tryCommitCurrentTerminateEventIfPresent()) {
remapInFlightEventsSnapshot(
committedCleaner, pollableNacker, responsePrefetcher, responseSerializer);
return true;
} else if (states.shouldPrefetch()) {
tryPrefetch();
remapInFlightEventsSnapshot(
committedCleaner, pollableNacker, responsePrefetcher, responseSerializer);
Expand Down Expand Up @@ -467,6 +451,10 @@ private synchronized void peekOnce() {
* {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
*/
private synchronized void tryPrefetch() {
if (Objects.nonNull(currentTerminateEvent) && !tryCommitCurrentTerminateEvent()) {
return;
}

while (!inputPendingQueue.isEmpty() || Objects.nonNull(currentTabletInsertionEvent)) {
if (Objects.nonNull(currentTabletInsertionEvent)) {
final RetryableState state = onRetryableTabletInsertionEvent(currentTabletInsertionEvent);
Expand Down Expand Up @@ -497,16 +485,10 @@ private synchronized void tryPrefetch() {
}

if (event instanceof PipeTerminateEvent) {
final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
// add mark completed hook
terminateEvent.addOnCommittedHook(this::markCompleted);
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
LOGGER.info(
"Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}",
this,
terminateEvent);
currentTerminateEvent = (PipeTerminateEvent) event;
if (!tryCommitCurrentTerminateEvent()) {
return;
}
continue;
}

Expand Down Expand Up @@ -549,6 +531,11 @@ private synchronized void tryPrefetch() {
}

private synchronized void tryPrefetchV2() {
if (Objects.nonNull(currentTerminateEvent)) {
tryCommitCurrentTerminateEvent();
return;
}

if (!prefetchingQueue.isEmpty()) {
return;
}
Expand Down Expand Up @@ -613,16 +600,8 @@ private synchronized void tryPrefetchV2() {
}

if (event instanceof PipeTerminateEvent) {
final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
// add mark completed hook
terminateEvent.addOnCommittedHook(this::markCompleted);
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
LOGGER.info(
"Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}",
this,
terminateEvent);
currentTerminateEvent = (PipeTerminateEvent) event;
tryCommitCurrentTerminateEvent();
return;
}

Expand Down Expand Up @@ -731,6 +710,43 @@ protected boolean onEvent() {
return batches.onEvent(this::prefetchEvent);
}

private synchronized boolean tryCommitCurrentTerminateEventIfPresent() {
if (Objects.isNull(currentTerminateEvent)) {
return false;
}
tryCommitCurrentTerminateEvent();
return true;
}

private synchronized boolean tryCommitCurrentTerminateEvent() {
try {
batches.emitAll(this::prefetchEvent);
} catch (final Exception e) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} failed to emit remaining events before "
+ "committing PipeTerminateEvent {}.",
this,
currentTerminateEvent,
e);
return false;
}

if (!prefetchingQueue.isEmpty() || !inFlightEvents.isEmpty()) {
return false;
}

// Add mark completed hook only when all subscription events have been consumed.
currentTerminateEvent.addOnCommittedHook(this::markCompleted);
currentTerminateEvent.decreaseReferenceCount(
SubscriptionPrefetchingQueue.class.getName(), true);
LOGGER.info(
"Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}",
this,
currentTerminateEvent);
currentTerminateEvent = null;
return true;
}

/////////////////////////////// commit ///////////////////////////////

/**
Expand Down
Loading
Loading