Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.repair.CassandraTableRepairManager;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.UnfilteredSource;
import org.apache.cassandra.db.streaming.CassandraStreamManager;
import org.apache.cassandra.db.view.TableViews;
import org.apache.cassandra.dht.AbstractBounds;
Expand Down Expand Up @@ -3002,7 +3003,7 @@ public int getLevelFanoutSize()
return compactionStrategyManager.getLevelFanoutSize();
}

public static class ViewFragment
public static class ViewFragment implements ReadableView
{
public final List<SSTableReader> sstables;
public final Iterable<Memtable> memtables;
Expand All @@ -3012,6 +3013,18 @@ public ViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables)
this.sstables = sstables;
this.memtables = memtables;
}

// ReadableView accessor: exposes the memtables field through the interface.
@Override
public Iterable<? extends UnfilteredSource> memtables()
{
return memtables;
}

// ReadableView accessor: returns the sstables field itself (no copy is made).
@Override
public List<SSTableReader> sstables()
{
return sstables;
}
}

public static class RefViewFragment extends ViewFragment implements AutoCloseable
Expand Down
87 changes: 60 additions & 27 deletions src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,42 @@ public abstract class ReadCommand extends AbstractReadQuery
{
private interface ReadCompleter<T>
{
T complete(UnfilteredPartitionIterator iterator, ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos);
T complete(ReadCommand command, UnfilteredPartitionIterator iterator, ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos);
T complete(ReadCommand command, Index.Searcher searcher, ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos);

ReadCompleter<UnfilteredPartitionIterator> IMMEDIATE = new ReadCompleter<>()
{
@Override
public UnfilteredPartitionIterator complete(ReadCommand command, UnfilteredPartitionIterator iterator, ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos)
{
return command.completeRead(iterator, executionController, null, cfs, startTimeNanos);
}

@Override
public UnfilteredPartitionIterator complete(ReadCommand command, Index.Searcher searcher, ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos)
{
UnfilteredPartitionIterator iterator = searcher.search(executionController);
return command.completeRead(iterator, executionController, searcher, cfs, startTimeNanos);
}
};

ReadCompleter<PartialTrackedRead> TRACKED = new ReadCompleter<>()
{
@Override
public PartialTrackedRead complete(ReadCommand command, UnfilteredPartitionIterator iterator, ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos)
{
return command.createInProgressRead(iterator, executionController, null, cfs, startTimeNanos);
}

@Override
public PartialTrackedRead complete(ReadCommand command, Index.Searcher searcher, ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos)
{
if (!searcher.isMultiStep())
throw new IllegalStateException("Cannot use " + searcher.getClass().getName() + " with tracked reads");

return searcher.asMultiStep().beginRead(executionController, cfs, startTimeNanos);
}
};
}

private static final int TEST_ITERATION_DELAY_MILLIS = CassandraRelevantProperties.TEST_READ_ITERATION_DELAY_MS.getInt();
Expand Down Expand Up @@ -479,28 +514,26 @@ private <T> T beginRead(ReadExecutionController executionController, @Nullable C
ConsensusRequestRouter.validateSafeToReadNonTransactionally(this, cm);
Index.QueryPlan indexQueryPlan = indexQueryPlan();

Index.Searcher searcher = null;
if (indexQueryPlan != null)
{
cfs.indexManager.checkQueryability(indexQueryPlan);

searcher = indexQueryPlan.searcherFor(this);
Tracing.trace("Executing read on {}.{} using index{} {}",
cfs.metadata.keyspace,
cfs.metadata.name,
indexQueryPlan.getIndexes().size() == 1 ? "" : "es",
indexQueryPlan.getIndexes()
.stream()
.map(i -> i.getIndexMetadata().name)
.collect(Collectors.joining(",")));
if (logger.isTraceEnabled())
{
Tracing.trace("Executing read on {}.{} using index{} {}",
cfs.metadata.keyspace,
cfs.metadata.name,
indexQueryPlan.getIndexes().size() == 1 ? "" : "es",
indexQueryPlan.getIndexes()
.stream()
.map(i -> i.getIndexMetadata().name)
.collect(Collectors.joining(",")));
}
Index.Searcher searcher = indexQueryPlan.searcherFor(this);
return completer.complete(this, searcher, executionController, cfs, startTimeNanos);
}

if (searcher != null && metadata().replicationType().isTracked())
throw new UnsupportedOperationException("TODO: support tracked index reads");

UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);

return completer.complete(iterator, executionController, searcher, cfs, startTimeNanos);
UnfilteredPartitionIterator iterator = queryStorage(cfs, executionController);
return completer.complete(this, iterator, executionController, cfs, startTimeNanos);
}
finally
{
Expand Down Expand Up @@ -586,7 +619,7 @@ protected abstract PartialTrackedRead createInProgressRead(UnfilteredPartitionIt

public PartialTrackedRead beginTrackedRead(ReadExecutionController executionController)
{
return beginRead(executionController, null, this::createInProgressRead);
return beginRead(executionController, null, ReadCompleter.TRACKED);
}

public UnfilteredPartitionIterator completeTrackedRead(UnfilteredPartitionIterator iterator, PartialTrackedRead read)
Expand All @@ -604,12 +637,12 @@ public UnfilteredPartitionIterator completeTrackedRead(UnfilteredPartitionIterat
// iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
return beginRead(executionController, null, this::completeRead);
return beginRead(executionController, null, ReadCompleter.IMMEDIATE);
}

public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, @Nullable ClusterMetadata cm)
{
return beginRead(executionController, cm, this::completeRead);
return beginRead(executionController, cm, ReadCompleter.IMMEDIATE);
}

protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
Expand Down Expand Up @@ -1144,7 +1177,7 @@ public String name()
return toCQLString();
}

InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller)
InputCollector<UnfilteredRowIterator> iteratorsForPartition(ReadableView view, ReadExecutionController controller)
{
final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
(unfilteredRowIterators, repairedDataInfo) -> {
Expand Down Expand Up @@ -1194,7 +1227,7 @@ static class InputCollector<T extends AutoCloseable>
List<T> repairedIters;
List<T> unrepairedIters;

InputCollector(ColumnFamilyStore.ViewFragment view,
InputCollector(ReadableView view,
ReadExecutionController controller,
BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions)
Expand All @@ -1204,27 +1237,27 @@ static class InputCollector<T extends AutoCloseable>

if (isTrackingRepairedStatus)
{
for (SSTableReader sstable : view.sstables)
for (SSTableReader sstable : view.sstables())
{
if (considerRepairedForTracking(sstable))
{
if (repairedSSTables == null)
repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size());
repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables().size());
repairedSSTables.add(sstable);
}
}
}
if (repairedSSTables == null)
{
repairedIters = Collections.emptyList();
unrepairedIters = new ArrayList<>(view.sstables.size());
unrepairedIters = new ArrayList<>(view.sstables().size());
}
else
{
repairedIters = new ArrayList<>(repairedSSTables.size());
// when we're done collating, we'll merge the repaired iters and add the
// result to the unrepaired list, so size that list accordingly
unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
unrepairedIters = new ArrayList<>((view.sstables().size() - repairedSSTables.size()) + Iterables.size(view.memtables()) + 1);
}
this.repairedMerger = repairedMerger;
this.postLimitAdditionalPartitions = postLimitAdditionalPartitions;
Expand Down
30 changes: 30 additions & 0 deletions src/java/org/apache/cassandra/db/ReadableView.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db;

import java.util.List;

import org.apache.cassandra.db.rows.UnfilteredSource;
import org.apache.cassandra.io.sstable.format.SSTableReader;

/**
 * Abstraction over the set of sources a read consults: memtable sources
 * (as {@link UnfilteredSource}s) and sstables.
 */
public interface ReadableView
{
/** Returns the memtable sources of this view. */
Iterable<? extends UnfilteredSource> memtables();
/**
 * Returns the sstables of this view.
 * NOTE(review): SinglePartitionReadCommand sorts the returned list in place,
 * so implementations presumably need to return a mutable list -- confirm.
 */
List<SSTableReader> sstables();
}
36 changes: 21 additions & 15 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.db.rows.UnfilteredSource;
import org.apache.cassandra.db.rows.WrappingUnfilteredRowIterator;
import org.apache.cassandra.db.transform.RTBoundValidator;
import org.apache.cassandra.db.transform.Transformation;
Expand Down Expand Up @@ -715,15 +716,20 @@ public Unfiltered next()
* Also note that one must have created a {@code ReadExecutionController} on the queried table and we require it as
* a parameter to enforce that fact, even though it's not explicitly used by the method.
*/
public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExecutionController executionController)
public UnfilteredRowIterator queryMemtableAndDisk(ReadableView view, ColumnFamilyStore cfs, ReadExecutionController executionController)
{
assert executionController != null && executionController.validForReadOn(cfs);
Tracing.trace("Executing single-partition query on {}", cfs.name);

return queryMemtableAndDiskInternal(cfs, executionController);
return queryMemtableAndDiskInternal(view, cfs, executionController);
}

/**
 * Convenience overload that selects the view itself: it uses
 * {@code cfs.select(View.select(SSTableSet.LIVE, partitionKey()))} as the view and delegates to
 * {@link #queryMemtableAndDisk(ReadableView, ColumnFamilyStore, ReadExecutionController)}.
 */
public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExecutionController executionController)
{
return queryMemtableAndDisk(cfs.select(View.select(SSTableSet.LIVE, partitionKey())), cfs, executionController);
}

private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, ReadExecutionController controller)
private UnfilteredRowIterator queryMemtableAndDiskInternal(ReadableView view, ColumnFamilyStore cfs, ReadExecutionController controller)
{
/*
* We have 2 main strategies:
Expand All @@ -747,12 +753,12 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
&& !queriesMulticellType()
&& !controller.isTrackingRepairedStatus())
{
return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
return queryMemtableAndSSTablesInTimestampOrder(view, cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
}

Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
view.sstables.sort(SSTableReader.maxTimestampDescending);
List<SSTableReader> sstables = view.sstables();
sstables.sort(SSTableReader.maxTimestampDescending);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
long mostRecentPartitionTombstone = Long.MIN_VALUE;
Expand All @@ -761,7 +767,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
{
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

for (Memtable memtable : view.memtables)
for (UnfilteredSource memtable : view.memtables())
{
UnfilteredRowIterator iter = memtable.rowIterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), metricsCollector);
if (iter == null)
Expand Down Expand Up @@ -790,14 +796,14 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
* In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone
* elimination in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
view.sstables.sort(SSTableReader.maxTimestampDescending);
sstables.sort(SSTableReader.maxTimestampDescending);
int nonIntersectingSSTables = 0;
int includedDueToTombstones = 0;

if (controller.isTrackingRepairedStatus())
Tracing.trace("Collecting data from sstables and tracking repaired status");

for (SSTableReader sstable : view.sstables)
for (SSTableReader sstable : sstables)
{
// if we've already seen a partition tombstone with a timestamp greater
// than the most recent update to this sstable, we can skip it
Expand Down Expand Up @@ -865,7 +871,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

if (Tracing.isTracing())
Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
nonIntersectingSSTables, sstables.size(), includedDueToTombstones);

if (inputCollector.isEmpty())
return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed());
Expand Down Expand Up @@ -989,16 +995,15 @@ private boolean queriesMulticellType()
* no collection or counters are included).
* This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
*/
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ReadableView view, ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
{
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));

ImmutableBTreePartition result = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

Tracing.trace("Merging memtable contents");
for (Memtable memtable : view.memtables)
for (UnfilteredSource memtable : view.memtables())
{
try (UnfilteredRowIterator iter = memtable.rowIterator(partitionKey, filter.getSlices(metadata()), columnFilter(), isReversed(), metricsCollector))
{
Expand All @@ -1014,9 +1019,10 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
}

/* add the SSTables on disk */
view.sstables.sort(SSTableReader.maxTimestampDescending);
List<SSTableReader> sstables = view.sstables();
sstables.sort(SSTableReader.maxTimestampDescending);
// read sorted sstables
for (SSTableReader sstable : view.sstables)
for (SSTableReader sstable : sstables)
{
// if we've already seen a partition tombstone with a timestamp greater
// than the most recent update to this sstable, we're done, since the rest of the sstables
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/db/memtable/Memtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
Expand Down Expand Up @@ -205,6 +206,7 @@ default long put(MutationId mutationId, PartitionUpdate update, UpdateTransactio
long put(MutationId mutationId, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup, boolean assumeMissing);

// Read operations are provided by the UnfilteredSource interface.
Partition snapshotPartition(DecoratedKey partitionKey);

// Statistics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ public UnfilteredRowIterator rowIterator(DecoratedKey key)
return p != null ? p.unfilteredIterator() : null;
}

/**
 * Returns whatever {@code getPartition(partitionKey)} yields for the given key.
 * NOTE(review): presumably {@code null} when the key is absent (rowIterator above
 * null-checks the looked-up partition) -- confirm against getPartition.
 */
@Override
public Partition snapshotPartition(DecoratedKey partitionKey)
{
return getPartition(partitionKey);
}

public FlushablePartitionSet<AtomicBTreePartition> getFlushSet(PartitionPosition from, PartitionPosition to)
{
long keySize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ public UnfilteredRowIterator rowIterator(DecoratedKey key)
return p != null ? p.unfilteredIterator() : null;
}

/**
 * Returns whatever {@code getPartition(partitionKey)} yields for the given key.
 * NOTE(review): presumably {@code null} when the key is absent (rowIterator above
 * null-checks the looked-up partition) -- confirm against getPartition.
 */
@Override
public Partition snapshotPartition(DecoratedKey partitionKey)
{
return getPartition(partitionKey);
}

private static int estimateRowOverhead(final int count)
{
// calculate row overhead
Expand Down
Loading