This is an automated email from the ASF dual-hosted git repository.
jchovatia pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 59574a86a3 ReadCommandController should close fast to avoid deadlock
when building secondary index
59574a86a3 is described below
commit 59574a86a3a829d904ce3964e36ec71f5b62b961
Author: Runtian Liu <[email protected]>
AuthorDate: Wed Nov 20 12:51:36 2024 -0800
ReadCommandController should close fast to avoid deadlock when building
secondary index
patch by Runtian Liu; reviewed by Caleb Rackliffe, Jaydeepkumar Chovatia
for CASSANDRA-19564
---
CHANGES.txt | 1 +
.../cassandra/index/SecondaryIndexManager.java | 19 +++++++-
.../distributed/test/SecondaryIndexTest.java | 57 +++++++++++++++++++++-
.../cassandra/db/marshal/ValueGenerator.java | 7 ++-
4 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index dc70d2bce4..b160f76416 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1.12
+ * ReadCommandController should close fast to avoid deadlock when building
secondary index (CASSANDRA-19564)
Merged from 4.0
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live
in the dtest-api (CASSANDRA-20884)
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index db3fa1d938..75bf0be0af 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
@@ -918,14 +919,21 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
SinglePartitionPager pager = new SinglePartitionPager(cmd, null,
ProtocolVersion.CURRENT);
while (!pager.isExhausted())
{
+ UnfilteredRowIterator partition;
try (ReadExecutionController controller =
cmd.executionController();
- WriteContext ctx =
keyspace.getWriteHandler().createContextForIndexing();
UnfilteredPartitionIterator page =
pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
{
if (!page.hasNext())
break;
- try (UnfilteredRowIterator partition = page.next())
+ try (UnfilteredRowIterator onePartition = page.next())
+ {
+ partition =
ImmutableBTreePartition.create(onePartition).unfilteredIterator();
+ }
+ }
+
+ try (WriteContext ctx =
keyspace.getWriteHandler().createContextForIndexing())
+ {
{
Set<Index.Indexer> indexers = indexes.stream()
.map(index ->
index.indexerFor(key,
@@ -983,6 +991,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
indexers.forEach(Index.Indexer::finish);
}
}
+ finally
+ {
+ if (partition != null)
+ {
+ partition.close();
+ }
+ }
}
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
index 3b55dcf27d..5f4913bffd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
@@ -23,12 +23,18 @@ import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
+
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.AfterClass;
@@ -37,15 +43,20 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.ValueGenerator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.utils.TimeUUID;
+import static org.apache.cassandra.distributed.impl.IsolatedExecutor.waitOn;
+
public class SecondaryIndexTest extends TestBaseImpl
{
private static final int NUM_NODES = 3;
private static final int REPLICATION_FACTOR = 1;
- private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int,
PRIMARY KEY (k))";
+ private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v text,
PRIMARY KEY (k))";
private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON
%s(v)";
private static final AtomicInteger seq = new AtomicInteger();
@@ -122,4 +133,48 @@ public class SecondaryIndexTest extends TestBaseImpl
});
}
}
+
+ @Test
+ public void test_secondary_rebuild_with_small_memtable_memory()
+ {
+ // populate data
+ Random rand = new Random();
+ for (int i = 0 ; i < 100 ; ++i)
+ cluster.coordinator(1).execute(String.format("INSERT INTO %s (k,
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i,
ValueGenerator.randomString(rand, 50000));
+
+ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ // restart node 1 with small memtable allocation so that index rebuild
will cause memtable flush which will need
+ // to reclaim the memory. see CASSANDRA-19564
+ waitOn(cluster.get(1).shutdown());
+ Object originalMemTableHeapSpace =
cluster.get(1).config().get("memtable_heap_space");
+ cluster.get(1).config().set("memtable_heap_space", "1MiB");
+ cluster.get(1).startup();
+ String tableNameWithoutKeyspaceName = tableName.split("\\.")[1];
+ String indexName = String.format("v_index_%d", seq.get());
+ Runnable task = cluster.get(1).runsOnInstance(
+ () -> {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableNameWithoutKeyspaceName);
+
cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(indexName)));
+ }
+ );
+ ExecutorService es = Executors.newFixedThreadPool(1);
+ Future<?> future = es.submit(task);
+ try
+ {
+ future.get(30, TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ Assert.fail("Rebuild should finish within 30 seconds without
issue.");
+ }
+ finally
+ {
+ // restore node1 to use default value for memtable_heap_space
+ waitOn(cluster.get(1).shutdown());
+ cluster.get(1).config().set("memtable_heap_space",
originalMemTableHeapSpace);
+ cluster.get(1).startup();
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java b/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java
index 79f6fdfacd..951fb6927c 100644
--- a/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java
+++ b/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java
@@ -47,7 +47,12 @@ public class ValueGenerator
public static String randomString(Random random)
{
- char[] chars = new char[random.nextInt(100)];
+ return randomString(random, 100);
+ }
+
+ public static String randomString(Random random, int length)
+ {
+ char[] chars = new char[random.nextInt(length)];
for (int i=0; i<chars.length; i++)
chars[i] = CHARS[random.nextInt(CHARS.length)];
return new String(chars);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]