This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d17a7f04c fix code bug & optimize the memory usage in
DefaultMappedFile (#4678)
d17a7f04c is described below
commit d17a7f04ccd84b4e4905692ffe7b23df70461c2a
Author: CoedPig <[email protected]>
AuthorDate: Wed Aug 17 10:25:41 2022 +0800
fix code bug & optimize the memory usage in DefaultMappedFile (#4678)
* optimize the memory usage in DefaultMappedFile:
use AtomicIntegerFieldUpdater instead of AtomicInteger
* fix code bug:
if the initial cursor of the listIterator is zero, hasPrevious() is
always false, so the previous element is never returned (always null).
* modify constant name
Co-authored-by: shanpengyu <[email protected]>
---
.../org/apache/rocketmq/store/MappedFileQueue.java | 2 +-
.../rocketmq/store/logfile/DefaultMappedFile.java | 96 +++++++++++++---------
2 files changed, 56 insertions(+), 42 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 7ba7de4f4..799df7288 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -277,7 +277,7 @@ public class MappedFileQueue implements Swappable {
return false;
}
- ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();
+ ListIterator<MappedFile> iterator =
this.mappedFiles.listIterator(mappedFiles.size());
while (iterator.hasPrevious()) {
mappedFileLast = iterator.previous();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 89709989f..b5ba68f51 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.logfile;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -27,7 +28,9 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExtBatch;
@@ -52,9 +55,14 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new
AtomicLong(0);
protected static final AtomicInteger TOTAL_MAPPED_FILES = new
AtomicInteger(0);
- protected final AtomicInteger wrotePosition = new AtomicInteger(0);
- protected final AtomicInteger committedPosition = new AtomicInteger(0);
- protected final AtomicInteger flushedPosition = new AtomicInteger(0);
+
+ protected static final AtomicIntegerFieldUpdater<DefaultMappedFile>
WROTE_POSITION_UPDATER;
+ protected static final AtomicIntegerFieldUpdater<DefaultMappedFile>
COMMITTED_POSITION_UPDATER;
+ protected static final AtomicIntegerFieldUpdater<DefaultMappedFile>
FLUSHED_POSITION_UPDATER;
+
+ protected volatile int wrotePosition;
+ protected volatile int committedPosition;
+ protected volatile int flushedPosition;
protected int fileSize;
protected FileChannel fileChannel;
/**
@@ -74,6 +82,12 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected long swapMapTime = 0L;
protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
+ static {
+ WROTE_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
+ COMMITTED_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class,
"committedPosition");
+ FLUSHED_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class,
"flushedPosition");
+ }
+
public DefaultMappedFile() {
}
@@ -82,7 +96,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
public DefaultMappedFile(final String fileName, final int fileSize,
- final TransientStorePool transientStorePool) throws IOException {
+ final TransientStorePool transientStorePool)
throws IOException {
init(fileName, fileSize, transientStorePool);
}
@@ -155,11 +169,11 @@ public class DefaultMappedFile extends AbstractMappedFile
{
}
} else {
log.debug("matched, but hold failed, request pos: " + pos + ",
fileFromOffset: "
- + this.fileFromOffset);
+ + this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " +
pos + ", size: " + size
- + ", fileFromOffset: " + this.fileFromOffset);
+ + ", fileFromOffset: " + this.fileFromOffset);
}
return false;
@@ -177,22 +191,22 @@ public class DefaultMappedFile extends AbstractMappedFile
{
@Override
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg,
final AppendMessageCallback cb,
- PutMessageContext putMessageContext) {
+ PutMessageContext
putMessageContext) {
return appendMessagesInner(msg, cb, putMessageContext);
}
@Override
public AppendMessageResult appendMessages(final MessageExtBatch
messageExtBatch, final AppendMessageCallback cb,
- PutMessageContext putMessageContext) {
+ PutMessageContext
putMessageContext) {
return appendMessagesInner(messageExtBatch, cb, putMessageContext);
}
public AppendMessageResult appendMessagesInner(final MessageExt
messageExt, final AppendMessageCallback cb,
- PutMessageContext putMessageContext) {
+ PutMessageContext
putMessageContext) {
assert messageExt != null;
assert cb != null;
- int currentPos = this.wrotePosition.get();
+ int currentPos = WROTE_POSITION_UPDATER.get(this);
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = appendMessageBuffer().slice();
@@ -209,7 +223,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
} else {
return new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
- this.wrotePosition.addAndGet(result.getWroteBytes());
+ WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
@@ -234,7 +248,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
@Override
public boolean appendMessage(ByteBuffer data) {
- int currentPos = this.wrotePosition.get();
+ int currentPos = WROTE_POSITION_UPDATER.get(this);
int remaining = data.remaining();
if ((currentPos + remaining) <= this.fileSize) {
@@ -246,7 +260,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.",
e);
}
- this.wrotePosition.addAndGet(remaining);
+ WROTE_POSITION_UPDATER.addAndGet(this, remaining);
return true;
}
return false;
@@ -260,7 +274,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
*/
@Override
public boolean appendMessage(final byte[] data, final int offset, final
int length) {
- int currentPos = this.wrotePosition.get();
+ int currentPos = WROTE_POSITION_UPDATER.get(this);
if ((currentPos + length) <= this.fileSize) {
try {
@@ -270,7 +284,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.",
e);
}
- this.wrotePosition.addAndGet(length);
+ WROTE_POSITION_UPDATER.addAndGet(this, length);
return true;
}
@@ -300,11 +314,11 @@ public class DefaultMappedFile extends AbstractMappedFile
{
log.error("Error occurred when force data to disk.", e);
}
- this.flushedPosition.set(value);
+ FLUSHED_POSITION_UPDATER.set(this, value);
this.release();
} else {
- log.warn("in flush, hold failed, flush offset = " +
this.flushedPosition.get());
- this.flushedPosition.set(getReadPosition());
+ log.warn("in flush, hold failed, flush offset = " +
FLUSHED_POSITION_UPDATER.get(this));
+ FLUSHED_POSITION_UPDATER.set(this, getReadPosition());
}
}
return this.getFlushedPosition();
@@ -314,29 +328,29 @@ public class DefaultMappedFile extends AbstractMappedFile
{
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard
wrotePosition as committedPosition.
- return this.wrotePosition.get();
+ return WROTE_POSITION_UPDATER.get(this);
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0();
this.release();
} else {
- log.warn("in commit, hold failed, commit offset = " +
this.committedPosition.get());
+ log.warn("in commit, hold failed, commit offset = " +
COMMITTED_POSITION_UPDATER.get(this));
}
}
// All dirty data has been committed to FileChannel.
- if (writeBuffer != null && this.transientStorePool != null &&
this.fileSize == this.committedPosition.get()) {
+ if (writeBuffer != null && this.transientStorePool != null &&
this.fileSize == COMMITTED_POSITION_UPDATER.get(this)) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
- return this.committedPosition.get();
+ return COMMITTED_POSITION_UPDATER.get(this);
}
protected void commit0() {
- int writePos = this.wrotePosition.get();
- int lastCommittedPosition = this.committedPosition.get();
+ int writePos = WROTE_POSITION_UPDATER.get(this);
+ int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);
if (writePos - lastCommittedPosition > 0) {
try {
@@ -345,7 +359,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
- this.committedPosition.set(writePos);
+ COMMITTED_POSITION_UPDATER.set(this, writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.",
e);
}
@@ -353,7 +367,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
private boolean isAbleToFlush(final int flushLeastPages) {
- int flush = this.flushedPosition.get();
+ int flush = FLUSHED_POSITION_UPDATER.get(this);
int write = getReadPosition();
if (this.isFull()) {
@@ -368,8 +382,8 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
protected boolean isAbleToCommit(final int commitLeastPages) {
- int commit = this.committedPosition.get();
- int write = this.wrotePosition.get();
+ int commit = COMMITTED_POSITION_UPDATER.get(this);
+ int write = WROTE_POSITION_UPDATER.get(this);
if (this.isFull()) {
return true;
@@ -384,17 +398,17 @@ public class DefaultMappedFile extends AbstractMappedFile
{
@Override
public int getFlushedPosition() {
- return flushedPosition.get();
+ return FLUSHED_POSITION_UPDATER.get(this);
}
@Override
public void setFlushedPosition(int pos) {
- this.flushedPosition.set(pos);
+ FLUSHED_POSITION_UPDATER.set(this, pos);
}
@Override
public boolean isFull() {
- return this.fileSize == this.wrotePosition.get();
+ return this.fileSize == WROTE_POSITION_UPDATER.get(this);
}
@Override
@@ -411,11 +425,11 @@ public class DefaultMappedFile extends AbstractMappedFile
{
return new SelectMappedBufferResult(this.fileFromOffset + pos,
byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ",
fileFromOffset: "
- + this.fileFromOffset);
+ + this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " +
pos + ", size: " + size
- + ", fileFromOffset: " + this.fileFromOffset);
+ + ", fileFromOffset: " + this.fileFromOffset);
}
return null;
@@ -443,13 +457,13 @@ public class DefaultMappedFile extends AbstractMappedFile
{
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
- + " have not shutdown, stop unmapping.");
+ + " have not shutdown, stop unmapping.");
return false;
}
if (this.isCleanupOver()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
- + " have cleanup, do not do it again.");
+ + " have cleanup, do not do it again.");
return true;
}
@@ -494,12 +508,12 @@ public class DefaultMappedFile extends AbstractMappedFile
{
@Override
public int getWrotePosition() {
- return wrotePosition.get();
+ return WROTE_POSITION_UPDATER.get(this);
}
@Override
public void setWrotePosition(int pos) {
- this.wrotePosition.set(pos);
+ WROTE_POSITION_UPDATER.set(this, pos);
}
/**
@@ -507,12 +521,12 @@ public class DefaultMappedFile extends AbstractMappedFile
{
*/
@Override
public int getReadPosition() {
- return this.writeBuffer == null ? this.wrotePosition.get() :
this.committedPosition.get();
+ return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) :
COMMITTED_POSITION_UPDATER.get(this);
}
@Override
public void setCommittedPosition(int pos) {
- this.committedPosition.set(pos);
+ COMMITTED_POSITION_UPDATER.set(this, pos);
}
@Override
@@ -548,11 +562,11 @@ public class DefaultMappedFile extends AbstractMappedFile
{
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={},
costTime={}",
- this.getFileName(), System.currentTimeMillis() - beginTime);
+ this.getFileName(), System.currentTimeMillis() -
beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}",
this.getFileName(),
- System.currentTimeMillis() - beginTime);
+ System.currentTimeMillis() - beginTime);
this.mlock();
}