Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@
package org.apache.cassandra.service.paxos.cleanup;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
Expand Down Expand Up @@ -58,6 +61,37 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon
private static final Logger logger = LoggerFactory.getLogger(PaxosCleanupLocalCoordinator.class);
private static final UUID INTERNAL_SESSION = new UUID(0, 0);

private static class DelayedRepair
{
private final UncommittedPaxosKey uncommitted;
private final long scheduledAtNanos;

private static Comparator<DelayedRepair> PRIORITY_COMPARATOR = new Comparator<DelayedRepair>()
{
@Override
public int compare(DelayedRepair o1, DelayedRepair o2)
{
long delta = o1.scheduledAtNanos - o2.scheduledAtNanos;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long.compare?

if (delta > 0)
return 1;
if (delta < 0)
return -1;
return 0;
}
};

public DelayedRepair(UncommittedPaxosKey uncommitted, long sleepMillis)
{
this.uncommitted = uncommitted;
this.scheduledAtNanos = Clock.Global.nanoTime() + MILLISECONDS.toNanos(sleepMillis);
}

public boolean isRunnable()
{
return Clock.Global.nanoTime() - scheduledAtNanos > 0;
}
}

private final UUID session;
private final TableId tableId;
private final TableMetadata table;
Expand All @@ -69,6 +103,7 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon
private final boolean autoRepair;

private final Map<DecoratedKey, AbstractPaxosRepair> inflight = new ConcurrentHashMap<>();
private final Queue<DelayedRepair> delayed = new PriorityQueue<>(DelayedRepair.PRIORITY_COMPARATOR);
private final PaxosTableRepairs tableRepairs;

private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, TableId tableId, Collection<Range<Token>> ranges, CloseableIterator<UncommittedPaxosKey> uncommittedIter, boolean autoRepair)
Expand Down Expand Up @@ -125,11 +160,40 @@ public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx
return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, tableId, ranges, iterator, true);
}

/**
* Wait to repair things that are still potentially executing at the original coordinator to avoid
* causing timeouts. This should only have to happen at most a few times when the repair starts
*/
private boolean maybeDelay(UncommittedPaxosKey uncommitted)
{
if (!DatabaseDescriptor.getPaxosRepairRaceWait())
return false;


long txnTimeoutMillis = Math.max(getCasContentionTimeout(MILLISECONDS), getWriteRpcTimeout(MILLISECONDS));
long nowMillis = Clock.Global.currentTimeMillis();
long ballotElapsedMillis = nowMillis - MICROSECONDS.toMillis(uncommitted.ballot().unixMicros());

if (ballotElapsedMillis < 0 && Math.abs(ballotElapsedMillis) > SECONDS.toMillis(1))
logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot());

if (ballotElapsedMillis >= txnTimeoutMillis)
return false;

long sleepMillis = txnTimeoutMillis - ballotElapsedMillis;
logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}ms to allow the in flight operation to finish", sleepMillis);

delayed.add(new DelayedRepair(uncommitted, sleepMillis));
ScheduledExecutors.scheduledFastTasks.schedule(this::scheduleKeyRepairsOrFinish, sleepMillis, MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to synchronise scheduleKeyRepairsOrFinish? Or wrap it in a synchronise call inside the lambda. Currently it inherits its safety from its callers


return true;
}

/**
* Schedule as many key repairs as we can, up to the paralellism limit. If no repairs are scheduled and
* none are in flight when the iterator is exhausted, the session will be finished
*/
private void scheduleKeyRepairsOrFinish()
private synchronized void scheduleKeyRepairsOrFinish()
{
int parallelism = DatabaseDescriptor.getPaxosRepairParallelism();
Preconditions.checkArgument(parallelism > 0);
Expand All @@ -141,18 +205,33 @@ private void scheduleKeyRepairsOrFinish()
return;
}

long txnTimeoutMicros = Math.max(getCasContentionTimeout(MICROSECONDS), getWriteRpcTimeout(MICROSECONDS));
boolean waitForCoordinator = DatabaseDescriptor.getPaxosRepairRaceWait();
while (inflight.size() < parallelism && uncommittedIter.hasNext())
repairKey(uncommittedIter.next(), txnTimeoutMicros, waitForCoordinator);

while (inflight.size() < parallelism && !isDone())
{
if (!delayed.isEmpty() && delayed.peek().isRunnable())
{
DelayedRepair delayedRepair = delayed.remove();
repairKey(delayedRepair.uncommitted);
}
else if (uncommittedIter.hasNext())
{
UncommittedPaxosKey uncommitted = uncommittedIter.next();
if (!maybeDelay(uncommitted))
{
repairKey(uncommitted);
}
}
else
{
break;
}
}
}

if (inflight.isEmpty())
if (inflight.isEmpty() && delayed.isEmpty())
finish();
}

private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros, boolean waitForCoordinator)
private boolean repairKey(UncommittedPaxosKey uncommitted)
{
logger.trace("repairing {}", uncommitted);
Preconditions.checkState(!inflight.containsKey(uncommitted.getKey()));
Expand All @@ -163,9 +242,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros
if (consistency == null)
return false;

if (waitForCoordinator)
maybeWaitForOriginalCoordinator(uncommitted, txnTimeoutMicros);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also delete maybeWaitForOriginalCoordinator?


inflight.put(uncommitted.getKey(), tableRepairs.startOrGetOrQueue(uncommitted.getKey(), uncommitted.ballot(), uncommitted.getConsistencyLevel(), table, result -> {
if (result.wasSuccessful())
onKeyFinish(uncommitted.getKey());
Expand All @@ -175,24 +251,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros
return true;
}

/**
* Wait to repair things that are still potentially executing at the original coordinator to avoid
* causing timeouts. This should only have to happen at most a few times when the repair starts
*/
private static void maybeWaitForOriginalCoordinator(UncommittedPaxosKey uncommitted, long txnTimeoutMicros)
{
long nowMicros = MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
long ballotElapsedMicros = nowMicros - uncommitted.ballot().unixMicros();
if (ballotElapsedMicros < 0 && Math.abs(ballotElapsedMicros) > SECONDS.toMicros(1))
logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot());
if (ballotElapsedMicros < txnTimeoutMicros)
{
long sleepMicros = txnTimeoutMicros - ballotElapsedMicros;
logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}us to allow the in flight operation to finish", sleepMicros);
Uninterruptibles.sleepUninterruptibly(sleepMicros, MICROSECONDS);
}
}

private synchronized void onKeyFinish(DecoratedKey key)
{
if (!inflight.containsKey(key))
Expand Down