This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b57acc7f4 Reduce contention in MemtableAllocator.allocate - Estimate 
memory required to allocate for applying a partition update into a memtable and 
allocate this memory in one shot, then use it as a request-local SLAB - Reduce 
contention by switching MemtableAllocator.SubAllocator#owns from updates via 
AtomicLongFieldUpdater to LongAdder usage. 
MemtableAllocator.SubAllocator#acquired(..) method updates "owns" value but 
does not use the updated result. - Reduce contention by rep [...]
6b57acc7f4 is described below

commit 6b57acc7f41357afb2ced62271f8f16b865f99c8
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Mon Mar 24 19:37:21 2025 +0000

    Reduce contention in MemtableAllocator.allocate
    - Estimate the memory required to apply a partition update to a memtable, 
allocate that memory in one shot, and then use it as a request-local 
SLAB
    - Reduce contention by switching MemtableAllocator.SubAllocator#owns from 
AtomicLongFieldUpdater-based updates to a LongAdder. 
The MemtableAllocator.SubAllocator#acquired(..) method updates the "owns" value 
but does not use the updated result.
    - Reduce contention by replacing the CAS loop in 
MemtablePool.SubPool#tryAllocate with allocatedUpdater.addAndGet(this, size)
    
    Patch by Dmitry Konstantinov; reviewed by Michael Semb Wever for 
CASSANDRA-20226
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/NativeClustering.java  |  14 +-
 .../db/partitions/BTreePartitionUpdater.java       |  42 ++++-
 .../org/apache/cassandra/db/rows/AbstractCell.java |  12 ++
 src/java/org/apache/cassandra/db/rows/Cell.java    |   5 +
 .../org/apache/cassandra/db/rows/ColumnData.java   |   3 +
 .../cassandra/db/rows/ComplexColumnData.java       |   5 +
 .../org/apache/cassandra/db/rows/NativeCell.java   |  23 ++-
 ...erAllocator.java => AddressBasedAllocator.java} |  24 +--
 .../cassandra/utils/memory/ByteBufferCloner.java   |  23 +++
 .../org/apache/cassandra/utils/memory/Cloner.java  |  10 ++
 .../apache/cassandra/utils/memory/HeapCloner.java  |   6 +
 .../apache/cassandra/utils/memory/HeapPool.java    |  33 +++-
 .../cassandra/utils/memory/MemtableAllocator.java  |  18 +-
 .../utils/memory/MemtableBufferAllocator.java      |  79 ++++++++-
 .../cassandra/utils/memory/MemtablePool.java       |  30 ++--
 .../cassandra/utils/memory/NativeAllocator.java    | 118 ++++++++++---
 .../btree/AtomicBTreePartitionUpdateBench.java     |   6 +
 .../cassandra/utils/memory/ContextClonerTest.java  | 186 +++++++++++++++++++++
 19 files changed, 551 insertions(+), 87 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e98357f851..7b0116f005 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Reduce contention in MemtableAllocator.allocate (CASSANDRA-20226)
  * Add export, list, import sub-commands for nodetool compressiondictionary 
(CASSANDRA-20941)
  * Add support in the binary protocol to allow transactions to have multiple 
conditions (CASSANDRA-20883)
  * Enable CQLSSTableWriter to create SSTables compressed with a dictionary 
(CASSANDRA-20938)
diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java 
b/src/java/org/apache/cassandra/db/NativeClustering.java
index f51ea90a82..45cfbb3b35 100644
--- a/src/java/org/apache/cassandra/db/NativeClustering.java
+++ b/src/java/org/apache/cassandra/db/NativeClustering.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AddressBasedAllocator;
 import org.apache.cassandra.utils.memory.HeapCloner;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
-import org.apache.cassandra.utils.memory.NativeAllocator;
 
 public class NativeClustering implements Clustering<NativeData>
 {
@@ -42,7 +42,17 @@ public class NativeClustering implements 
Clustering<NativeData>
 
     private NativeClustering() { peer = 0; }
 
-    public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, 
Clustering<?> clustering)
+    public static int estimateAllocationSize(Clustering<?> clustering)
+    {
+        int count = clustering.size();
+        int metadataSize = (count * 2) + 4;
+        int dataSize = clustering.dataSize();
+        int bitmapSize = ((count + 7) >>> 3);
+
+        return metadataSize + dataSize + bitmapSize;
+    }
+
+    public NativeClustering(AddressBasedAllocator allocator, OpOrder.Group 
writeOp, Clustering<?> clustering)
     {
         int count = clustering.size();
         int metadataSize = (count * 2) + 4;
diff --git 
a/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java 
b/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java
index bce379d98a..9a4d6acaaa 100644
--- a/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java
+++ b/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java
@@ -40,7 +40,9 @@ public class BTreePartitionUpdater implements 
UpdateFunction<Row, Row>, ColumnDa
 {
     final MemtableAllocator allocator;
     final OpOrder.Group writeOp;
+
     final Cloner cloner;
+    Cloner contextCloner;
     final UpdateTransaction indexer;
     public long dataSize;
 
@@ -84,6 +86,34 @@ public class BTreePartitionUpdater implements 
UpdateFunction<Row, Row>, ColumnDa
 
     protected BTreePartitionData makeMergedPartition(BTreePartitionData 
current, PartitionUpdate update)
     {
+        if (cloner.isContextAwareCloningSupported()) // to avoid an estimation 
cost if context aware cloning is not supported
+        {
+            int estimitedCloneSize = 0;
+            // a typical case when all values in the update are used in the 
result of the merge
+            // clustering key cloning is needed when we have an insert but not 
needed when we have an update,
+            // so we may allocate a bit more than needed sometimes
+            for (Row row : update)
+            {
+                estimitedCloneSize += (int) row.accumulate((cd, v) -> v + 
cd.estimateCloneSize(cloner), 0);
+                estimitedCloneSize += 
cloner.estimateCloneSize(row.clustering());
+            }
+
+            Row staticRow = update.staticRow();
+            if (!staticRow.isEmpty())
+            {
+                estimitedCloneSize += (int) staticRow.accumulate((cd, v) -> v 
+ cd.estimateCloneSize(cloner), 0);
+                // there are no clustering keys for static rows
+            }
+
+            if (contextCloner != null && contextCloner != cloner)
+                contextCloner.adjustUnused();
+            contextCloner = 
cloner.createContextAwareCloner(estimitedCloneSize);
+        }
+        else
+        {
+            contextCloner = cloner;
+        }
+
         DeletionInfo newDeletionInfo = merge(current.deletionInfo, 
update.deletionInfo());
 
         RegularAndStaticColumns columns = current.columns;
@@ -130,7 +160,7 @@ public class BTreePartitionUpdater implements 
UpdateFunction<Row, Row>, ColumnDa
     @Override
     public Row insert(Row insert)
     {
-        Row data = insert.clone(cloner);
+        Row data = insert.clone(contextCloner);
         indexer.onInserted(insert);
 
         dataSize += data.dataSize();
@@ -154,8 +184,8 @@ public class BTreePartitionUpdater implements 
UpdateFunction<Row, Row>, ColumnDa
         long timeDelta = Math.abs(insert.timestamp() - previous.timestamp());
         if (timeDelta < colUpdateTimeDelta)
             colUpdateTimeDelta = timeDelta;
-        if (cloner != null)
-            insert = cloner.clone(insert);
+        if (contextCloner != null)
+            insert = contextCloner.clone(insert);
         dataSize += insert.dataSize() - previous.dataSize();
         heapSize += insert.unsharedHeapSizeExcludingData() - 
previous.unsharedHeapSizeExcludingData();
         return insert;
@@ -163,8 +193,8 @@ public class BTreePartitionUpdater implements 
UpdateFunction<Row, Row>, ColumnDa
 
     public ColumnData insert(ColumnData insert)
     {
-        if (cloner != null)
-            insert = insert.clone(cloner);
+        if (contextCloner != null)
+            insert = insert.clone(contextCloner);
         dataSize += insert.dataSize();
         heapSize += insert.unsharedHeapSizeExcludingData();
         return insert;
@@ -185,5 +215,7 @@ public class BTreePartitionUpdater implements 
UpdateFunction<Row, Row>, ColumnDa
     public void reportAllocatedMemory()
     {
         allocator.onHeap().adjust(heapSize, writeOp);
+        if (contextCloner != null && contextCloner != cloner)
+            contextCloner.adjustUnused();
     }
 }
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java 
b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 0dbcfc4420..aa5536593d 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -114,6 +114,18 @@ public abstract class AbstractCell<V> extends Cell<V>
         return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), 
cloner.clone(buffer()), path == null ? null : path.clone(cloner));
     }
 
+    public static int estimateAllocationSize(Cell<?> cell)
+    {
+        long size = cell.valueSize();
+        CellPath path = cell.path();
+        if (path != null)
+        {
+            assert path.size() == 1 : String.format("Expected path size to be 
1 but was not; %s", path);
+            size += path.get(0).remaining();
+        }
+        return (int) size;
+    }
+
     // note: while the cell returned may be different, the value is the same, 
so if the value is offheap it must be referenced inside a guarded context (or 
copied)
     public Cell<?> updateAllTimestamp(long newTimestamp)
     {
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 3ddfeae39a..ced2f5229a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -201,6 +201,11 @@ public abstract class Cell<V> extends ColumnData
         return cloner.clone(this);
     }
 
+    public int estimateCloneSize(Cloner cloner)
+    {
+        return cloner.estimateCloneSize(this);
+    }
+
     public abstract Cell<?> clone(ByteBufferCloner cloner);
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index c27d9bdabc..e37a8dc292 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -280,6 +280,9 @@ public abstract class ColumnData implements 
IMeasurableMemory
 
     public abstract ColumnData clone(Cloner cloner);
 
+    public abstract int estimateCloneSize(Cloner cloner);
+
+
     /**
      * Returns a copy of the data where all timestamps for live data have 
replaced by {@code newTimestamp} and
      * all deletion timestamp by {@code newTimestamp - 1}.
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 84dbc0a8f7..20c4045bac 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -260,6 +260,11 @@ public class ComplexColumnData extends ColumnData 
implements Iterable<Cell<?>>
         return transform(c -> cloner.clone(c));
     }
 
+    public int estimateCloneSize(Cloner cloner)
+    {
+        return (int) accumulate((c, v) -> v + cloner.estimateCloneSize(c), 0);
+    }
+
     public ComplexColumnData updateAllTimestamp(long newTimestamp)
     {
         DeletionTime newDeletion = complexDeletion.isLive() ? complexDeletion 
: DeletionTime.build(newTimestamp - 1, complexDeletion.localDeletionTime());
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java 
b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index b774cb2ce9..ebea8a319b 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -28,8 +28,8 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AddressBasedAllocator;
 import org.apache.cassandra.utils.memory.MemoryUtil;
-import org.apache.cassandra.utils.memory.NativeAllocator;
 import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
 
 public class NativeCell extends AbstractCell<NativeData> implements NativeData
@@ -45,13 +45,28 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
 
     private final long peer;
 
+    public static int estimateAllocationSize(Cell<?> cell)
+    {
+        long size = offHeapSizeWithoutPath(cell.valueSize());
+        CellPath path = cell.path();
+        if (path != null)
+        {
+            assert path.size() == 1 : String.format("Expected path size to be 
1 but was not; %s", path);
+            size += 4 + path.get(0).remaining();
+        }
+
+        if (size > Integer.MAX_VALUE)
+            throw new IllegalStateException();
+        return (int) size;
+    }
+
     private NativeCell()
     {
         super(null);
         this.peer = 0;
     }
 
-    public NativeCell(NativeAllocator allocator,
+    public NativeCell(AddressBasedAllocator allocator,
                       OpOrder.Group writeOp,
                       Cell<?> cell)
     {
@@ -67,7 +82,7 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
 
     // Please keep both int/long overloaded ctros public. Otherwise silent 
casts will mess timestamps when one is not
     // available.
-    public NativeCell(NativeAllocator allocator,
+    public NativeCell(AddressBasedAllocator allocator,
                       OpOrder.Group writeOp,
                       ColumnMetadata column,
                       long timestamp,
@@ -79,7 +94,7 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
         this(allocator, writeOp, column, timestamp, ttl, 
deletionTimeLongToUnsignedInteger(localDeletionTime), value, path);
     }
 
-    public NativeCell(NativeAllocator allocator,
+    public NativeCell(AddressBasedAllocator allocator,
                       OpOrder.Group writeOp,
                       ColumnMetadata column,
                       long timestamp,
diff --git 
a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java 
b/src/java/org/apache/cassandra/utils/memory/AddressBasedAllocator.java
similarity index 59%
copy from 
src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
copy to src/java/org/apache/cassandra/utils/memory/AddressBasedAllocator.java
index b7b0384fbf..a4f5c41c0f 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AddressBasedAllocator.java
@@ -15,30 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.utils.memory;
 
-import java.nio.ByteBuffer;
+package org.apache.cassandra.utils.memory;
 
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
-public abstract class MemtableBufferAllocator extends MemtableAllocator
+public interface AddressBasedAllocator
 {
-    protected MemtableBufferAllocator(SubAllocator onHeap, SubAllocator 
offHeap)
-    {
-        super(onHeap, offHeap);
-    }
-
-    public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
-
-    protected Cloner allocator(OpOrder.Group opGroup)
-    {
-        return new ByteBufferCloner()
-            {
-                @Override
-                public ByteBuffer allocate(int size)
-                {
-                    return MemtableBufferAllocator.this.allocate(size, 
opGroup);
-                }
-            };
-    }
+    long allocate(int sizeToAllocate, OpOrder.Group opGroup);
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java 
b/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java
index a77e866acb..f089ac5c8e 100644
--- a/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java
+++ b/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.db.rows.AbstractCell;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -52,12 +53,34 @@ public abstract class ByteBufferCloner implements Cloner
         return clustering.clone(this);
     }
 
+    public int estimateCloneSize(Clustering<?> clustering)
+    {
+        return clustering.dataSize();
+    }
+
     @Override
     public Cell<?> clone(Cell<?> cell)
     {
         return cell.clone(this);
     }
 
+    public int estimateCloneSize(Cell<?> cell)
+    {
+        return AbstractCell.estimateAllocationSize(cell);
+    }
+
+    @Override
+    public Cloner createContextAwareCloner(int estimatedCloneSize)
+    {
+        return this;
+    }
+
+    @Override
+    public void adjustUnused()
+    {
+        // nothing to do by default
+    }
+
     public final ByteBuffer clone(ByteBuffer buffer)
     {
         return clone(buffer, ByteBufferAccessor.instance);
diff --git a/src/java/org/apache/cassandra/utils/memory/Cloner.java 
b/src/java/org/apache/cassandra/utils/memory/Cloner.java
index 0b3accbff8..46541c285a 100644
--- a/src/java/org/apache/cassandra/utils/memory/Cloner.java
+++ b/src/java/org/apache/cassandra/utils/memory/Cloner.java
@@ -44,6 +44,8 @@ public interface Cloner
      */
     Clustering<?> clone(Clustering<?> clustering);
 
+    int estimateCloneSize(Clustering<?> clustering);
+
     /**
      * Clones the specified cell.
      *
@@ -51,4 +53,12 @@ public interface Cloner
      * @return the cloned cell
      */
     Cell<?> clone(Cell<?> cell);
+
+    int estimateCloneSize(Cell<?> cell);
+
+    boolean isContextAwareCloningSupported();
+
+    Cloner createContextAwareCloner(int estimatedCloneSize);
+
+    void adjustUnused();
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapCloner.java 
b/src/java/org/apache/cassandra/utils/memory/HeapCloner.java
index 7f07c7903a..da8ff498cf 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapCloner.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapCloner.java
@@ -34,4 +34,10 @@ public final class HeapCloner extends ByteBufferCloner
     {
         return ByteBuffer.allocate(size);
     }
+
+    @Override
+    public boolean isContextAwareCloningSupported()
+    {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java 
b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 659715de4a..cf1307f11b 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -49,6 +49,24 @@ public class HeapPool extends MemtablePool
             super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
         }
 
+        protected Cloner allocator(OpOrder.Group opGroup)
+        {
+            return new ByteBufferCloner()
+            {
+                @Override
+                public boolean isContextAwareCloningSupported()
+                {
+                    return false;
+                }
+
+                @Override
+                public ByteBuffer allocate(int size)
+                {
+                    return Allocator.this.allocate(size, opGroup);
+                }
+            };
+        }
+
         public ByteBuffer allocate(int size, OpOrder.Group opGroup)
         {
             super.onHeap().allocate(size, opGroup);
@@ -127,7 +145,20 @@ public class HeapPool extends MemtablePool
 
             public Cloner cloner(OpOrder.Group opGroup)
             {
-                return allocator(opGroup);
+                return new ByteBufferCloner()
+                {
+                    @Override
+                    public boolean isContextAwareCloningSupported()
+                    {
+                        return false;
+                    }
+
+                    @Override
+                    public ByteBuffer allocate(int size)
+                    {
+                        return Logged.Allocator.this.allocate(size, opGroup);
+                    }
+                };
             }
         }
 
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java 
b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 646c90e04d..54fac01611 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.utils.memory;
 
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +112,7 @@ public abstract class MemtableAllocator
         private volatile LifeCycle state;
 
         // the amount of memory/resource owned by this object
-        private volatile long owns;
+        private final LongAdder owns = new LongAdder();
         // the amount of memory we are reporting to collect; this may be 
inaccurate, but is close
         // and is used only to ensure that once we have reclaimed we mark the 
tracker with the same amount
         private volatile long reclaiming;
@@ -150,7 +151,7 @@ public abstract class MemtableAllocator
          */
         void releaseAll()
         {
-            parent.released(ownsUpdater.getAndSet(this, 0));
+            parent.released(owns.sumThenReset());
             parent.reclaimed(reclaimingUpdater.getAndSet(this, 0));
         }
 
@@ -204,7 +205,7 @@ public abstract class MemtableAllocator
         private void allocated(long size)
         {
             parent.allocated(size);
-            ownsUpdater.addAndGet(this, size);
+            owns.add(size);
 
             if (state == LifeCycle.DISCARDING)
             {
@@ -221,7 +222,7 @@ public abstract class MemtableAllocator
         private void acquired(long size)
         {
             parent.acquired();
-            ownsUpdater.addAndGet(this, size);
+            owns.add(size);
 
             if (state == LifeCycle.DISCARDING)
             {
@@ -244,7 +245,7 @@ public abstract class MemtableAllocator
             if (state == LifeCycle.LIVE)
             {
                 parent.released(size);
-                ownsUpdater.addAndGet(this, -size);
+                owns.add(-size);
             }
             else
             {
@@ -264,7 +265,7 @@ public abstract class MemtableAllocator
         {
             while (true)
             {
-                long cur = owns;
+                long cur = owns.sum();
                 long prev = reclaiming;
                 if (!reclaimingUpdater.compareAndSet(this, prev, cur))
                     continue;
@@ -276,7 +277,7 @@ public abstract class MemtableAllocator
 
         public long owns()
         {
-            return owns;
+            return owns.sum();
         }
 
         public long getReclaiming()
@@ -286,13 +287,12 @@ public abstract class MemtableAllocator
 
         public float ownershipRatio()
         {
-            float r = owns / (float) parent.limit;
+            float r = owns.sum() / (float) parent.limit;
             if (Float.isNaN(r))
                 return 0;
             return r;
         }
 
-        private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater 
= AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns");
         private static final AtomicLongFieldUpdater<SubAllocator> 
reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, 
"reclaiming");
     }
 
diff --git 
a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java 
b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index b7b0384fbf..9680da6693 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.utils.memory;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -32,13 +33,77 @@ public abstract class MemtableBufferAllocator extends 
MemtableAllocator
 
     protected Cloner allocator(OpOrder.Group opGroup)
     {
-        return new ByteBufferCloner()
+        return new MemtableByteBufferCloner(this, opGroup);
+    }
+
+    private static class MemtableByteBufferCloner extends ByteBufferCloner
+    {
+        private final MemtableBufferAllocator allocator;
+        private final OpOrder.Group opGroup;
+        private final ByteBuffer contextSlab;
+
+        private MemtableByteBufferCloner(MemtableBufferAllocator allocator, 
OpOrder.Group opGroup)
+        {
+            this(allocator, opGroup, null);
+        }
+
+        private MemtableByteBufferCloner(MemtableBufferAllocator allocator, 
OpOrder.Group opGroup, ByteBuffer contextSlab)
+        {
+            this.allocator = allocator;
+            this.opGroup = opGroup;
+            this.contextSlab = contextSlab;
+        }
+
+        @Override
+        public ByteBuffer allocate(int size)
+        {
+            if (contextSlab == null || contextSlab.remaining() < size)
+                return allocator.allocate(size, opGroup);
+
+            ByteBuffer result;
+            int currentPosition = contextSlab.position();
+            if (contextSlab.isDirect())
+            {
+                // we do not want to keep contextSlab as an attachment for our 
allocated buffers
+                // it would increase a memory pressure by keeping contextSlab 
instances while a memtable is alive,
+                // so we have to imitate that the allocated direct byte buffer 
is a child of the original parent region
+                result = 
MemoryUtil.getHollowDirectByteBuffer(ByteOrder.BIG_ENDIAN);
+                MemoryUtil.duplicateDirectByteBuffer(contextSlab, result);
+                MemoryUtil.setAttachment(result, 
MemoryUtil.getAttachment(contextSlab));
+            }
+            else
+            {
+                result = contextSlab.duplicate();
+            }
+            result.limit(currentPosition + size);
+            contextSlab.position(currentPosition + size);
+            return result;
+        }
+
+        @Override
+        public boolean isContextAwareCloningSupported()
+        {
+            return true;
+        }
+
+        @Override
+        public Cloner createContextAwareCloner(int estimatedCloneSize)
+        {
+           return new MemtableByteBufferCloner(allocator, opGroup, 
allocator.allocate(estimatedCloneSize, opGroup));
+        }
+
+        @Override
+        public void adjustUnused()
+        {
+            if (contextSlab != null && contextSlab.remaining() >= 0)
             {
-                @Override
-                public ByteBuffer allocate(int size)
-                {
-                    return MemtableBufferAllocator.this.allocate(size, 
opGroup);
-                }
-            };
+                int remaining = contextSlab.remaining();
+                if (contextSlab.isDirect())
+                    allocator.offHeap().adjust(-remaining, opGroup);
+                else
+                    allocator.onHeap().adjust(-remaining, opGroup);
+                contextSlab.position(contextSlab.position() + remaining);
+            }
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java 
b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index f77ea2fab1..479504f880 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -152,14 +152,23 @@ public abstract class MemtablePool
 
         boolean tryAllocate(long size)
         {
-            while (true)
-            {
-                long cur;
-                if ((cur = allocated) + size > limit)
-                    return false;
-                if (allocatedUpdater.compareAndSet(this, cur, cur + size))
-                    return true;
+            long result = allocatedUpdater.addAndGet(this, size);
+            if (result > limit) {
+                // We have switched from CAS loop and a strict limit check
+                // to addAndGet with a possible post-correction for perf 
reasons.
+                // Why this is OK:
+                // - We may temporarily exceed the limit here, but that also 
happens in case of blocking op order.
+                // - We decrease the allocated value, but that was also 
possible as part of the adjustment logic.
+                //
+                // We don’t call released() here because it triggers 
hasRoom.signalAll(), which would
+                //   immediately wake up the current thread before memory is 
reclaimed and cause a busy loop.
+                // In a rare case, an unsuccessful attempt of a larger 
allocation near a limit by one thread
+                // may temporarily block progress on a smaller concurrent 
allocation by another thread,
+                // but both threads will be signaled and be able to proceed 
once memory is reclaimed.
+                allocatedUpdater.addAndGet(this, -size);
+                return false;
             }
+            return true;
         }
 
         /**
@@ -168,12 +177,7 @@ public abstract class MemtablePool
          */
         private void adjustAllocated(long size)
         {
-            while (true)
-            {
-                long cur = allocated;
-                if (allocatedUpdater.compareAndSet(this, cur, cur + size))
-                    return;
-            }
+            allocatedUpdater.addAndGet(this, size);
         }
 
         void allocated(long size)
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java 
b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 0d1fdd488f..22ee46c68c 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -41,7 +41,7 @@ import static 
org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore;
  * long-lived objects.
  *
  */
-public class NativeAllocator extends MemtableAllocator
+public class NativeAllocator extends MemtableAllocator implements 
AddressBasedAllocator
 {
     private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
     private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this 
don't go in the region
@@ -104,30 +104,98 @@ public class NativeAllocator extends MemtableAllocator
     @Override
     public Cloner cloner(Group opGroup)
     {
-        return new Cloner()
-                {
-
-                    @Override
-                    public DecoratedKey clone(DecoratedKey key)
-                    {
-                        return NativeAllocator.this.clone(key, opGroup);
-                    }
-
-                    @Override
-                    public Clustering<?> clone(Clustering<?> clustering)
-                    {
-                        if (clustering != Clustering.STATIC_CLUSTERING)
-                            return new NativeClustering(NativeAllocator.this, 
opGroup, clustering);
-
-                        return Clustering.STATIC_CLUSTERING;
-                    }
-
-                    @Override
-                    public Cell<?> clone(Cell<?> cell)
-                    {
-                        return new NativeCell(NativeAllocator.this, opGroup, 
cell);
-                    }
-                };
+        return new NativeCloner(this, opGroup); // two-arg constructor: a cloner with no pre-reserved region (size 0)
+    }
+
+    private static class NativeCloner implements Cloner, AddressBasedAllocator
+    {
+        private final NativeAllocator allocator;
+        private final Group opGroup;
+
+        private final long address; // start of the pre-reserved region, or -1 when nothing was reserved
+        private final int limit; // size of the pre-reserved region as requested at construction
+        private int offset = 0; // how much of the region has been handed out so far
+
+        private NativeCloner(NativeAllocator allocator, Group opGroup)
+        {
+            this(allocator, opGroup, 0); // size 0: nothing is reserved up front
+        }
+
+        private NativeCloner(NativeAllocator allocator, Group opGroup, int 
size)
+        {
+            this.allocator = allocator;
+            this.opGroup = opGroup;
+            this.limit = size;
+            this.address = size > 0 ? allocator.allocate(size, opGroup) : -1; // whole region obtained with one allocator call
+        }
+
+        @Override
+        public DecoratedKey clone(DecoratedKey key)
+        {
+            return allocator.clone(key, opGroup); // keys go through the wrapped allocator, not through this cloner's region
+        }
+
+        @Override
+        public Clustering<?> clone(Clustering<?> clustering)
+        {
+            if (clustering != Clustering.STATIC_CLUSTERING)
+                return new NativeClustering(this, opGroup, clustering); // passes this cloner as the allocator argument
+
+            return Clustering.STATIC_CLUSTERING;
+        }
+
+        @Override
+        public int estimateCloneSize(Clustering<?> clustering)
+        {
+            return NativeClustering.estimateAllocationSize(clustering);
+        }
+
+        @Override
+        public Cell<?> clone(Cell<?> cell)
+        {
+            return new NativeCell(this, opGroup, cell); // passes this cloner as the allocator argument
+        }
+
+        @Override
+        public int estimateCloneSize(Cell<?> cell)
+        {
+            return NativeCell.estimateAllocationSize(cell);
+        }
+
+        @Override
+        public boolean isContextAwareCloningSupported()
+        {
+            return true;
+        }
+
+        @Override
+        public Cloner createContextAwareCloner(int estimatedCloneSize)
+        {
+            return new NativeCloner(this.allocator, this.opGroup, 
estimatedCloneSize);
+        }
+
+        public long allocate(int sizeToAllocate, Group opGroup)
+        {
+            // we use this method in a single thread, no sync required
+            if (offset + sizeToAllocate <= limit) // request still fits into the pre-reserved region
+            {
+                long result = address + offset;
+                offset += sizeToAllocate;
+                return result;
+            }
+            return allocator.allocate(sizeToAllocate, opGroup); // does not fit (or nothing reserved): ask the wrapped allocator
+        }
+
+        @Override
+        public void adjustUnused()
+        {
+            int remaining = limit - offset;
+            if (remaining > 0)
+            {
+                allocator.offHeap().adjust(-remaining, opGroup); // NOTE(review): presumably returns the unused tail to off-heap accounting - confirm adjust() semantics
+                offset += remaining; // region now counts as fully consumed, so a repeated call adjusts nothing
+            }
+        }
     }
 
     public EnsureOnHeap ensureOnHeap()
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
 
b/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
index df9d919f52..20f3af6d03 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
@@ -338,6 +338,12 @@ public class AtomicBTreePartitionUpdateBench
                 {
                     return new ByteBufferCloner()
                     {
+                        @Override
+                        public boolean isContextAwareCloningSupported()
+                        {
+                            return false;
+                        }
+
                         @Override
                         public ByteBuffer allocate(int size)
                         {
diff --git a/test/unit/org/apache/cassandra/utils/memory/ContextClonerTest.java 
b/test/unit/org/apache/cassandra/utils/memory/ContextClonerTest.java
new file mode 100644
index 0000000000..167965bde2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/memory/ContextClonerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.ArrayClustering;
+import org.apache.cassandra.db.BufferClustering;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.rows.ArrayCell;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.arbitrary;
+import static org.quicktheories.generators.SourceDSL.integers;
+
+@RunWith(Parameterized.class)
+public class ContextClonerTest
+{
+    private static final ColumnMetadata cellDef = 
ColumnMetadata.regularColumn("ks", "cf", "c",
+                                                                               
Int32Type.instance,
+                                                                               
ColumnMetadata.NO_UNIQUE_ID);
+
+    private static final ColumnMetadata complexCellDef = 
ColumnMetadata.regularColumn("ks", "cf", "c",
+                                                                               
       MapType.getInstance(Int32Type.instance,
+                                                                               
                           Int32Type.instance,
+                                                                               
                           true),
+                                                                               
       ColumnMetadata.NO_UNIQUE_ID);
+    @Parameterized.Parameters(name="allocationType={0}")
+    public static Iterable<? extends Object> data()
+    {
+        return Arrays.asList(Config.MemtableAllocationType.values()); // one parameter value per MemtableAllocationType constant
+    }
+
+    @Parameterized.Parameter
+    public Config.MemtableAllocationType allocationType;
+
+    boolean offheapAllocation = false;
+
+    MemtablePool pool;
+
+    @Before
+    public void before()
+    {
+        long capacity = Integer.MAX_VALUE;
+        switch(allocationType) // build the pool implementation matching the parameterized allocation type
+        {
+            case unslabbed_heap_buffers:
+                pool = new HeapPool(capacity, 1.0f, () -> 
ImmediateFuture.success(false));
+                break;
+            case unslabbed_heap_buffers_logged:
+                pool = new HeapPool.Logged(capacity, 1.0f, () -> 
ImmediateFuture.success(false));
+                break;
+            case heap_buffers:
+                pool = new SlabPool(capacity, 0, 1.0f, () -> 
ImmediateFuture.success(false));
+                break;
+            case offheap_buffers:
+                pool = new SlabPool(0, capacity, 1.0f, () -> 
ImmediateFuture.success(false));
+                offheapAllocation = true; // makes test(...) read offHeap().owns() instead of onHeap().owns()
+                break;
+            case offheap_objects:
+                pool = new NativePool(0, capacity, 1.0f, () -> 
ImmediateFuture.success(false));
+                offheapAllocation = true; // makes test(...) read offHeap().owns() instead of onHeap().owns()
+                break;
+            default: throw new UnsupportedOperationException(); // an allocation type with no case above
+
+        }
+    }
+
+    @Test
+    public void test()
+    {
+        qt().forAll(arbitrary().pick(Type.values()), // type
+                    integers().between(10, 20), // valueSize
+                    integers().between(0, 10), // pathSize
+                    integers().between(-20, 20)).checkAssert(this::test); // estimationError
+    }
+
+    public void test(Type type, int valueSize, int pathSize, int 
estimationError) {
+        MemtableAllocator memtableAllocator = pool.newAllocator("test");
+        OpOrder opOrder = new OpOrder();
+        opOrder.start();
+        Cloner cloner = memtableAllocator.cloner(opOrder.getCurrent());
+        if (allocationType == 
Config.MemtableAllocationType.unslabbed_heap_buffers ||
+            allocationType == 
Config.MemtableAllocationType.unslabbed_heap_buffers_logged)
+        {
+            Assert.assertFalse(cloner.isContextAwareCloningSupported());
+            return; // nothing further is checked for the unslabbed heap types
+        }
+        else
+        {
+            Assert.assertTrue(cloner.isContextAwareCloningSupported());
+        }
+
+        int cellValueSize = valueSize;
+        Cell<?> cell = cell(type, cellValueSize, pathSize);
+        int cellEstimation = cloner.estimateCloneSize(cell);
+        Assert.assertThat(cellEstimation, greaterThanOrEqualTo(cellValueSize + 
pathSize));
+
+        int clusteringValueSize = valueSize;
+        int clusteringColumns = 2;
+        Clustering<?> clustering = clustering(type, clusteringValueSize, 
clusteringColumns);
+        int clusteringEstimation = cloner.estimateCloneSize(clustering);
+        Assert.assertThat(clusteringEstimation, 
greaterThanOrEqualTo(clusteringValueSize * clusteringColumns));
+
+        int estimationSize = clusteringEstimation + cellEstimation;
+        Cloner contextCloner = cloner.createContextAwareCloner(estimationSize 
+ estimationError);
+        contextCloner.clone(clustering);
+        contextCloner.clone(cell);
+        contextCloner.adjustUnused(); // called after both clones and before owns() is read
+        Assert.assertEquals(estimationSize, // expected value is the estimate without estimationError
+                            offheapAllocation ? 
memtableAllocator.offHeap().owns()
+                                              : 
memtableAllocator.onHeap().owns());
+    }
+
+    Cell<?> cell(Type type, int valueSize, int pathSize)
+    {
+        CellPath path = (pathSize > 0) ? 
CellPath.create(ByteBuffer.allocate(pathSize)) : null;
+        ColumnMetadata cellDefToUse = (pathSize > 0) ? complexCellDef : 
cellDef;
+        switch(type)
+        {
+            case ARRAY:
+                byte[] cellValueArray = new byte[valueSize]; // zero-filled value of valueSize bytes
+                return new ArrayCell(cellDefToUse, 0L, Cell.NO_TTL, 
Cell.NO_DELETION_TIME, cellValueArray, path);
+            case BUFFER:
+                ByteBuffer cellValueBuffer = ByteBuffer.allocate(valueSize); // zero-filled value of valueSize bytes
+                return new BufferCell(cellDefToUse, 0L, Cell.NO_TTL, 
Cell.NO_DELETION_TIME, cellValueBuffer, path);
+        }
+        throw new UnsupportedOperationException(); // only reached for a Type constant without a case above
+    }
+
+    Clustering<?> clustering(Type type, int columnSize, int numberOfColumns) // numberOfColumns values, each columnSize bytes
+    {
+        switch(type)
+        {
+            case ARRAY:
+                byte[][] arrayValues = new byte[numberOfColumns][columnSize]; // outer dimension = columns, inner = bytes per column, same shape as the BUFFER branch
+                return new ArrayClustering(arrayValues);
+            case BUFFER:
+                ByteBuffer[] bufferValues = new ByteBuffer[numberOfColumns];
+                for (int i = 0; i < numberOfColumns; i++)
+                {
+                    bufferValues[i] = ByteBuffer.allocate(columnSize);
+                }
+                return new BufferClustering(bufferValues);
+        }
+        throw new UnsupportedOperationException(); // only reached for a Type constant without a case above
+    }
+
+    enum Type
+    {
+        ARRAY, BUFFER; // ARRAY selects ArrayCell/ArrayClustering, BUFFER selects BufferCell/BufferClustering
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to