diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 35dba7cafead..8c6466feacc3 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -61,6 +61,7 @@ import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; @@ -1064,14 +1065,21 @@ public void indexPartition(DecoratedKey key, Set indexes, int pageSize, R SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT); while (!pager.isExhausted()) { + UnfilteredRowIterator partition; try (ReadExecutionController controller = cmd.executionController(); - WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing(); UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller)) { if (!page.hasNext()) break; - try (UnfilteredRowIterator partition = page.next()) + try (UnfilteredRowIterator onePartition = page.next()) + { + partition = ImmutableBTreePartition.create(onePartition).unfilteredIterator(); + } + } + + try (WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing()) + { { Set indexers = new HashSet<>(indexGroups.size()); @@ -1135,6 +1143,13 @@ public void indexPartition(DecoratedKey key, Set indexes, int pageSize, R indexers.forEach(Index.Indexer::finish); } } + finally + { + if (partition != null) + { + partition.close(); + } + } } } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java index 2d6e0da229bc..72c28d89b995 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java @@ -23,12 +23,19 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Random; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import java.util.stream.Collectors; +import com.google.common.collect.Sets; + +import org.awaitility.Awaitility; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -36,16 +43,21 @@ import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.ValueGenerator; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.utils.TimeUUID; import org.awaitility.Awaitility; +import static org.apache.cassandra.distributed.impl.IsolatedExecutor.waitOn; + public class SecondaryIndexTest extends TestBaseImpl { private static final int NUM_NODES = 3; private static final int REPLICATION_FACTOR = 1; - private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, PRIMARY KEY (k))"; + private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v text, PRIMARY KEY (k))"; private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON %s(v)"; private static final AtomicInteger seq = new AtomicInteger(); @@ -122,4 +134,48 @@ public void test_only_coordinator_chooses_index_for_query() }); } } + + @Test + public void test_secondary_rebuild_with_small_memtable_memory() + { + // populate data + Random rand = new Random(); + for (int i = 0 ; i < 100 ; ++i) + cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, ValueGenerator.randomString(rand, 50000)); + + cluster.forEach(i -> i.flush(KEYSPACE)); + + // restart node 1 with small memtable allocation so that index rebuild will cause memtable flush which will need + // to reclaim the memory. see CASSANDRA-19564 + waitOn(cluster.get(1).shutdown()); + Object originalMemTableHeapSpace = cluster.get(1).config().get("memtable_heap_space"); + cluster.get(1).config().set("memtable_heap_space", "1MiB"); + cluster.get(1).startup(); + String tableNameWithoutKeyspaceName = tableName.split("\\.")[1]; + String indexName = String.format("v_index_%d", seq.get()); + Runnable task = cluster.get(1).runsOnInstance( + () -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableNameWithoutKeyspaceName); + cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(indexName))); + } + ); + ExecutorService es = Executors.newFixedThreadPool(1); + Future future = es.submit(task); + try + { + future.get(30, TimeUnit.SECONDS); + } + catch (Exception e) + { + e.printStackTrace(); + Assert.fail("Rebuild should finish within 30 seconds without issue."); + } + finally + { + // restore node1 to use default value for memtable_heap_space + waitOn(cluster.get(1).shutdown()); + cluster.get(1).config().set("memtable_heap_space", originalMemTableHeapSpace); + cluster.get(1).startup(); + } + } } diff --git a/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java b/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java index 79f6fdfacd48..951fb6927c96 100644 --- a/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java +++ b/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java @@ -47,7 +47,12 @@ public static ByteBuffer randomBytes(Random random) public static String randomString(Random random) { - char[] chars = new char[random.nextInt(100)]; + return randomString(random, 100); + } + + public static String randomString(Random random, int length) + { + char[] chars = new char[random.nextInt(length)]; for (int i=0; i