Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ protected void releaseLock(Env env) {
// no op
}

/**
* Called after an execution attempt returns {@link ProcedureLockState#LOCK_EVENT_WAIT}. Override
* it to put the procedure into the corresponding lock wait queue.
*
* @param env env
*/
protected void waitForLock(Env env) {
// no op
}

/**
* Used to keep procedure lock even when the procedure is yielded or suspended.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
Expand All @@ -42,7 +41,6 @@
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -86,6 +84,16 @@ public class ProcedureExecutor<Env> {
private final Env environment;
private final IProcedureStore<Env> store;

private static final class LockStateResult<Env> {
private final ProcedureLockState lockState;
private final Procedure<Env> procedure;

private LockStateResult(ProcedureLockState lockState, Procedure<Env> procedure) {
this.lockState = lockState;
this.procedure = procedure;
}
}

public ProcedureExecutor(
final Env environment, final IProcedureStore<Env> store, final ProcedureScheduler scheduler) {
this.environment = environment;
Expand Down Expand Up @@ -329,33 +337,39 @@ private void executeProcedure(Procedure<Env> proc) {
return;
}
ProcedureLockState lockState = null;
Procedure<Env> lockEventWaitProcedure = null;
try {
do {
if (!rootProcStack.acquire()) {
if (rootProcStack.setRollback()) {
lockState = executeRootStackRollback(rootProcId, rootProcStack);
LockStateResult<Env> lockStateResult =
executeRootStackRollback(rootProcId, rootProcStack);
lockState = lockStateResult.lockState;
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, lockStateResult.procedure);
rootProcStack.unsetRollback();
lockEventWaitProcedure = lockStateResult.procedure;
break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
scheduler.yield(proc);
scheduler.yield(lockStateResult.procedure);
break;
default:
throw new UnsupportedOperationException();
}
} else {
if (!proc.wasExecuted()) {
switch (executeRollback(proc)) {
lockState = executeRollback(proc);
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info(
ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
lockEventWaitProcedure = proc;
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
Expand All @@ -367,19 +381,25 @@ private void executeProcedure(Procedure<Env> proc) {
}
break;
}
lockState = acquireLock(proc);
switch (lockState) {
case LOCK_ACQUIRED:
executeProcedure(rootProcStack, proc);
break;
case LOCK_YIELD_WAIT:
case LOCK_EVENT_WAIT:
LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState);
break;
default:
throw new UnsupportedOperationException();
try {
lockState = acquireLock(proc);
switch (lockState) {
case LOCK_ACQUIRED:
executeProcedure(rootProcStack, proc);
break;
case LOCK_YIELD_WAIT:
case LOCK_EVENT_WAIT:
LOG.info(ProcedureMessages.LOCKSTATE_IS, proc, lockState);
if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
lockEventWaitProcedure = proc;
}
break;
default:
throw new UnsupportedOperationException();
}
} finally {
rootProcStack.release();
}
rootProcStack.release();

if (proc.isSuccess()) {
// update metrics on finishing the procedure
Expand All @@ -397,9 +417,9 @@ private void executeProcedure(Procedure<Env> proc) {
} finally {
// Only after procedure has completed execution can it be allowed to be rescheduled to prevent
// data races
if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, proc.getProcId());
((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc);
if (lockEventWaitProcedure != null) {
LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, lockEventWaitProcedure.getProcId());
lockEventWaitProcedure.waitForLock(this.environment);
}
}
}
Expand Down Expand Up @@ -607,8 +627,8 @@ private void yieldProcedure(Procedure<Env> proc) {
* @param procedureStack root procedure stack
* @return lock state
*/
private ProcedureLockState executeRootStackRollback(
Long rootProcId, RootProcedureStack procedureStack) {
private LockStateResult<Env> executeRootStackRollback(
Long rootProcId, RootProcedureStack<Env> procedureStack) {
Procedure<Env> rootProcedure = procedures.get(rootProcId);
ProcedureException exception = rootProcedure.getException();
if (exception == null) {
Expand All @@ -629,19 +649,19 @@ private ProcedureLockState executeRootStackRollback(
}
ProcedureLockState lockState = acquireLock(procedure);
if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
return lockState;
return new LockStateResult<>(lockState, procedure);
}
lockState = executeRollback(procedure);
releaseLock(procedure, false);

boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
if (abortRollback) {
return lockState;
return new LockStateResult<>(lockState, procedure);
}

if (!procedure.isFinished() && procedure.isYieldAfterExecution(this.environment)) {
return ProcedureLockState.LOCK_YIELD_WAIT;
return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, procedure);
}

if (procedure != rootProcedure) {
Expand All @@ -652,7 +672,7 @@ private ProcedureLockState executeRootStackRollback(
LOG.info(
ProcedureMessages.ROLLED_BACK_TIME_DURATION_IS, rootProcedure, rootProcedure.elapsedTime());
rootProcedureCleanup(rootProcedure);
return ProcedureLockState.LOCK_ACQUIRED;
return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, rootProcedure);
}

private ProcedureLockState acquireLock(Procedure<Env> proc) {
Expand Down Expand Up @@ -787,21 +807,30 @@ public void run() {
Thread.sleep(1000);
continue;
}
this.activeProcedure.set(procedure);
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
PROCEDURE_EXECUTION_CONTEXT.set(true);
try {
executeProcedure(procedure);
} finally {
PROCEDURE_EXECUTION_CONTEXT.remove();
this.activeProcedure.set(procedure);
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
try {
PROCEDURE_EXECUTION_CONTEXT.set(true);
try {
executeProcedure(procedure);
} finally {
PROCEDURE_EXECUTION_CONTEXT.remove();
}
} finally {
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}
} catch (Exception e) {
LOG.warn(
"Exception happened when worker {} execute procedure {}", getName(), procedure, e);
throw e;
}
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
}
}

@Override
protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
try {
configNodeProcedureEnv
.getNodeLock()
.waitProcedure(this, configNodeProcedureEnv.getScheduler());
} finally {
configNodeProcedureEnv.getSchedulerLock().unlock();
}
}

@Override
protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
configNodeProcedureEnv.getSchedulerLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ public boolean releaseLock(Procedure<?> procedure) {
return true;
}

public void waitProcedure(Procedure<?> procedure) {
public void waitProcedure(Procedure<?> procedure, ProcedureScheduler procedureScheduler) {
if (lockOwnerProcedure == null) {
procedureScheduler.addFront(procedure);
return;
}
deque.addLast(procedure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.confignode.procedure.Procedure;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;

/** Simple scheduler for procedures */
public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
Expand All @@ -48,6 +49,7 @@ public void clear() {
schedLock();
try {
runnables.clear();
waitings.clear();
} finally {
schedUnlock();
}
Expand All @@ -68,12 +70,37 @@ public int queueSize() {
return runnables.size();
}

public void addWaiting(Procedure proc) {
waitings.add(proc);
public void waitProcedure(Procedure proc, ReentrantLock lock) {
boolean signal = false;
schedLock();
try {
if (lock.isLocked()) {
waitings.add(proc);
} else {
runnables.addFirst(proc);
signal = true;
}
} finally {
schedUnlock();
}
if (signal) {
signalAll();
}
}

public void releaseWaiting() {
runnables.addAll(waitings);
waitings.clear();
public void releaseWaiting(ReentrantLock lock) {
boolean signal;
schedLock();
try {
lock.unlock();
signal = !waitings.isEmpty();
runnables.addAll(waitings);
waitings.clear();
} finally {
schedUnlock();
}
if (signal) {
signalAll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ protected ProcedureLockState acquireLock(TestProcEnv testProcEnv) {

return ProcedureLockState.LOCK_ACQUIRED;
}
SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler();
scheduler.addWaiting(this);
System.out.println(procName + " wait for lock.");
return ProcedureLockState.LOCK_EVENT_WAIT;
}

@Override
protected void waitForLock(TestProcEnv testProcEnv) {
SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler();
scheduler.waitProcedure(this, testProcEnv.getEnvLock());
}

@Override
protected void releaseLock(TestProcEnv testProcEnv) {
System.out.println(procName + " release lock.");
testProcEnv.getEnvLock().unlock();
SimpleProcedureScheduler scheduler = (SimpleProcedureScheduler) testProcEnv.getScheduler();
scheduler.releaseWaiting();
scheduler.releaseWaiting(testProcEnv.getEnvLock());
}

@Override
Expand Down
Loading