20 commits
60cbed1  CEP-45: Bulk transfer (aratno, Sep 2, 2025)
7dbf5e1  Fix compaction of bulk transfers with existing SSTables clearing tran… (aratno, Oct 17, 2025)
e357277  Fix TrackerTest for new serialization (aratno, Oct 17, 2025)
8030308  Fix LegacySSTableTest (aratno, Oct 17, 2025)
03a1a47  Fix setup in RepairMessageVerbHandlerOutOfRangeTest, likely impacting… (aratno, Oct 17, 2025)
435d467  Fix CoordinatorLogTest (aratno, Oct 17, 2025)
460aa45  Fix ReadRepairQueryTester, since tracked keyspaces always perform rea… (aratno, Oct 20, 2025)
1516ad6  PR Feedback: only isolate streamed SSTables for StreamOperation.TRACK… (aratno, Oct 23, 2025)
0dd99c7  Refactor tests (aratno, Oct 23, 2025)
21e0f35  Fix test failure due to offsets contained in reconciled and unreconci… (aratno, Oct 23, 2025)
028ddb9  Refactor local vs remote activation, improve tests (aratno, Oct 28, 2025)
7fab071  Fix promotion of transferred SSTables into the repaired set (aratno, Oct 28, 2025)
9020864  Fix outOutOfRange test (aratno, Oct 28, 2025)
99be63c  Add failing test for activation IDs spreading across token ranges by … (aratno, Oct 29, 2025)
e27c016  Fix test by only including activation IDs within the read range (aratno, Oct 29, 2025)
f6ae637  PR feedback: two-phase commit naming conventions, tests and fixes for… (aratno, Nov 3, 2025)
9ab43ff  Update enum serializer to save a few bytes (aratno, Nov 3, 2025)
fed8a1f  Move CooordinatedTransfers to its own class (aratno, Nov 3, 2025)
796975b  PROPOSE -> PREPARE (aratno, Nov 3, 2025)
2df59d5  Add paranoid check to only add activation IDs for reads on tracked ke… (aratno, Nov 3, 2025)
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -913,7 +913,7 @@ public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel,
.build());
}

Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
public Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
{
Descriptor newDescriptor;
do
35 changes: 35 additions & 0 deletions src/java/org/apache/cassandra/db/Directories.java
@@ -73,6 +73,7 @@
import org.apache.cassandra.service.snapshot.SnapshotManifest;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;

@@ -115,6 +116,7 @@ public class Directories

public static final String BACKUPS_SUBDIR = "backups";
public static final String SNAPSHOT_SUBDIR = "snapshots";
public static final String PENDING_SUBDIR = "pending";
public static final String TMP_SUBDIR = "tmp";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";

@@ -727,6 +729,39 @@ public static File getSnapshotSchemaFile(File snapshotDir)
return new File(snapshotDir, "schema.cql");
}

@VisibleForTesting
public Set<File> getPendingLocations()
{
Set<File> result = new HashSet<>();
for (DataDirectory dataDirectory : dataDirectories.getAllDirectories())
{
for (File dir : dataPaths)
{
// Note that we must compare absolute paths (not canonical) here since keyspace directories might be symlinks
Path dirPath = dir.toAbsolute().toPath();
Path locationPath = dataDirectory.location.toAbsolute().toPath();
if (!dirPath.startsWith(locationPath))
continue;
result.add(getOrCreate(dir, PENDING_SUBDIR));
}
}
return result;
}

public File getPendingLocationForDisk(DataDirectory dataDirectory, TimeUUID planId)
{
for (File dir : dataPaths)
{
// Note that we must compare absolute paths (not canonical) here since keyspace directories might be symlinks
Path dirPath = dir.toAbsolute().toPath();
Path locationPath = dataDirectory.location.toAbsolute().toPath();
if (!dirPath.startsWith(locationPath))
continue;
return getOrCreate(dir, PENDING_SUBDIR, planId.toString());
}
throw new RuntimeException("Could not find pending location");
}

public static File getBackupsDirectory(Descriptor desc)
{
return getBackupsDirectory(desc.directory);
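The two pending-location helpers above share one subtlety: a keyspace directory is matched to its data directory by comparing absolute (not canonical) paths, so symlinked keyspace directories still resolve to the disk they were configured under. A minimal JDK-only sketch of that check, with hypothetical paths:

import java.nio.file.Path;
import java.nio.file.Paths;

public class PendingPathCheck
{
    public static void main(String[] args)
    {
        // Hypothetical layout: a data directory and a (possibly symlinked) table directory.
        Path dataDir = Paths.get("/var/lib/cassandra/data").toAbsolutePath();
        Path tableDir = Paths.get("/var/lib/cassandra/data/ks1/t1-ab12cd").toAbsolutePath();

        // Absolute-path prefix check, as in getPendingLocationForDisk; canonicalizing
        // would resolve the symlink to its target and could break the match.
        if (tableDir.startsWith(dataDir))
        {
            // Pending SSTables for a stream plan land under <table dir>/pending/<planId>.
            Path pending = tableDir.resolve("pending").resolve("some-plan-id");
            System.out.println("Pending location: " + pending);
        }
    }
}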
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -376,6 +376,8 @@ protected void recordLatency(TableMetrics metric, long latencyNanos)
public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController controller)
{
ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view);
Review comment (Member): discussed elsewhere, but we should only be adding activation ids that intersect with the key/range of the read

Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));

// fetch data from current memtable, historical memtables, and SSTables in the correct order.
34 changes: 34 additions & 0 deletions src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -18,13 +18,22 @@
package org.apache.cassandra.db;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.replication.ShortMutationId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -33,6 +42,8 @@

public class ReadExecutionController implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(ReadExecutionController.class);

private static final long NO_SAMPLING = Long.MIN_VALUE;

// For every reads
@@ -50,6 +61,13 @@ public class ReadExecutionController implements AutoCloseable
private final RepairedDataInfo repairedDataInfo;
private long oldestUnrepairedTombstone = Long.MAX_VALUE;

/*
* Track bulk transfers involved in the read, so we can do read reconciliation.
* These come from the ViewFragment, not the SSTable read path, so bloom filters + short-circuiting SSTable scans
* will still include the total set of relevant bulk transfers.
*/
private Set<ShortMutationId> activationIds = null;

ReadExecutionController(ReadCommand command,
OpOrder.Group baseOp,
TableMetadata baseMetadata,
@@ -243,4 +261,20 @@ private void addSample()
if (cfs != null)
cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros);
}

public void addActivationIds(ColumnFamilyStore.ViewFragment view)
{
activationIds = new HashSet<>();
Review comment (Member): possibly paranoid, but can we check this isn't null before creating a new hashset?
for (SSTableReader sstable : view.sstables)
Review comment (Member): we should also validate that the table is part of a tracked keyspace
{
Collection<? extends ShortMutationId> ids = sstable.getCoordinatorLogOffsets().transfers();
logger.trace("Adding transfer IDs from SSTable {} {}", sstable, ids);
activationIds.addAll(ids);
}
}

public Iterator<ShortMutationId> getActivationIds()
{
return activationIds == null ? null : activationIds.iterator();
}
}
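The two review threads above ask for guards in addActivationIds: re-initialization shouldn't clobber previously collected IDs, and only tracked keyspaces should ever reach this path (commit 2df59d5 adds such a check). A sketch of what those guards could look like; this is illustrative, not the committed fix, and the isTracked flag is assumed to be plumbed in by the caller:

// Illustrative only: guard the accumulator as the reviewers suggest.
public void addActivationIds(ColumnFamilyStore.ViewFragment view, boolean isTracked)
{
    // Paranoid check (cf. commit 2df59d5): transfer IDs only exist on tracked keyspaces.
    Preconditions.checkState(isTracked, "activation IDs only apply to tracked keyspaces");

    // Don't replace the set if a previous ViewFragment already contributed IDs.
    if (activationIds == null)
        activationIds = new HashSet<>();

    for (SSTableReader sstable : view.sstables)
        activationIds.addAll(sstable.getCoordinatorLogOffsets().transfers());
}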
34 changes: 28 additions & 6 deletions src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -44,6 +44,7 @@
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
@@ -80,11 +81,8 @@ synchronized List<String> importNewSSTables(Options options)
UUID importID = UUID.randomUUID();
logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options);

// This will be supported in the future
TableMetadata metadata = cfs.metadata();
if (metadata.replicationType() != null && metadata.replicationType().isTracked())
throw new IllegalStateException("Can't import into tables with mutation tracking enabled");

boolean isTracked = metadata.replicationType().isTracked();
List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths);

Set<Descriptor> currentDescriptors = new HashSet<>();
@@ -183,7 +181,14 @@ synchronized List<String> importNewSSTables(Options options)
Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
maybeMutateMetadata(entry.getKey(), options);
movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
SSTableReader sstable;
if (isTracked)
sstable = SSTableReader.open(cfs, oldDescriptor, metadata.ref);
Review comment (Member): If we stream even to the local node, we need to clean up the source sstables. I think these are left on disk. We should also have a comment here explaining why we're not moving the sstable.

I'd consider separating these 2 flows into separate methods or separate AbstractSSTableImport implementations, or something to that effect. The tracked and untracked flows have some fairly significant differences. I don't have super strong opinions about that, but the multiple uses of the isTracked flag throughout this fairly long method seems a bit brittle and makes it harder to read (imo)
else
{
// Don't move tracked SSTables, since that will move them to the live set on bounce
sstable = SSTableReader.moveAndOpenSSTable(cfs, oldDescriptor, newDescriptor, entry.getValue(), options.copyData);
}
newSSTablesPerDirectory.add(sstable);
}
catch (Throwable t)
@@ -233,7 +238,13 @@ synchronized List<String> importNewSSTables(Options options)
if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum))
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);

cfs.getTracker().addSSTables(newSSTables);
if (isTracked)
{
TrackedBulkTransfer.execute(cfs.keyspace.getName(), newSSTables);
}
else
cfs.getTracker().addSSTables(newSSTables);

for (SSTableReader reader : newSSTables)
{
if (options.invalidateCaches && cfs.isRowCacheEnabled())
@@ -250,6 +261,17 @@ synchronized List<String> importNewSSTables(Options options)
return failedDirectories;
}

/**
* TODO: Support user-defined consistency level for import, for import with replicas down
*/
private static class TrackedBulkTransfer
{
private static void execute(String keyspace, Set<SSTableReader> sstables)
{
MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL);
}
}

/**
* Check the state of this node and throws an {@link InterruptedException} if it is currently draining
*
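The TODO on TrackedBulkTransfer leaves room for a caller-chosen consistency level when replicas are down. One possible shape, sketched with a hypothetical nullable parameter (the import Options in this PR do not carry a consistency level):

// Hypothetical extension of TrackedBulkTransfer: accept a caller-supplied
// consistency level and fall back to ALL, preserving current behavior.
private static class TrackedBulkTransfer
{
    private static void execute(String keyspace, Set<SSTableReader> sstables, ConsistencyLevel cl)
    {
        MutationTrackingService.instance.executeTransfers(keyspace, sstables,
                                                          cl != null ? cl : ConsistencyLevel.ALL);
    }
}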
src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -752,6 +752,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view);
view.sstables.sort(SSTableReader.maxTimestampDescending);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
@@ -993,6 +995,8 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
{
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
if (cfs.metadata().replicationType().isTracked())
controller.addActivationIds(view);

ImmutableBTreePartition result = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
@@ -1015,6 +1019,8 @@

/* add the SSTables on disk */
view.sstables.sort(SSTableReader.maxTimestampDescending);
if (cfs.metadata().replicationType().isTracked())
logger.trace("Executing read against SSTables {}", view.sstables);
// read sorted sstables
for (SSTableReader sstable : view.sstables)
{
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -27,6 +27,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
@@ -61,6 +62,7 @@
import org.apache.cassandra.notifications.TableDroppedNotification;
import org.apache.cassandra.notifications.TablePreScrubNotification;
import org.apache.cassandra.notifications.TruncationNotification;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
@@ -270,6 +272,20 @@ public void updateInitialSSTableSize(Iterable<SSTableReader> sstables)

public void addSSTables(Collection<SSTableReader> sstables)
{
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked());
addSSTablesInternal(sstables, false, true, true);
}

public void addSSTablesTracked(Collection<SSTableReader> sstables)
{
Preconditions.checkState(cfstore.metadata().replicationType().isTracked());
for (SSTableReader sstable : sstables)
{
ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets();
Preconditions.checkState(logOffsets.isEmpty());
Preconditions.checkState(!logOffsets.transfers().isEmpty());
}

addSSTablesInternal(sstables, false, true, true);
}

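With the entry points split, call sites pick a path based on the table's replication type; the tracked path's preconditions assert that each SSTable carries transfer IDs and no reconciled log offsets yet. A sketch of the expected call-site pattern (illustrative):

// Illustrative call site: route tracked tables through the tracked-only entry
// point; its preconditions reject SSTables without transfer IDs.
if (cfs.metadata().replicationType().isTracked())
    cfs.getTracker().addSSTablesTracked(sstables);
else
    cfs.getTracker().addSSTables(sstables);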
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -402,4 +402,4 @@ public boolean apply(T t)
}
};
}
}
}
src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -48,6 +48,7 @@
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.FBUtilities;

import static java.lang.String.format;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
@@ -159,9 +160,13 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException

private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException
{
boolean isTracked = cfs.metadata().replicationType().isTracked();

Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize)));
throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
if (isTracked)
return cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId());

File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));

src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -46,19 +46,23 @@
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReceivedOutOfTokenRangeException;
@@ -180,15 +184,29 @@ protected SerializationHeader getHeader(TableMetadata metadata) throws UnknownCo
}
protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, ImmutableCoordinatorLogOffsets coordinatorLogOffsets, SSTableFormat<?, ?> format) throws IOException
{
boolean isTracked = cfs.metadata().replicationType().isTracked();

Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));

StreamReceiver streamReceiver = session.getAggregator(tableId);
Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver);
ILifecycleTransaction txn = createTxn();
RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, coordinatorLogOffsets, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
return new SSTableTxnSingleStreamWriter(txn, writer);
if (isTracked)
Review comment (Member): Whether the streamed sstables are cordoned off for bulk tracking or added via the normal stream path should be dictated by the sender's session info and the stream init message. I don't think we want an explicit add step for bootstrapped sstables etc.
{
File location = cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId());
Descriptor desc = cfs.newSSTableDescriptor(location, format);
SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(desc, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR,
coordinatorLogOffsets, cfs.metadata, null, sstableLevel, getHeader(cfs.metadata()),
cfs.indexManager.listIndexGroups(), txn, cfs);
return new SSTableTxnSingleStreamWriter(txn, writer);
}
else
{
RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, coordinatorLogOffsets, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
return new SSTableTxnSingleStreamWriter(txn, writer);
}
}

private ILifecycleTransaction createTxn()
src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -52,6 +52,8 @@
import org.apache.cassandra.service.accord.AccordTopology;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.TimeOnlyRequestBookkeeping.LatencyRequestBookkeeping;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.replication.PendingLocalTransfer;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
@@ -256,6 +258,15 @@ public void finished()

// add sstables (this will build non-SSTable-attached secondary indexes too, see CASSANDRA-10130)
logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);

if (cfs.metadata().replicationType().isTracked())
Review comment (Member): as mentioned elsewhere, we can't require all tracked streams to use bulk transfer
{
// Don't mark as live until activated by the stream coordinator
PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables);
MutationTrackingService.instance.received(transfer);
return;
}

cfs.addSSTables(readers);

//invalidate row and counter cache
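The review comment above and commit 1516ad6 ("only isolate streamed SSTables for StreamOperation.TRACK…") point at gating on the stream operation rather than on the keyspace alone. A hedged sketch; the exact StreamOperation constant is truncated in the commit message, so the name below is assumed:

// Illustrative gate: only bulk-transfer streams park SSTables in the pending
// set; other tracked streams (bootstrap, repair) take the normal path.
boolean isBulkTransfer = cfs.metadata().replicationType().isTracked()
                         && session.streamOperation() == StreamOperation.TRACKED_TRANSFER; // assumed name
if (isBulkTransfer)
{
    // Don't mark as live until activated by the stream coordinator.
    MutationTrackingService.instance.received(
        new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables));
}
else
{
    cfs.addSSTables(readers); // straight to the live set
}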