Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.index.SecondaryIndexBuilder;
import org.apache.cassandra.index.sai.StorageAttachedIndexGroup;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.IScrubber;
Expand All @@ -110,6 +111,7 @@
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
Expand Down Expand Up @@ -669,6 +671,15 @@ public void execute(LifecycleTransaction input)
public AllSSTableOpStatus performVerify(ColumnFamilyStore cfs, IVerifier.Options options) throws InterruptedException, ExecutionException
{
assert !cfs.isIndex();
StorageAttachedIndexGroup indexGroup = StorageAttachedIndexGroup.getIndexGroup(cfs);
boolean skipSaiCheck = indexGroup == null || SchemaConstants.isSystemKeyspace(cfs.getKeyspaceName());
if (options.onlySai && skipSaiCheck)
{
logger.info("Skipping table {} during SAI-only veriy becasue system keyspace or no SAI index.", cfs.getTableName());
return AllSSTableOpStatus.SUCCESSFUL;

}

return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@Override
Expand Down Expand Up @@ -1493,12 +1504,30 @@ void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, IScrubber.Op
@VisibleForTesting
void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, IVerifier.Options options, ActiveCompactionsTracker activeCompactions)
{

StorageAttachedIndexGroup indexGroup = StorageAttachedIndexGroup.getIndexGroup(cfs);
boolean skipSaiCheck = indexGroup == null || SchemaConstants.isSystemKeyspace(cfs.getKeyspaceName());

// If this table shouldn’t be verified, we skip early
if (options.onlySai && skipSaiCheck)
{
logger.info("Skipping SAI validation for table {} (system keyspace or no SAI index).", cfs.getTableName());
return;
}

CompactionInfo.Holder verifyInfo = null;
try (IVerifier verifier = sstable.getVerifier(cfs, new OutputHandler.LogOutput(), false, options))
{
verifyInfo = verifier.getVerifyInfo();
activeCompactions.beginCompaction(verifyInfo);
verifier.verify();

if (!options.onlySai)
verifier.verify();

if (!skipSaiCheck)
cfs.indexManager.validateSSTableAttachedIndexes(Collections.singleton(sstable), true, true);
else
logger.info("Skipping SAI validation for table {} (system keyspace or no SAI index).", cfs.getTableName());
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ public boolean validatePerIndexComponents(IndexTermType indexTermType, IndexIden
}
}

public void verify(IndexValidation validation, boolean validateChecksum, boolean rethrow)
{
validatePerSSTableComponents(validation, validateChecksum, rethrow);
}

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public boolean validatePerSSTableComponents(IndexValidation validation, boolean validateChecksum, boolean rethrow)
{
Expand Down
17 changes: 16 additions & 1 deletion src/java/org/apache/cassandra/io/sstable/IVerifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class Options
*/
public final boolean quick;

/**
* To verify only SAI checksum
*/
public final boolean onlySai;

public final Function<String, ? extends Collection<Range<Token>>> tokenLookup;

private Options(boolean invokeDiskFailurePolicy,
Expand All @@ -68,6 +73,7 @@ private Options(boolean invokeDiskFailurePolicy,
boolean mutateRepairStatus,
boolean checkOwnsTokens,
boolean quick,
boolean onlySai,
Function<String, ? extends Collection<Range<Token>>> tokenLookup)
{
this.invokeDiskFailurePolicy = invokeDiskFailurePolicy;
Expand All @@ -76,6 +82,7 @@ private Options(boolean invokeDiskFailurePolicy,
this.mutateRepairStatus = mutateRepairStatus;
this.checkOwnsTokens = checkOwnsTokens;
this.quick = quick;
this.onlySai = onlySai;
this.tokenLookup = tokenLookup;
}

Expand All @@ -89,6 +96,7 @@ public String toString()
", mutateRepairStatus=" + mutateRepairStatus +
", checkOwnsTokens=" + checkOwnsTokens +
", quick=" + quick +
", onlySai=" + onlySai +
'}';
}

Expand All @@ -100,6 +108,7 @@ public static class Builder
private boolean mutateRepairStatus = false; // mutating repair status can be dangerous
private boolean checkOwnsTokens = false;
private boolean quick = false;
private boolean onlySai = false;
private Function<String, ? extends Collection<Range<Token>>> tokenLookup = StorageService.instance::getLocalAndPendingRanges;

public Builder invokeDiskFailurePolicy(boolean param)
Expand Down Expand Up @@ -138,6 +147,12 @@ public Builder quick(boolean param)
return this;
}

public Builder onlySai(boolean param)
{
this.onlySai = param;
return this;
}

public Builder tokenLookup(Function<String, ? extends Collection<Range<Token>>> tokenLookup)
{
this.tokenLookup = tokenLookup;
Expand All @@ -146,7 +161,7 @@ public Builder tokenLookup(Function<String, ? extends Collection<Range<Token>>>

public Options build()
{
return new Options(invokeDiskFailurePolicy, extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick, tokenLookup);
return new Options(invokeDiskFailurePolicy, extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick, onlySai, tokenLookup);
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2684,18 +2684,19 @@ public int scrub(boolean disableSnapshot, IScrubber.Options options, int jobs, S
@Deprecated(since = "4.0")
public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
return verify(extendedVerify, false, false, false, false, false, keyspaceName, tableNames);
return verify(extendedVerify, false, false, false, false, false, false, keyspaceName, tableNames);
}

public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, boolean onlySai, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
IVerifier.Options options = IVerifier.options().invokeDiskFailurePolicy(diskFailurePolicy)
.extendedVerification(extendedVerify)
.checkVersion(checkVersion)
.mutateRepairStatus(mutateRepairStatus)
.checkOwnsTokens(checkOwnsTokens)
.quick(quick).build();
.quick(quick)
.onlySai(onlySai).build();
logger.info("Staring {} on {}.{} with options = {}", OperationType.VERIFY, keyspaceName, Arrays.toString(tableNames), options);
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, tableNames))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ default int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkD
* The entire sstable will be read to ensure each cell validates if extendedVerify is true
*/
public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, boolean onlySai, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;

/**
* Rewrite all sstables to the latest version.
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkDa
return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables);
}

public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, boolean onlySai, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames);
return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, onlySai, keyspaceName, tableNames);
}

public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
Expand Down Expand Up @@ -430,10 +430,10 @@ public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupte
"scrubbing");
}

public void verify(PrintStream out, boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
public void verify(PrintStream out, boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, boolean onlySai, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
perform(out, keyspaceName,
() -> verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames),
() -> verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, onlySai, keyspaceName, tableNames),
"verifying");
}

Expand Down
7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/tools/nodetool/Verify.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class Verify extends AbstractCommand
description = "Do a quick check - avoid reading all data to verify checksums")
private boolean quick = false;

@Option(paramLabel = "sai_only",
names = { "-s", "--sai-only"},
description = "Verify only sai index")
private boolean onlySai = false;

@Override
public void execute(NodeProbe probe)
{
Expand All @@ -103,7 +108,7 @@ public void execute(NodeProbe probe)
{
try
{
probe.verify(out, extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspace, tableNames);
probe.verify(out, extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, onlySai, keyspace, tableNames);
} catch (Exception e)
{
throw new RuntimeException("Error occurred during verifying", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,29 @@ public void testVerifyOne() throws Throwable
assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true));
}

@Test
public void testSaiVerifyOne ()
{
createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
createIndex("CREATE INDEX idx1 ON %s(a) USING 'sai'");
getCurrentColumnFamilyStore().disableAutoCompaction();

for (int i = 0; i < 5; i++)
{
execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
flush();
}

SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
CompactionManager.instance.verifyOne(getCurrentColumnFamilyStore(), sstable, IVerifier.options().build(), mockActiveCompactions);
assertTrue(mockActiveCompactions.finished);
assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable));
assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false));
assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true));

}

@Test
public void testSubmitCacheWrite() throws ExecutionException, InterruptedException
{
Expand Down