This is an automated email from the ASF dual-hosted git repository.
konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b57acc7f4 Reduce contention in MemtableAllocator.allocate - Estimate
memory required to allocate for applying a partition update into a memtable and
allocate this memory in one shot, then use it as a request-local SLAB - Reduce
contention by switching MemtableAllocator.SubAllocator#owns from updates via
AtomicLongFieldUpdater to LongAdder usage.
MemtableAllocator.SubAllocator#acquired(..) method updates "owns" value but
does not use the updated result. - Reduce contention by rep [...]
6b57acc7f4 is described below
commit 6b57acc7f41357afb2ced62271f8f16b865f99c8
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Mon Mar 24 19:37:21 2025 +0000
Reduce contention in MemtableAllocator.allocate
- Estimate the memory required to apply a partition update to a memtable,
allocate that memory in one shot, and then use it as a request-local slab.
- Reduce contention by switching MemtableAllocator.SubAllocator#owns from
updates via AtomicLongFieldUpdater to a LongAdder; the
MemtableAllocator.SubAllocator#acquired(..) method updates the "owns" value but
does not use the updated result, so an adder is sufficient.
- Reduce contention by replacing the CAS loop in
MemtablePool.SubPool#tryAllocate with allocatedUpdater.addAndGet(this, size).
Patch by Dmitry Konstantinov; reviewed by Michael Semb Wever for
CASSANDRA-20226
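
Illustration (not part of the commit): a minimal, self-contained Java sketch of the
two counter patterns described in the last two bullets, using hypothetical names
(BoundedCounterSketch, acquired, tryAllocate) rather than the actual Cassandra
classes. It shows an add-only "owns" counter backed by a LongAdder, and a bounded
"allocated" counter whose limit check uses addAndGet plus a rollback instead of a
CAS loop, accepting a brief overshoot.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;

    class BoundedCounterSketch
    {
        // "owns"-style counter: writers only add and readers occasionally sum,
        // so a LongAdder spreads contention across cells instead of CASing one field.
        private final LongAdder owns = new LongAdder();

        // "allocated"-style counter with a limit check.
        private final AtomicLong allocated = new AtomicLong();
        private final long limit;

        BoundedCounterSketch(long limit)
        {
            this.limit = limit;
        }

        void acquired(long size)
        {
            owns.add(size); // the updated value is never needed, so no CAS retry loop
        }

        long owns()
        {
            return owns.sum();
        }

        // Add first, roll back if the limit is exceeded; the counter may transiently
        // overshoot, which the patch argues is acceptable for this pool accounting.
        boolean tryAllocate(long size)
        {
            long result = allocated.addAndGet(size);
            if (result > limit)
            {
                allocated.addAndGet(-size);
                return false;
            }
            return true;
        }

        public static void main(String[] args)
        {
            BoundedCounterSketch pool = new BoundedCounterSketch(100);
            System.out.println(pool.tryAllocate(60)); // true
            System.out.println(pool.tryAllocate(60)); // false, rolled back
            pool.acquired(60);
            System.out.println(pool.owns());          // 60
        }
    }

The rollback path deliberately avoids any "room freed" signalling, mirroring the
reasoning in the tryAllocate comment in the diff below: waking the failing thread
before memory is actually reclaimed would just produce a busy loop.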
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/NativeClustering.java | 14 +-
.../db/partitions/BTreePartitionUpdater.java | 42 ++++-
.../org/apache/cassandra/db/rows/AbstractCell.java | 12 ++
src/java/org/apache/cassandra/db/rows/Cell.java | 5 +
.../org/apache/cassandra/db/rows/ColumnData.java | 3 +
.../cassandra/db/rows/ComplexColumnData.java | 5 +
.../org/apache/cassandra/db/rows/NativeCell.java | 23 ++-
...erAllocator.java => AddressBasedAllocator.java} | 24 +--
.../cassandra/utils/memory/ByteBufferCloner.java | 23 +++
.../org/apache/cassandra/utils/memory/Cloner.java | 10 ++
.../apache/cassandra/utils/memory/HeapCloner.java | 6 +
.../apache/cassandra/utils/memory/HeapPool.java | 33 +++-
.../cassandra/utils/memory/MemtableAllocator.java | 18 +-
.../utils/memory/MemtableBufferAllocator.java | 79 ++++++++-
.../cassandra/utils/memory/MemtablePool.java | 30 ++--
.../cassandra/utils/memory/NativeAllocator.java | 118 ++++++++++---
.../btree/AtomicBTreePartitionUpdateBench.java | 6 +
.../cassandra/utils/memory/ContextClonerTest.java | 186 +++++++++++++++++++++
19 files changed, 551 insertions(+), 87 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e98357f851..7b0116f005 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Reduce contention in MemtableAllocator.allocate (CASSANDRA-20226)
 * Add export, list, import sub-commands for nodetool compressiondictionary (CASSANDRA-20941)
 * Add support in the binary protocol to allow transactions to have multiple conditions (CASSANDRA-20883)
 * Enable CQLSSTableWriter to create SSTables compressed with a dictionary (CASSANDRA-20938)
diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java b/src/java/org/apache/cassandra/db/NativeClustering.java
index f51ea90a82..45cfbb3b35 100644
--- a/src/java/org/apache/cassandra/db/NativeClustering.java
+++ b/src/java/org/apache/cassandra/db/NativeClustering.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AddressBasedAllocator;
import org.apache.cassandra.utils.memory.HeapCloner;
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
-import org.apache.cassandra.utils.memory.NativeAllocator;
public class NativeClustering implements Clustering<NativeData>
{
@@ -42,7 +42,17 @@ public class NativeClustering implements Clustering<NativeData>
     private NativeClustering() { peer = 0; }
-    public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, Clustering<?> clustering)
+ public static int estimateAllocationSize(Clustering<?> clustering)
+ {
+ int count = clustering.size();
+ int metadataSize = (count * 2) + 4;
+ int dataSize = clustering.dataSize();
+ int bitmapSize = ((count + 7) >>> 3);
+
+ return metadataSize + dataSize + bitmapSize;
+ }
+
+    public NativeClustering(AddressBasedAllocator allocator, OpOrder.Group writeOp, Clustering<?> clustering)
{
int count = clustering.size();
int metadataSize = (count * 2) + 4;
diff --git a/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java b/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java
index bce379d98a..9a4d6acaaa 100644
--- a/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java
+++ b/src/java/org/apache/cassandra/db/partitions/BTreePartitionUpdater.java
@@ -40,7 +40,9 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
{
final MemtableAllocator allocator;
final OpOrder.Group writeOp;
+
final Cloner cloner;
+ Cloner contextCloner;
final UpdateTransaction indexer;
public long dataSize;
@@ -84,6 +86,34 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
     protected BTreePartitionData makeMergedPartition(BTreePartitionData current, PartitionUpdate update)
     {
+        if (cloner.isContextAwareCloningSupported()) // to avoid an estimation cost if context-aware cloning is not supported
+        {
+            int estimatedCloneSize = 0;
+            // a typical case is when all values in the update are used in the result of the merge;
+            // clustering key cloning is needed when we have an insert but not when we have an update,
+            // so we may allocate a bit more than needed sometimes
+            for (Row row : update)
+            {
+                estimatedCloneSize += (int) row.accumulate((cd, v) -> v + cd.estimateCloneSize(cloner), 0);
+                estimatedCloneSize += cloner.estimateCloneSize(row.clustering());
+            }
+
+            Row staticRow = update.staticRow();
+            if (!staticRow.isEmpty())
+            {
+                estimatedCloneSize += (int) staticRow.accumulate((cd, v) -> v + cd.estimateCloneSize(cloner), 0);
+                // there are no clustering keys for static rows
+            }
+
+            if (contextCloner != null && contextCloner != cloner)
+                contextCloner.adjustUnused();
+            contextCloner = cloner.createContextAwareCloner(estimatedCloneSize);
+        }
+        else
+        {
+            contextCloner = cloner;
+        }
+
         DeletionInfo newDeletionInfo = merge(current.deletionInfo, update.deletionInfo());
RegularAndStaticColumns columns = current.columns;
@@ -130,7 +160,7 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
@Override
public Row insert(Row insert)
{
- Row data = insert.clone(cloner);
+ Row data = insert.clone(contextCloner);
indexer.onInserted(insert);
dataSize += data.dataSize();
@@ -154,8 +184,8 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
long timeDelta = Math.abs(insert.timestamp() - previous.timestamp());
if (timeDelta < colUpdateTimeDelta)
colUpdateTimeDelta = timeDelta;
- if (cloner != null)
- insert = cloner.clone(insert);
+ if (contextCloner != null)
+ insert = contextCloner.clone(insert);
dataSize += insert.dataSize() - previous.dataSize();
         heapSize += insert.unsharedHeapSizeExcludingData() - previous.unsharedHeapSizeExcludingData();
return insert;
@@ -163,8 +193,8 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
public ColumnData insert(ColumnData insert)
{
- if (cloner != null)
- insert = insert.clone(cloner);
+ if (contextCloner != null)
+ insert = insert.clone(contextCloner);
dataSize += insert.dataSize();
heapSize += insert.unsharedHeapSizeExcludingData();
return insert;
@@ -185,5 +215,7 @@ public class BTreePartitionUpdater implements UpdateFunction<Row, Row>, ColumnDa
public void reportAllocatedMemory()
{
allocator.onHeap().adjust(heapSize, writeOp);
+ if (contextCloner != null && contextCloner != cloner)
+ contextCloner.adjustUnused();
}
}
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 0dbcfc4420..aa5536593d 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -114,6 +114,18 @@ public abstract class AbstractCell<V> extends Cell<V>
         return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), cloner.clone(buffer()), path == null ? null : path.clone(cloner));
}
+ public static int estimateAllocationSize(Cell<?> cell)
+ {
+ long size = cell.valueSize();
+ CellPath path = cell.path();
+ if (path != null)
+ {
+            assert path.size() == 1 : String.format("Expected path size to be 1 but was not; %s", path);
+ size += path.get(0).remaining();
+ }
+ return (int) size;
+ }
+
     // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
public Cell<?> updateAllTimestamp(long newTimestamp)
{
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 3ddfeae39a..ced2f5229a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -201,6 +201,11 @@ public abstract class Cell<V> extends ColumnData
return cloner.clone(this);
}
+ public int estimateCloneSize(Cloner cloner)
+ {
+ return cloner.estimateCloneSize(this);
+ }
+
public abstract Cell<?> clone(ByteBufferCloner cloner);
@Override
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index c27d9bdabc..e37a8dc292 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -280,6 +280,9 @@ public abstract class ColumnData implements IMeasurableMemory
public abstract ColumnData clone(Cloner cloner);
+ public abstract int estimateCloneSize(Cloner cloner);
+
+
/**
* Returns a copy of the data where all timestamps for live data have
replaced by {@code newTimestamp} and
* all deletion timestamp by {@code newTimestamp - 1}.
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 84dbc0a8f7..20c4045bac 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -260,6 +260,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>>
return transform(c -> cloner.clone(c));
}
+ public int estimateCloneSize(Cloner cloner)
+ {
+ return (int) accumulate((c, v) -> v + cloner.estimateCloneSize(c), 0);
+ }
+
public ComplexColumnData updateAllTimestamp(long newTimestamp)
{
         DeletionTime newDeletion = complexDeletion.isLive() ? complexDeletion : DeletionTime.build(newTimestamp - 1, complexDeletion.localDeletionTime());
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index b774cb2ce9..ebea8a319b 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -28,8 +28,8 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AddressBasedAllocator;
import org.apache.cassandra.utils.memory.MemoryUtil;
-import org.apache.cassandra.utils.memory.NativeAllocator;
import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
public class NativeCell extends AbstractCell<NativeData> implements NativeData
@@ -45,13 +45,28 @@ public class NativeCell extends AbstractCell<NativeData> implements NativeData
private final long peer;
+ public static int estimateAllocationSize(Cell<?> cell)
+ {
+ long size = offHeapSizeWithoutPath(cell.valueSize());
+ CellPath path = cell.path();
+ if (path != null)
+ {
+            assert path.size() == 1 : String.format("Expected path size to be 1 but was not; %s", path);
+ size += 4 + path.get(0).remaining();
+ }
+
+ if (size > Integer.MAX_VALUE)
+ throw new IllegalStateException();
+ return (int) size;
+ }
+
private NativeCell()
{
super(null);
this.peer = 0;
}
- public NativeCell(NativeAllocator allocator,
+ public NativeCell(AddressBasedAllocator allocator,
OpOrder.Group writeOp,
Cell<?> cell)
{
@@ -67,7 +82,7 @@ public class NativeCell extends AbstractCell<NativeData> implements NativeData
     // Please keep both int/long overloaded ctors public. Otherwise silent casts will mess timestamps when one is not
     // available.
- public NativeCell(NativeAllocator allocator,
+ public NativeCell(AddressBasedAllocator allocator,
OpOrder.Group writeOp,
ColumnMetadata column,
long timestamp,
@@ -79,7 +94,7 @@ public class NativeCell extends AbstractCell<NativeData> implements NativeData
         this(allocator, writeOp, column, timestamp, ttl, deletionTimeLongToUnsignedInteger(localDeletionTime), value, path);
}
- public NativeCell(NativeAllocator allocator,
+ public NativeCell(AddressBasedAllocator allocator,
OpOrder.Group writeOp,
ColumnMetadata column,
long timestamp,
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/AddressBasedAllocator.java
similarity index 59%
copy from src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
copy to src/java/org/apache/cassandra/utils/memory/AddressBasedAllocator.java
index b7b0384fbf..a4f5c41c0f 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AddressBasedAllocator.java
@@ -15,30 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.cassandra.utils.memory;
-import java.nio.ByteBuffer;
+package org.apache.cassandra.utils.memory;
import org.apache.cassandra.utils.concurrent.OpOrder;
-public abstract class MemtableBufferAllocator extends MemtableAllocator
+public interface AddressBasedAllocator
{
-    protected MemtableBufferAllocator(SubAllocator onHeap, SubAllocator offHeap)
- {
- super(onHeap, offHeap);
- }
-
- public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
-
- protected Cloner allocator(OpOrder.Group opGroup)
- {
- return new ByteBufferCloner()
- {
- @Override
- public ByteBuffer allocate(int size)
- {
-                return MemtableBufferAllocator.this.allocate(size, opGroup);
- }
- };
- }
+ long allocate(int sizeToAllocate, OpOrder.Group opGroup);
}
diff --git a/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java b/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java
index a77e866acb..f089ac5c8e 100644
--- a/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java
+++ b/src/java/org/apache/cassandra/utils/memory/ByteBufferCloner.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.db.rows.AbstractCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,12 +53,34 @@ public abstract class ByteBufferCloner implements Cloner
return clustering.clone(this);
}
+ public int estimateCloneSize(Clustering<?> clustering)
+ {
+ return clustering.dataSize();
+ }
+
@Override
public Cell<?> clone(Cell<?> cell)
{
return cell.clone(this);
}
+ public int estimateCloneSize(Cell<?> cell)
+ {
+ return AbstractCell.estimateAllocationSize(cell);
+ }
+
+ @Override
+ public Cloner createContextAwareCloner(int estimatedCloneSize)
+ {
+ return this;
+ }
+
+ @Override
+ public void adjustUnused()
+ {
+ // nothing to do by default
+ }
+
public final ByteBuffer clone(ByteBuffer buffer)
{
return clone(buffer, ByteBufferAccessor.instance);
diff --git a/src/java/org/apache/cassandra/utils/memory/Cloner.java b/src/java/org/apache/cassandra/utils/memory/Cloner.java
index 0b3accbff8..46541c285a 100644
--- a/src/java/org/apache/cassandra/utils/memory/Cloner.java
+++ b/src/java/org/apache/cassandra/utils/memory/Cloner.java
@@ -44,6 +44,8 @@ public interface Cloner
*/
Clustering<?> clone(Clustering<?> clustering);
+ int estimateCloneSize(Clustering<?> clustering);
+
/**
* Clones the specified cell.
*
@@ -51,4 +53,12 @@ public interface Cloner
* @return the cloned cell
*/
Cell<?> clone(Cell<?> cell);
+
+ int estimateCloneSize(Cell<?> cell);
+
+ boolean isContextAwareCloningSupported();
+
+ Cloner createContextAwareCloner(int estimatedCloneSize);
+
+ void adjustUnused();
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapCloner.java b/src/java/org/apache/cassandra/utils/memory/HeapCloner.java
index 7f07c7903a..da8ff498cf 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapCloner.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapCloner.java
@@ -34,4 +34,10 @@ public final class HeapCloner extends ByteBufferCloner
{
return ByteBuffer.allocate(size);
}
+
+ @Override
+ public boolean isContextAwareCloningSupported()
+ {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 659715de4a..cf1307f11b 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -49,6 +49,24 @@ public class HeapPool extends MemtablePool
super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
}
+ protected Cloner allocator(OpOrder.Group opGroup)
+ {
+ return new ByteBufferCloner()
+ {
+ @Override
+ public boolean isContextAwareCloningSupported()
+ {
+ return false;
+ }
+
+ @Override
+ public ByteBuffer allocate(int size)
+ {
+ return Allocator.this.allocate(size, opGroup);
+ }
+ };
+ }
+
public ByteBuffer allocate(int size, OpOrder.Group opGroup)
{
super.onHeap().allocate(size, opGroup);
@@ -127,7 +145,20 @@ public class HeapPool extends MemtablePool
public Cloner cloner(OpOrder.Group opGroup)
{
- return allocator(opGroup);
+ return new ByteBufferCloner()
+ {
+ @Override
+ public boolean isContextAwareCloningSupported()
+ {
+ return false;
+ }
+
+ @Override
+ public ByteBuffer allocate(int size)
+ {
+ return Logged.Allocator.this.allocate(size, opGroup);
+ }
+ };
}
}
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 646c90e04d..54fac01611 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.utils.memory;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,7 +112,7 @@ public abstract class MemtableAllocator
private volatile LifeCycle state;
// the amount of memory/resource owned by this object
- private volatile long owns;
+ private final LongAdder owns = new LongAdder();
         // the amount of memory we are reporting to collect; this may be inaccurate, but is close
         // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
private volatile long reclaiming;
@@ -150,7 +151,7 @@ public abstract class MemtableAllocator
*/
void releaseAll()
{
- parent.released(ownsUpdater.getAndSet(this, 0));
+ parent.released(owns.sumThenReset());
parent.reclaimed(reclaimingUpdater.getAndSet(this, 0));
}
@@ -204,7 +205,7 @@ public abstract class MemtableAllocator
private void allocated(long size)
{
parent.allocated(size);
- ownsUpdater.addAndGet(this, size);
+ owns.add(size);
if (state == LifeCycle.DISCARDING)
{
@@ -221,7 +222,7 @@ public abstract class MemtableAllocator
private void acquired(long size)
{
parent.acquired();
- ownsUpdater.addAndGet(this, size);
+ owns.add(size);
if (state == LifeCycle.DISCARDING)
{
@@ -244,7 +245,7 @@ public abstract class MemtableAllocator
if (state == LifeCycle.LIVE)
{
parent.released(size);
- ownsUpdater.addAndGet(this, -size);
+ owns.add(-size);
}
else
{
@@ -264,7 +265,7 @@ public abstract class MemtableAllocator
{
while (true)
{
- long cur = owns;
+ long cur = owns.sum();
long prev = reclaiming;
if (!reclaimingUpdater.compareAndSet(this, prev, cur))
continue;
@@ -276,7 +277,7 @@ public abstract class MemtableAllocator
public long owns()
{
- return owns;
+ return owns.sum();
}
public long getReclaiming()
@@ -286,13 +287,12 @@ public abstract class MemtableAllocator
public float ownershipRatio()
{
- float r = owns / (float) parent.limit;
+ float r = owns.sum() / (float) parent.limit;
if (Float.isNaN(r))
return 0;
return r;
}
-        private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns");
         private static final AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming");
}
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index b7b0384fbf..9680da6693 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.utils.memory;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -32,13 +33,77 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
protected Cloner allocator(OpOrder.Group opGroup)
{
- return new ByteBufferCloner()
+ return new MemtableByteBufferCloner(this, opGroup);
+ }
+
+ private static class MemtableByteBufferCloner extends ByteBufferCloner
+ {
+ private final MemtableBufferAllocator allocator;
+ private final OpOrder.Group opGroup;
+ private final ByteBuffer contextSlab;
+
+        private MemtableByteBufferCloner(MemtableBufferAllocator allocator, OpOrder.Group opGroup)
+ {
+ this(allocator, opGroup, null);
+ }
+
+        private MemtableByteBufferCloner(MemtableBufferAllocator allocator, OpOrder.Group opGroup, ByteBuffer contextSlab)
+ {
+ this.allocator = allocator;
+ this.opGroup = opGroup;
+ this.contextSlab = contextSlab;
+ }
+
+ @Override
+ public ByteBuffer allocate(int size)
+ {
+ if (contextSlab == null || contextSlab.remaining() < size)
+ return allocator.allocate(size, opGroup);
+
+ ByteBuffer result;
+ int currentPosition = contextSlab.position();
+ if (contextSlab.isDirect())
+ {
+                // we do not want to keep contextSlab as an attachment for our allocated buffers:
+                // it would increase memory pressure by keeping contextSlab instances while a memtable is alive,
+                // so we have to imitate that the allocated direct byte buffer is a child of the original parent region
+                result = MemoryUtil.getHollowDirectByteBuffer(ByteOrder.BIG_ENDIAN);
+                MemoryUtil.duplicateDirectByteBuffer(contextSlab, result);
+                MemoryUtil.setAttachment(result, MemoryUtil.getAttachment(contextSlab));
+ }
+ else
+ {
+ result = contextSlab.duplicate();
+ }
+ result.limit(currentPosition + size);
+ contextSlab.position(currentPosition + size);
+ return result;
+ }
+
+ @Override
+ public boolean isContextAwareCloningSupported()
+ {
+ return true;
+ }
+
+ @Override
+ public Cloner createContextAwareCloner(int estimatedCloneSize)
+ {
+            return new MemtableByteBufferCloner(allocator, opGroup, allocator.allocate(estimatedCloneSize, opGroup));
+ }
+
+ @Override
+ public void adjustUnused()
+ {
+ if (contextSlab != null && contextSlab.remaining() >= 0)
{
- @Override
- public ByteBuffer allocate(int size)
- {
-                return MemtableBufferAllocator.this.allocate(size, opGroup);
- }
- };
+ int remaining = contextSlab.remaining();
+ if (contextSlab.isDirect())
+ allocator.offHeap().adjust(-remaining, opGroup);
+ else
+ allocator.onHeap().adjust(-remaining, opGroup);
+ contextSlab.position(contextSlab.position() + remaining);
+ }
+ }
}
}
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index f77ea2fab1..479504f880 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -152,14 +152,23 @@ public abstract class MemtablePool
boolean tryAllocate(long size)
{
- while (true)
- {
- long cur;
- if ((cur = allocated) + size > limit)
- return false;
- if (allocatedUpdater.compareAndSet(this, cur, cur + size))
- return true;
+            long result = allocatedUpdater.addAndGet(this, size);
+            if (result > limit) {
+                // We have switched from a CAS loop with a strict limit check
+                // to addAndGet with a possible post-correction for perf reasons.
+                // Why this is OK:
+                // - We may temporarily exceed the limit here, but that also happens in case of blocking op order.
+                // - We decrease the allocated value, but that was also possible as part of the adjustment logic.
+                //
+                // We don't call released() here because it triggers hasRoom.signalAll(), which would
+                // immediately wake up the current thread before memory is reclaimed and cause a busy loop.
+                // In a rare case, an unsuccessful attempt of a larger allocation near the limit by one thread
+                // may temporarily block progress on a smaller concurrent allocation by another thread,
+                // but both threads will be signaled and be able to proceed once memory is reclaimed.
+                allocatedUpdater.addAndGet(this, -size);
+                return false;
             }
+            return true;
         }
/**
@@ -168,12 +177,7 @@ public abstract class MemtablePool
*/
private void adjustAllocated(long size)
{
- while (true)
- {
- long cur = allocated;
- if (allocatedUpdater.compareAndSet(this, cur, cur + size))
- return;
- }
+ allocatedUpdater.addAndGet(this, size);
}
void allocated(long size)
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 0d1fdd488f..22ee46c68c 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -41,7 +41,7 @@ import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore;
* long-lived objects.
*
*/
-public class NativeAllocator extends MemtableAllocator
+public class NativeAllocator extends MemtableAllocator implements AddressBasedAllocator
{
private final static int MAX_REGION_SIZE = 1 * 1024 * 1024;
     private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
@@ -104,30 +104,98 @@ public class NativeAllocator extends MemtableAllocator
@Override
public Cloner cloner(Group opGroup)
{
- return new Cloner()
- {
-
- @Override
- public DecoratedKey clone(DecoratedKey key)
- {
- return NativeAllocator.this.clone(key, opGroup);
- }
-
- @Override
- public Clustering<?> clone(Clustering<?> clustering)
- {
- if (clustering != Clustering.STATIC_CLUSTERING)
-                    return new NativeClustering(NativeAllocator.this, opGroup, clustering);
-
- return Clustering.STATIC_CLUSTERING;
- }
-
- @Override
- public Cell<?> clone(Cell<?> cell)
- {
-                return new NativeCell(NativeAllocator.this, opGroup, cell);
- }
- };
+ return new NativeCloner(this, opGroup);
+ }
+
+ private static class NativeCloner implements Cloner, AddressBasedAllocator
+ {
+ private final NativeAllocator allocator;
+ private final Group opGroup;
+
+ private final long address;
+ private final int limit;
+ private int offset = 0;
+
+ private NativeCloner(NativeAllocator allocator, Group opGroup)
+ {
+ this(allocator, opGroup, 0);
+ }
+
+        private NativeCloner(NativeAllocator allocator, Group opGroup, int size)
+ {
+ this.allocator = allocator;
+ this.opGroup = opGroup;
+ this.limit = size;
+ this.address = size > 0 ? allocator.allocate(size, opGroup) : -1;
+ }
+
+ @Override
+ public DecoratedKey clone(DecoratedKey key)
+ {
+ return allocator.clone(key, opGroup);
+ }
+
+ @Override
+ public Clustering<?> clone(Clustering<?> clustering)
+ {
+ if (clustering != Clustering.STATIC_CLUSTERING)
+ return new NativeClustering(this, opGroup, clustering);
+
+ return Clustering.STATIC_CLUSTERING;
+ }
+
+ @Override
+ public int estimateCloneSize(Clustering<?> clustering)
+ {
+ return NativeClustering.estimateAllocationSize(clustering);
+ }
+
+ @Override
+ public Cell<?> clone(Cell<?> cell)
+ {
+ return new NativeCell(this, opGroup, cell);
+ }
+
+ @Override
+ public int estimateCloneSize(Cell<?> cell)
+ {
+ return NativeCell.estimateAllocationSize(cell);
+ }
+
+ @Override
+ public boolean isContextAwareCloningSupported()
+ {
+ return true;
+ }
+
+ @Override
+ public Cloner createContextAwareCloner(int estimatedCloneSize)
+ {
+            return new NativeCloner(this.allocator, this.opGroup, estimatedCloneSize);
+ }
+
+ public long allocate(int sizeToAllocate, Group opGroup)
+ {
+ // we use this method in a single thread, no sync required
+ if (offset + sizeToAllocate <= limit)
+ {
+ long result = address + offset;
+ offset += sizeToAllocate;
+ return result;
+ }
+ return allocator.allocate(sizeToAllocate, opGroup);
+ }
+
+ @Override
+ public void adjustUnused()
+ {
+ int remaining = limit - offset;
+ if (remaining > 0)
+ {
+ allocator.offHeap().adjust(-remaining, opGroup);
+ offset += remaining;
+ }
+ }
}
public EnsureOnHeap ensureOnHeap()
diff --git a/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java b/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
index df9d919f52..20f3af6d03 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/btree/AtomicBTreePartitionUpdateBench.java
@@ -338,6 +338,12 @@ public class AtomicBTreePartitionUpdateBench
{
return new ByteBufferCloner()
{
+ @Override
+ public boolean isContextAwareCloningSupported()
+ {
+ return false;
+ }
+
@Override
public ByteBuffer allocate(int size)
{
diff --git a/test/unit/org/apache/cassandra/utils/memory/ContextClonerTest.java b/test/unit/org/apache/cassandra/utils/memory/ContextClonerTest.java
new file mode 100644
index 0000000000..167965bde2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/memory/ContextClonerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.ArrayClustering;
+import org.apache.cassandra.db.BufferClustering;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.rows.ArrayCell;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.arbitrary;
+import static org.quicktheories.generators.SourceDSL.integers;
+
+@RunWith(Parameterized.class)
+public class ContextClonerTest
+{
+    private static final ColumnMetadata cellDef = ColumnMetadata.regularColumn("ks", "cf", "c",
+                                                                               Int32Type.instance,
+                                                                               ColumnMetadata.NO_UNIQUE_ID);
+
+    private static final ColumnMetadata complexCellDef = ColumnMetadata.regularColumn("ks", "cf", "c",
+                                                                                      MapType.getInstance(Int32Type.instance,
+                                                                                                          Int32Type.instance,
+                                                                                                          true),
+                                                                                      ColumnMetadata.NO_UNIQUE_ID);
+ @Parameterized.Parameters(name="allocationType={0}")
+ public static Iterable<? extends Object> data()
+ {
+ return Arrays.asList(Config.MemtableAllocationType.values());
+ }
+
+ @Parameterized.Parameter
+ public Config.MemtableAllocationType allocationType;
+
+ boolean offheapAllocation = false;
+
+ MemtablePool pool;
+
+ @Before
+ public void before()
+ {
+ long capacity = Integer.MAX_VALUE;
+ switch(allocationType)
+ {
+ case unslabbed_heap_buffers:
+                pool = new HeapPool(capacity, 1.0f, () -> ImmediateFuture.success(false));
+                break;
+            case unslabbed_heap_buffers_logged:
+                pool = new HeapPool.Logged(capacity, 1.0f, () -> ImmediateFuture.success(false));
+                break;
+            case heap_buffers:
+                pool = new SlabPool(capacity, 0, 1.0f, () -> ImmediateFuture.success(false));
+                break;
+            case offheap_buffers:
+                pool = new SlabPool(0, capacity, 1.0f, () -> ImmediateFuture.success(false));
+                offheapAllocation = true;
+                break;
+            case offheap_objects:
+                pool = new NativePool(0, capacity, 1.0f, () -> ImmediateFuture.success(false));
+                offheapAllocation = true;
+                break;
+ default: throw new UnsupportedOperationException();
+
+ }
+ }
+
+ @Test
+ public void test()
+ {
+ qt().forAll(arbitrary().pick(Type.values()),
+ integers().between(10, 20),
+ integers().between(0, 10),
+ integers().between(-20, 20)).checkAssert(this::test);
+ }
+
+    public void test(Type type, int valueSize, int pathSize, int estimationError) {
+ MemtableAllocator memtableAllocator = pool.newAllocator("test");
+ OpOrder opOrder = new OpOrder();
+ opOrder.start();
+ Cloner cloner = memtableAllocator.cloner(opOrder.getCurrent());
+        if (allocationType == Config.MemtableAllocationType.unslabbed_heap_buffers ||
+            allocationType == Config.MemtableAllocationType.unslabbed_heap_buffers_logged)
+ {
+ Assert.assertFalse(cloner.isContextAwareCloningSupported());
+ return;
+ }
+ else
+ {
+ Assert.assertTrue(cloner.isContextAwareCloningSupported());
+ }
+
+ int cellValueSize = valueSize;
+ Cell<?> cell = cell(type, cellValueSize, pathSize);
+ int cellEstimation = cloner.estimateCloneSize(cell);
+        Assert.assertThat(cellEstimation, greaterThanOrEqualTo(cellValueSize + pathSize));
+
+        int clusteringValueSize = valueSize;
+        int clusteringColumns = 2;
+        Clustering<?> clustering = clustering(type, clusteringValueSize, clusteringColumns);
+        int clusteringEstimation = cloner.estimateCloneSize(clustering);
+        Assert.assertThat(clusteringEstimation, greaterThanOrEqualTo(clusteringValueSize * clusteringColumns));
+
+        int estimationSize = clusteringEstimation + cellEstimation;
+        Cloner contextCloner = cloner.createContextAwareCloner(estimationSize + estimationError);
+        contextCloner.clone(clustering);
+        contextCloner.clone(cell);
+        contextCloner.adjustUnused();
+        Assert.assertEquals(estimationSize,
+                            offheapAllocation ? memtableAllocator.offHeap().owns()
+                                              : memtableAllocator.onHeap().owns());
+ }
+
+ Cell<?> cell(Type type, int valueSize, int pathSize)
+ {
+        CellPath path = (pathSize > 0) ? CellPath.create(ByteBuffer.allocate(pathSize)) : null;
+        ColumnMetadata cellDefToUse = (pathSize > 0) ? complexCellDef : cellDef;
+ switch(type)
+ {
+ case ARRAY:
+ byte[] cellValueArray = new byte[valueSize];
+                return new ArrayCell(cellDefToUse, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValueArray, path);
+            case BUFFER:
+                ByteBuffer cellValueBuffer = ByteBuffer.allocate(valueSize);
+                return new BufferCell(cellDefToUse, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValueBuffer, path);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ Clustering<?> clustering(Type type, int columnSize, int numberOfColumns)
+ {
+ switch(type)
+ {
+ case ARRAY:
+ byte[][] arrayValues = new byte[columnSize][numberOfColumns];
+ return new ArrayClustering(arrayValues);
+ case BUFFER:
+ ByteBuffer[] bufferValues = new ByteBuffer[numberOfColumns];
+ for (int i = 0; i < numberOfColumns; i++)
+ {
+ bufferValues[i] = ByteBuffer.allocate(columnSize);
+ }
+ return new BufferClustering(bufferValues);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ enum Type
+ {
+ ARRAY, BUFFER;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]