This is an automated email from the ASF dual-hosted git repository.

jchovatia pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 59574a86a3 ReadCommandController should close fast to avoid deadlock when building secondary index
59574a86a3 is described below

commit 59574a86a3a829d904ce3964e36ec71f5b62b961
Author: Runtian Liu <[email protected]>
AuthorDate: Wed Nov 20 12:51:36 2024 -0800

    ReadCommandController should close fast to avoid deadlock when building secondary index
    
    patch by Runtian Liu; reviewed by Caleb Rackliffe, Jaydeepkumar Chovatia for CASSANDRA-19564
---
 CHANGES.txt                                        |  1 +
 .../cassandra/index/SecondaryIndexManager.java     | 19 +++++++-
 .../distributed/test/SecondaryIndexTest.java       | 57 +++++++++++++++++++++-
 .../cassandra/db/marshal/ValueGenerator.java       |  7 ++-
 4 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index dc70d2bce4..b160f76416 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1.12
+ * ReadCommandController should close fast to avoid deadlock when building secondary index (CASSANDRA-19564)
 Merged from 4.0
  * Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)
 
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index db3fa1d938..75bf0be0af 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
@@ -918,14 +919,21 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
             SinglePartitionPager pager = new SinglePartitionPager(cmd, null, 
ProtocolVersion.CURRENT);
             while (!pager.isExhausted())
             {
+                UnfilteredRowIterator partition;
                 try (ReadExecutionController controller = 
cmd.executionController();
-                     WriteContext ctx = 
keyspace.getWriteHandler().createContextForIndexing();
                      UnfilteredPartitionIterator page = 
pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
                 {
                     if (!page.hasNext())
                         break;
 
-                    try (UnfilteredRowIterator partition = page.next())
+                    try (UnfilteredRowIterator onePartition = page.next())
+                    {
+                        partition = 
ImmutableBTreePartition.create(onePartition).unfilteredIterator();
+                    }
+                }
+
+                try (WriteContext ctx = 
keyspace.getWriteHandler().createContextForIndexing())
+                {
                     {
                         Set<Index.Indexer> indexers = indexes.stream()
                                                              .map(index -> 
index.indexerFor(key,
@@ -983,6 +991,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                         indexers.forEach(Index.Indexer::finish);
                     }
                 }
+                finally
+                {
+                    if (partition != null)
+                    {
+                        partition.close();
+                    }
+                }
             }
         }
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
index 3b55dcf27d..5f4913bffd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
@@ -23,12 +23,18 @@ import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
+
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -37,15 +43,20 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.ValueGenerator;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.utils.TimeUUID;
 
+import static org.apache.cassandra.distributed.impl.IsolatedExecutor.waitOn;
+
 public class SecondaryIndexTest extends TestBaseImpl
 {
     private static final int NUM_NODES = 3;
     private static final int REPLICATION_FACTOR = 1;
-    private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, 
PRIMARY KEY (k))";
+    private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v text, 
PRIMARY KEY (k))";
     private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON 
%s(v)";
 
     private static final AtomicInteger seq = new AtomicInteger();
@@ -122,4 +133,48 @@ public class SecondaryIndexTest extends TestBaseImpl
                                    });
         }
     }
+
+    @Test
+    public void test_secondary_rebuild_with_small_memtable_memory()
+    {
+        // populate data
+        Random rand = new Random();
+        for (int i = 0 ; i < 100 ; ++i)
+            cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, 
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, 
ValueGenerator.randomString(rand, 50000));
+
+        cluster.forEach(i -> i.flush(KEYSPACE));
+
+        // restart node 1 with small memtable allocation so that index rebuild will cause memtable flush which will need
+        // to reclaim the memory. see CASSANDRA-19564
+        waitOn(cluster.get(1).shutdown());
+        Object originalMemTableHeapSpace = 
cluster.get(1).config().get("memtable_heap_space");
+        cluster.get(1).config().set("memtable_heap_space", "1MiB");
+        cluster.get(1).startup();
+        String tableNameWithoutKeyspaceName = tableName.split("\\.")[1];
+        String indexName = String.format("v_index_%d", seq.get());
+        Runnable task = cluster.get(1).runsOnInstance(
+        () -> {
+            ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableNameWithoutKeyspaceName);
+            
cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(indexName)));
+        }
+        );
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        Future<?> future = es.submit(task);
+        try
+        {
+            future.get(30, TimeUnit.SECONDS);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            Assert.fail("Rebuild should finish within 30 seconds without 
issue.");
+        }
+        finally
+        {
+            // restore node1 to use default value for memtable_heap_space
+            waitOn(cluster.get(1).shutdown());
+            cluster.get(1).config().set("memtable_heap_space", 
originalMemTableHeapSpace);
+            cluster.get(1).startup();
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java b/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java
index 79f6fdfacd..951fb6927c 100644
--- a/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java
+++ b/test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java
@@ -47,7 +47,12 @@ public class ValueGenerator
 
     public static String randomString(Random random)
     {
-        char[] chars = new char[random.nextInt(100)];
+        return randomString(random, 100);
+    }
+
+    public static String randomString(Random random, int length)
+    {
+        char[] chars = new char[random.nextInt(length)];
         for (int i=0; i<chars.length; i++)
             chars[i] = CHARS[random.nextInt(CHARS.length)];
         return new String(chars);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to