From c25849a093b466966353ecd7b63f722535b8d5b3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 11 Jun 2026 11:44:40 +0800 Subject: [PATCH] Fix procedure lock wait scheduling --- .../iotdb/confignode/procedure/Procedure.java | 10 ++ .../procedure/ProcedureExecutor.java | 109 +++++++++++------- .../impl/node/AbstractNodeProcedure.java | 12 ++ .../procedure/scheduler/LockQueue.java | 6 +- .../scheduler/SimpleProcedureScheduler.java | 37 +++++- .../procedure/entity/SimpleLockProcedure.java | 11 +- 6 files changed, 135 insertions(+), 50 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 29a241b16d09d..adb870d90f30c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -235,6 +235,16 @@ protected void releaseLock(Env env) { // no op } + /** + * Called after an execution attempt returns {@link ProcedureLockState#LOCK_EVENT_WAIT}. Override + * it to put the procedure into the corresponding lock wait queue. + * + * @param env env + */ + protected void waitForLock(Env env) { + // no op + } + /** * Used to keep procedure lock even when the procedure is yielded or suspended. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 82afea3859fd4..e567cc8c59886 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.i18n.ProcedureMessages; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; @@ -42,7 +41,6 @@ import java.util.Deque; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -86,6 +84,16 @@ public class ProcedureExecutor { private final Env environment; private final IProcedureStore store; + private static final class LockStateResult { + private final ProcedureLockState lockState; + private final Procedure procedure; + + private LockStateResult(ProcedureLockState lockState, Procedure procedure) { + this.lockState = lockState; + this.procedure = procedure; + } + } + public ProcedureExecutor( final Env environment, final IProcedureStore store, final ProcedureScheduler scheduler) { this.environment = environment; @@ -329,33 +337,39 @@ private void executeProcedure(Procedure proc) { return; } ProcedureLockState lockState = null; + Procedure lockEventWaitProcedure = null; try { do { if (!rootProcStack.acquire()) { if (rootProcStack.setRollback()) { - lockState = executeRootStackRollback(rootProcId, rootProcStack); + LockStateResult lockStateResult = + executeRootStackRollback(rootProcId, rootProcStack); + lockState = lockStateResult.lockState; switch (lockState) { case LOCK_ACQUIRED: break; case LOCK_EVENT_WAIT: - LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc); + LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, lockStateResult.procedure); rootProcStack.unsetRollback(); + lockEventWaitProcedure = lockStateResult.procedure; break; case LOCK_YIELD_WAIT: rootProcStack.unsetRollback(); - scheduler.yield(proc); + scheduler.yield(lockStateResult.procedure); break; default: throw new UnsupportedOperationException(); } } else { if (!proc.wasExecuted()) { - switch (executeRollback(proc)) { + lockState = executeRollback(proc); + switch (lockState) { case LOCK_ACQUIRED: break; case LOCK_EVENT_WAIT: LOG.info( ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc); + lockEventWaitProcedure = proc; break; case LOCK_YIELD_WAIT: scheduler.yield(proc); @@ -367,19 +381,25 @@ private void executeProcedure(Procedure proc) { } break; } - lockState = acquireLock(proc); - switch (lockState) { - case LOCK_ACQUIRED: - executeProcedure(rootProcStack, proc); - break; - case LOCK_YIELD_WAIT: - case LOCK_EVENT_WAIT: - LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState); - break; - default: - throw new UnsupportedOperationException(); + try { + lockState = acquireLock(proc); + switch (lockState) { + case LOCK_ACQUIRED: + executeProcedure(rootProcStack, proc); + break; + case LOCK_YIELD_WAIT: + case LOCK_EVENT_WAIT: + LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState); + if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) { + lockEventWaitProcedure = proc; + } + break; + default: + throw new UnsupportedOperationException(); + } + } finally { + rootProcStack.release(); } - rootProcStack.release(); if (proc.isSuccess()) { // update metrics on finishing the procedure @@ -397,9 +417,9 @@ private void executeProcedure(Procedure proc) { } finally { // Only after procedure has completed execution can it be allowed to be rescheduled to prevent // data races - if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) { - LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, proc.getProcId()); - ((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc); + if (lockEventWaitProcedure != null) { + LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, lockEventWaitProcedure.getProcId()); + lockEventWaitProcedure.waitForLock(this.environment); } } } @@ -607,8 +627,8 @@ private void yieldProcedure(Procedure proc) { * @param procedureStack root procedure stack * @return lock state */ - private ProcedureLockState executeRootStackRollback( - Long rootProcId, RootProcedureStack procedureStack) { + private LockStateResult executeRootStackRollback( + Long rootProcId, RootProcedureStack procedureStack) { Procedure rootProcedure = procedures.get(rootProcId); ProcedureException exception = rootProcedure.getException(); if (exception == null) { @@ -629,7 +649,7 @@ private ProcedureLockState executeRootStackRollback( } ProcedureLockState lockState = acquireLock(procedure); if (lockState != ProcedureLockState.LOCK_ACQUIRED) { - return lockState; + return new LockStateResult<>(lockState, procedure); } lockState = executeRollback(procedure); releaseLock(procedure, false); @@ -637,11 +657,11 @@ private ProcedureLockState executeRootStackRollback( boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED; abortRollback |= !isRunning() || !store.isRunning(); if (abortRollback) { - return lockState; + return new LockStateResult<>(lockState, procedure); } if (!procedure.isFinished() && procedure.isYieldAfterExecution(this.environment)) { - return ProcedureLockState.LOCK_YIELD_WAIT; + return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, procedure); } if (procedure != rootProcedure) { @@ -652,7 +672,7 @@ private ProcedureLockState executeRootStackRollback( LOG.info( ProcedureMessages.ROLLED_BACK_TIME_DURATION_IS, rootProcedure, rootProcedure.elapsedTime()); rootProcedureCleanup(rootProcedure); - return ProcedureLockState.LOCK_ACQUIRED; + return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, rootProcedure); } private ProcedureLockState acquireLock(Procedure proc) { @@ -787,21 +807,30 @@ public void run() { Thread.sleep(1000); continue; } - this.activeProcedure.set(procedure); - activeExecutorCount.incrementAndGet(); - startTime.set(System.currentTimeMillis()); - PROCEDURE_EXECUTION_CONTEXT.set(true); try { - executeProcedure(procedure); - } finally { - PROCEDURE_EXECUTION_CONTEXT.remove(); + this.activeProcedure.set(procedure); + activeExecutorCount.incrementAndGet(); + startTime.set(System.currentTimeMillis()); + try { + PROCEDURE_EXECUTION_CONTEXT.set(true); + try { + executeProcedure(procedure); + } finally { + PROCEDURE_EXECUTION_CONTEXT.remove(); + } + } finally { + activeExecutorCount.decrementAndGet(); + LOG.trace( + "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); + this.activeProcedure.set(null); + lastUpdated = System.currentTimeMillis(); + startTime.set(lastUpdated); + } + } catch (Exception e) { + LOG.warn( + "Exception happened when worker {} execute procedure {}", getName(), procedure, e); + throw e; } - activeExecutorCount.decrementAndGet(); - LOG.trace( - "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); - this.activeProcedure.set(null); - lastUpdated = System.currentTimeMillis(); - startTime.set(lastUpdated); } } catch (Exception e) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java index b141027917366..6cade537f1fbb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java @@ -56,6 +56,18 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced } } + @Override + protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { + configNodeProcedureEnv.getSchedulerLock().lock(); + try { + configNodeProcedureEnv + .getNodeLock() + .waitProcedure(this, configNodeProcedureEnv.getScheduler()); + } finally { + configNodeProcedureEnv.getSchedulerLock().unlock(); + } + } + @Override protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { configNodeProcedureEnv.getSchedulerLock().lock(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java index 832e339c0aede..9fc1948aa8e0c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java @@ -45,7 +45,11 @@ public boolean releaseLock(Procedure procedure) { return true; } - public void waitProcedure(Procedure procedure) { + public void waitProcedure(Procedure procedure, ProcedureScheduler procedureScheduler) { + if (lockOwnerProcedure == null) { + procedureScheduler.addFront(procedure); + return; + } deque.addLast(procedure); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java index 3cd5ceacf4b6d..94b6f3119308a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/SimpleProcedureScheduler.java @@ -22,6 +22,7 @@ import org.apache.iotdb.confignode.procedure.Procedure; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.locks.ReentrantLock; /** Simple scheduler for procedures */ public class SimpleProcedureScheduler extends AbstractProcedureScheduler { @@ -48,6 +49,7 @@ public void clear() { schedLock(); try { runnables.clear(); + waitings.clear(); } finally { schedUnlock(); } @@ -68,12 +70,37 @@ public int queueSize() { return runnables.size(); } - public void addWaiting(Procedure proc) { - waitings.add(proc); + public void waitProcedure(Procedure proc, ReentrantLock lock) { + boolean signal = false; + schedLock(); + try { + if (lock.isLocked()) { + waitings.add(proc); + } else { + runnables.addFirst(proc); + signal = true; + } + } finally { + schedUnlock(); + } + if (signal) { + signalAll(); + } } - public void releaseWaiting() { - runnables.addAll(waitings); - waitings.clear(); + public void releaseWaiting(ReentrantLock lock) { + boolean signal; + schedLock(); + try { + lock.unlock(); + signal = !waitings.isEmpty(); + runnables.addAll(waitings); + waitings.clear(); + } finally { + schedUnlock(); + } + if (signal) { + signalAll(); + } } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java index ce9fea39d5589..42badd700799e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java @@ -53,18 +53,21 @@ protected ProcedureLockState acquireLock(TestProcEnv testProcEnv) { return ProcedureLockState.LOCK_ACQUIRED; } - SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler(); - scheduler.addWaiting(this); System.out.println(procName + " wait for lock."); return ProcedureLockState.LOCK_EVENT_WAIT; } + @Override + protected void waitForLock(TestProcEnv testProcEnv) { + SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler(); + scheduler.waitProcedure(this, testProcEnv.getEnvLock()); + } + @Override protected void releaseLock(TestProcEnv testProcEnv) { System.out.println(procName + " release lock."); - testProcEnv.getEnvLock().unlock(); SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler(); - scheduler.releaseWaiting(); + scheduler.releaseWaiting(testProcEnv.getEnvLock()); } @Override