diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java index 7e5935f03d4..62124a5a20c 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java @@ -19,15 +19,18 @@ package org.apache.cassandra.service.paxos.cleanup; import java.util.Collection; +import java.util.Comparator; import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -58,6 +61,37 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture PRIORITY_COMPARATOR = new Comparator() + { + @Override + public int compare(DelayedRepair o1, DelayedRepair o2) + { + long delta = o1.scheduledAtNanos - o2.scheduledAtNanos; + if (delta > 0) + return 1; + if (delta < 0) + return -1; + return 0; + } + }; + + public DelayedRepair(UncommittedPaxosKey uncommitted, long sleepMillis) + { + this.uncommitted = uncommitted; + this.scheduledAtNanos = Clock.Global.nanoTime() + MILLISECONDS.toNanos(sleepMillis); + } + + public boolean isRunnable() + { + return Clock.Global.nanoTime() - scheduledAtNanos > 0; + } + } + private final UUID session; private final TableId tableId; private final TableMetadata table; @@ -69,6 +103,7 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture inflight = new ConcurrentHashMap<>(); + private final Queue delayed = new PriorityQueue<>(DelayedRepair.PRIORITY_COMPARATOR); private final PaxosTableRepairs tableRepairs; private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, TableId tableId, Collection> ranges, CloseableIterator uncommittedIter, boolean autoRepair) @@ -125,11 +160,40 @@ public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, tableId, ranges, iterator, true); } + /** + * Wait to repair things that are still potentially executing at the original coordinator to avoid + * causing timeouts. This should only have to happen at most a few times when the repair starts + */ + private boolean maybeDelay(UncommittedPaxosKey uncommitted) + { + if (!DatabaseDescriptor.getPaxosRepairRaceWait()) + return false; + + + long txnTimeoutMillis = Math.max(getCasContentionTimeout(MILLISECONDS), getWriteRpcTimeout(MILLISECONDS)); + long nowMillis = Clock.Global.currentTimeMillis(); + long ballotElapsedMillis = nowMillis - MICROSECONDS.toMillis(uncommitted.ballot().unixMicros()); + + if (ballotElapsedMillis < 0 && Math.abs(ballotElapsedMillis) > SECONDS.toMillis(1)) + logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot()); + + if (ballotElapsedMillis >= txnTimeoutMillis) + return false; + + long sleepMillis = txnTimeoutMillis - ballotElapsedMillis; + logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}ms to allow the in flight operation to finish", sleepMillis); + + delayed.add(new DelayedRepair(uncommitted, sleepMillis)); + ScheduledExecutors.scheduledFastTasks.schedule(this::scheduleKeyRepairsOrFinish, sleepMillis, MILLISECONDS); + + return true; + } + /** * Schedule as many key repairs as we can, up to the paralellism limit. If no repairs are scheduled and * none are in flight when the iterator is exhausted, the session will be finished */ - private void scheduleKeyRepairsOrFinish() + private synchronized void scheduleKeyRepairsOrFinish() { int parallelism = DatabaseDescriptor.getPaxosRepairParallelism(); Preconditions.checkArgument(parallelism > 0); @@ -141,18 +205,33 @@ private void scheduleKeyRepairsOrFinish() return; } - long txnTimeoutMicros = Math.max(getCasContentionTimeout(MICROSECONDS), getWriteRpcTimeout(MICROSECONDS)); - boolean waitForCoordinator = DatabaseDescriptor.getPaxosRepairRaceWait(); - while (inflight.size() < parallelism && uncommittedIter.hasNext()) - repairKey(uncommittedIter.next(), txnTimeoutMicros, waitForCoordinator); - + while (inflight.size() < parallelism && !isDone()) + { + if (!delayed.isEmpty() && delayed.peek().isRunnable()) + { + DelayedRepair delayedRepair = delayed.remove(); + repairKey(delayedRepair.uncommitted); + } + else if (uncommittedIter.hasNext()) + { + UncommittedPaxosKey uncommitted = uncommittedIter.next(); + if (!maybeDelay(uncommitted)) + { + repairKey(uncommitted); + } + } + else + { + break; + } + } } - if (inflight.isEmpty()) + if (inflight.isEmpty() && delayed.isEmpty()) finish(); } - private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros, boolean waitForCoordinator) + private boolean repairKey(UncommittedPaxosKey uncommitted) { logger.trace("repairing {}", uncommitted); Preconditions.checkState(!inflight.containsKey(uncommitted.getKey())); @@ -163,9 +242,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros if (consistency == null) return false; - if (waitForCoordinator) - maybeWaitForOriginalCoordinator(uncommitted, txnTimeoutMicros); - inflight.put(uncommitted.getKey(), tableRepairs.startOrGetOrQueue(uncommitted.getKey(), uncommitted.ballot(), uncommitted.getConsistencyLevel(), table, result -> { if (result.wasSuccessful()) onKeyFinish(uncommitted.getKey()); @@ -175,24 +251,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros return true; } - /** - * Wait to repair things that are still potentially executing at the original coordinator to avoid - * causing timeouts. This should only have to happen at most a few times when the repair starts - */ - private static void maybeWaitForOriginalCoordinator(UncommittedPaxosKey uncommitted, long txnTimeoutMicros) - { - long nowMicros = MILLISECONDS.toMicros(Clock.Global.currentTimeMillis()); - long ballotElapsedMicros = nowMicros - uncommitted.ballot().unixMicros(); - if (ballotElapsedMicros < 0 && Math.abs(ballotElapsedMicros) > SECONDS.toMicros(1)) - logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot()); - if (ballotElapsedMicros < txnTimeoutMicros) - { - long sleepMicros = txnTimeoutMicros - ballotElapsedMicros; - logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}us to allow the in flight operation to finish", sleepMicros); - Uninterruptibles.sleepUninterruptibly(sleepMicros, MICROSECONDS); - } - } - private synchronized void onKeyFinish(DecoratedKey key) { if (!inflight.containsKey(key))