Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@

import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
Expand Down Expand Up @@ -58,6 +61,18 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon
private static final Logger logger = LoggerFactory.getLogger(PaxosCleanupLocalCoordinator.class);
private static final UUID INTERNAL_SESSION = new UUID(0, 0);

private static class DelayedRepair
{
private final UncommittedPaxosKey uncommitted;
private final long startAfterMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed this was a time delta when I read it (i.e. after millis (have elapsed)). Perhaps atMillis or startAtMillis, or scheduledAtMillis?

But also, this should probably be nanos and we should probably use nanoTime?


public DelayedRepair(UncommittedPaxosKey uncommitted, long startAfterMillis)
{
this.uncommitted = uncommitted;
this.startAfterMillis = startAfterMillis;
}
}

private final UUID session;
private final TableId tableId;
private final TableMetadata table;
Expand All @@ -69,6 +84,7 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon
private final boolean autoRepair;

private final Map<DecoratedKey, AbstractPaxosRepair> inflight = new ConcurrentHashMap<>();
private final Queue<DelayedRepair> delayed = new LinkedBlockingQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given usage, can't we just use CLQ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean concurrent linked queue? delayed is only accessed inside synchronized blocks so I don't think we'd gain anything by using it

Copy link
Contributor

@belliottsmith belliottsmith Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, totally fine with swapping it for ArrayDeque instead (or even LinkedList). Was just trying to downgrade LinkedBlockingQueue, as probably too heavy weight.

private final PaxosTableRepairs tableRepairs;

private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, TableId tableId, Collection<Range<Token>> ranges, CloseableIterator<UncommittedPaxosKey> uncommittedIter, boolean autoRepair)
Expand Down Expand Up @@ -125,6 +141,31 @@ public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx
return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, tableId, ranges, iterator, true);
}

private boolean maybeDelay(UncommittedPaxosKey uncommitted)
{
if (!DatabaseDescriptor.getPaxosRepairRaceWait())
return false;


long txnTimeoutMillis = Math.max(getCasContentionTimeout(MILLISECONDS), getWriteRpcTimeout(MILLISECONDS));
long nowMillis = Clock.Global.currentTimeMillis();
long ballotElapsedMillis = nowMillis - MICROSECONDS.toMillis(uncommitted.ballot().unixMicros());

if (ballotElapsedMillis < 0 && Math.abs(ballotElapsedMillis) > SECONDS.toMillis(1))
logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot());

if (ballotElapsedMillis >= txnTimeoutMillis)
return false;

long sleepMillis = txnTimeoutMillis - ballotElapsedMillis;
logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}ms to allow the in flight operation to finish", sleepMillis);

delayed.add(new DelayedRepair(uncommitted, nowMillis + sleepMillis));
ScheduledExecutors.scheduledFastTasks.schedule(this::scheduleKeyRepairsOrFinish, sleepMillis, MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to synchronise scheduleKeyRepairsOrFinish? Or wrap it in a synchronise call inside the lambda. Currently it inherits its safety from its callers


return true;
}

/**
* Schedule as many key repairs as we can, up to the paralellism limit. If no repairs are scheduled and
* none are in flight when the iterator is exhausted, the session will be finished
Expand All @@ -141,18 +182,33 @@ private void scheduleKeyRepairsOrFinish()
return;
}

long txnTimeoutMicros = Math.max(getCasContentionTimeout(MICROSECONDS), getWriteRpcTimeout(MICROSECONDS));
boolean waitForCoordinator = DatabaseDescriptor.getPaxosRepairRaceWait();
while (inflight.size() < parallelism && uncommittedIter.hasNext())
repairKey(uncommittedIter.next(), txnTimeoutMicros, waitForCoordinator);

while (inflight.size() < parallelism && !isDone())
{
if (!delayed.isEmpty() && delayed.peek().startAfterMillis < Clock.Global.currentTimeMillis())
{
DelayedRepair delayedRepair = delayed.remove();
repairKey(delayedRepair.uncommitted);
}
else if (uncommittedIter.hasNext())
{
UncommittedPaxosKey uncommitted = uncommittedIter.next();
if (!maybeDelay(uncommitted))
{
repairKey(uncommitted);
}
}
else
{
break;
}
}
}

if (inflight.isEmpty())
finish();
}

private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros, boolean waitForCoordinator)
private boolean repairKey(UncommittedPaxosKey uncommitted)
{
logger.trace("repairing {}", uncommitted);
Preconditions.checkState(!inflight.containsKey(uncommitted.getKey()));
Expand All @@ -163,9 +219,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros
if (consistency == null)
return false;

if (waitForCoordinator)
maybeWaitForOriginalCoordinator(uncommitted, txnTimeoutMicros);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also delete maybeWaitForOriginalCoordinator?


inflight.put(uncommitted.getKey(), tableRepairs.startOrGetOrQueue(uncommitted.getKey(), uncommitted.ballot(), uncommitted.getConsistencyLevel(), table, result -> {
if (result.wasSuccessful())
onKeyFinish(uncommitted.getKey());
Expand Down