This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d17a7f04c fix code bug & optimize the memory usage in 
DefaultMappedFile (#4678)
d17a7f04c is described below

commit d17a7f04ccd84b4e4905692ffe7b23df70461c2a
Author: CoedPig <[email protected]>
AuthorDate: Wed Aug 17 10:25:41 2022 +0800

    fix code bug & optimize the memory usage in DefaultMappedFile (#4678)
    
    * optimize the memory usage in DefaultMappedFile:
    use volatile int fields with static AtomicIntegerFieldUpdater
    instances instead of one AtomicInteger object per position field
    
    * fix code bug:
    if the initial cursor of the listIterator is zero,
    there is no previous element, so hasPrevious() is always false;
    start the iterator at the end of the list instead.
    
    * modify constant name
    
    Co-authored-by: shanpengyu <[email protected]>
---
 .../org/apache/rocketmq/store/MappedFileQueue.java |  2 +-
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 96 +++++++++++++---------
 2 files changed, 56 insertions(+), 42 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 7ba7de4f4..799df7288 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -277,7 +277,7 @@ public class MappedFileQueue implements Swappable {
                 return false;
         }
 
-        ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();
+        ListIterator<MappedFile> iterator = 
this.mappedFiles.listIterator(mappedFiles.size());
 
         while (iterator.hasPrevious()) {
             mappedFileLast = iterator.previous();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 89709989f..b5ba68f51 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.logfile;
 
 import com.sun.jna.NativeLong;
 import com.sun.jna.Pointer;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,7 +28,9 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageExtBatch;
@@ -52,9 +55,14 @@ public class DefaultMappedFile extends AbstractMappedFile {
     protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new 
AtomicLong(0);
 
     protected static final AtomicInteger TOTAL_MAPPED_FILES = new 
AtomicInteger(0);
-    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
-    protected final AtomicInteger committedPosition = new AtomicInteger(0);
-    protected final AtomicInteger flushedPosition = new AtomicInteger(0);
+
+    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> 
WROTE_POSITION_UPDATER;
+    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> 
COMMITTED_POSITION_UPDATER;
+    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> 
FLUSHED_POSITION_UPDATER;
+
+    protected volatile int wrotePosition;
+    protected volatile int committedPosition;
+    protected volatile int flushedPosition;
     protected int fileSize;
     protected FileChannel fileChannel;
     /**
@@ -74,6 +82,12 @@ public class DefaultMappedFile extends AbstractMappedFile {
     protected long swapMapTime = 0L;
     protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
 
+    static {
+        WROTE_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
+        COMMITTED_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, 
"committedPosition");
+        FLUSHED_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, 
"flushedPosition");
+    }
+
     public DefaultMappedFile() {
     }
 
@@ -82,7 +96,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
     }
 
     public DefaultMappedFile(final String fileName, final int fileSize,
-        final TransientStorePool transientStorePool) throws IOException {
+                             final TransientStorePool transientStorePool) 
throws IOException {
         init(fileName, fileSize, transientStorePool);
     }
 
@@ -155,11 +169,11 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
                 }
             } else {
                 log.debug("matched, but hold failed, request pos: " + pos + ", 
fileFromOffset: "
-                    + this.fileFromOffset);
+                        + this.fileFromOffset);
             }
         } else {
             log.warn("selectMappedBuffer request pos invalid, request pos: " + 
pos + ", size: " + size
-                + ", fileFromOffset: " + this.fileFromOffset);
+                    + ", fileFromOffset: " + this.fileFromOffset);
         }
 
         return false;
@@ -177,22 +191,22 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
     @Override
     public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, 
final AppendMessageCallback cb,
-            PutMessageContext putMessageContext) {
+                                             PutMessageContext 
putMessageContext) {
         return appendMessagesInner(msg, cb, putMessageContext);
     }
 
     @Override
     public AppendMessageResult appendMessages(final MessageExtBatch 
messageExtBatch, final AppendMessageCallback cb,
-            PutMessageContext putMessageContext) {
+                                              PutMessageContext 
putMessageContext) {
         return appendMessagesInner(messageExtBatch, cb, putMessageContext);
     }
 
     public AppendMessageResult appendMessagesInner(final MessageExt 
messageExt, final AppendMessageCallback cb,
-            PutMessageContext putMessageContext) {
+                                                   PutMessageContext 
putMessageContext) {
         assert messageExt != null;
         assert cb != null;
 
-        int currentPos = this.wrotePosition.get();
+        int currentPos = WROTE_POSITION_UPDATER.get(this);
 
         if (currentPos < this.fileSize) {
             ByteBuffer byteBuffer = appendMessageBuffer().slice();
@@ -209,7 +223,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
             } else {
                 return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
             }
-            this.wrotePosition.addAndGet(result.getWroteBytes());
+            WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
             this.storeTimestamp = result.getStoreTimestamp();
             return result;
         }
@@ -234,7 +248,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
     @Override
     public boolean appendMessage(ByteBuffer data) {
-        int currentPos = this.wrotePosition.get();
+        int currentPos = WROTE_POSITION_UPDATER.get(this);
         int remaining = data.remaining();
 
         if ((currentPos + remaining) <= this.fileSize) {
@@ -246,7 +260,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
             } catch (Throwable e) {
                 log.error("Error occurred when append message to mappedFile.", 
e);
             }
-            this.wrotePosition.addAndGet(remaining);
+            WROTE_POSITION_UPDATER.addAndGet(this, remaining);
             return true;
         }
         return false;
@@ -260,7 +274,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
      */
     @Override
     public boolean appendMessage(final byte[] data, final int offset, final 
int length) {
-        int currentPos = this.wrotePosition.get();
+        int currentPos = WROTE_POSITION_UPDATER.get(this);
 
         if ((currentPos + length) <= this.fileSize) {
             try {
@@ -270,7 +284,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
             } catch (Throwable e) {
                 log.error("Error occurred when append message to mappedFile.", 
e);
             }
-            this.wrotePosition.addAndGet(length);
+            WROTE_POSITION_UPDATER.addAndGet(this, length);
             return true;
         }
 
@@ -300,11 +314,11 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
                     log.error("Error occurred when force data to disk.", e);
                 }
 
-                this.flushedPosition.set(value);
+                FLUSHED_POSITION_UPDATER.set(this, value);
                 this.release();
             } else {
-                log.warn("in flush, hold failed, flush offset = " + 
this.flushedPosition.get());
-                this.flushedPosition.set(getReadPosition());
+                log.warn("in flush, hold failed, flush offset = " + 
FLUSHED_POSITION_UPDATER.get(this));
+                FLUSHED_POSITION_UPDATER.set(this, getReadPosition());
             }
         }
         return this.getFlushedPosition();
@@ -314,29 +328,29 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
     public int commit(final int commitLeastPages) {
         if (writeBuffer == null) {
             //no need to commit data to file channel, so just regard 
wrotePosition as committedPosition.
-            return this.wrotePosition.get();
+            return WROTE_POSITION_UPDATER.get(this);
         }
         if (this.isAbleToCommit(commitLeastPages)) {
             if (this.hold()) {
                 commit0();
                 this.release();
             } else {
-                log.warn("in commit, hold failed, commit offset = " + 
this.committedPosition.get());
+                log.warn("in commit, hold failed, commit offset = " + 
COMMITTED_POSITION_UPDATER.get(this));
             }
         }
 
         // All dirty data has been committed to FileChannel.
-        if (writeBuffer != null && this.transientStorePool != null && 
this.fileSize == this.committedPosition.get()) {
+        if (writeBuffer != null && this.transientStorePool != null && 
this.fileSize == COMMITTED_POSITION_UPDATER.get(this)) {
             this.transientStorePool.returnBuffer(writeBuffer);
             this.writeBuffer = null;
         }
 
-        return this.committedPosition.get();
+        return COMMITTED_POSITION_UPDATER.get(this);
     }
 
     protected void commit0() {
-        int writePos = this.wrotePosition.get();
-        int lastCommittedPosition = this.committedPosition.get();
+        int writePos = WROTE_POSITION_UPDATER.get(this);
+        int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);
 
         if (writePos - lastCommittedPosition > 0) {
             try {
@@ -345,7 +359,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
                 byteBuffer.limit(writePos);
                 this.fileChannel.position(lastCommittedPosition);
                 this.fileChannel.write(byteBuffer);
-                this.committedPosition.set(writePos);
+                COMMITTED_POSITION_UPDATER.set(this, writePos);
             } catch (Throwable e) {
                 log.error("Error occurred when commit data to FileChannel.", 
e);
             }
@@ -353,7 +367,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
     }
 
     private boolean isAbleToFlush(final int flushLeastPages) {
-        int flush = this.flushedPosition.get();
+        int flush = FLUSHED_POSITION_UPDATER.get(this);
         int write = getReadPosition();
 
         if (this.isFull()) {
@@ -368,8 +382,8 @@ public class DefaultMappedFile extends AbstractMappedFile {
     }
 
     protected boolean isAbleToCommit(final int commitLeastPages) {
-        int commit = this.committedPosition.get();
-        int write = this.wrotePosition.get();
+        int commit = COMMITTED_POSITION_UPDATER.get(this);
+        int write = WROTE_POSITION_UPDATER.get(this);
 
         if (this.isFull()) {
             return true;
@@ -384,17 +398,17 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
     @Override
     public int getFlushedPosition() {
-        return flushedPosition.get();
+        return FLUSHED_POSITION_UPDATER.get(this);
     }
 
     @Override
     public void setFlushedPosition(int pos) {
-        this.flushedPosition.set(pos);
+        FLUSHED_POSITION_UPDATER.set(this, pos);
     }
 
     @Override
     public boolean isFull() {
-        return this.fileSize == this.wrotePosition.get();
+        return this.fileSize == WROTE_POSITION_UPDATER.get(this);
     }
 
     @Override
@@ -411,11 +425,11 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
                 return new SelectMappedBufferResult(this.fileFromOffset + pos, 
byteBufferNew, size, this);
             } else {
                 log.warn("matched, but hold failed, request pos: " + pos + ", 
fileFromOffset: "
-                    + this.fileFromOffset);
+                        + this.fileFromOffset);
             }
         } else {
             log.warn("selectMappedBuffer request pos invalid, request pos: " + 
pos + ", size: " + size
-                + ", fileFromOffset: " + this.fileFromOffset);
+                    + ", fileFromOffset: " + this.fileFromOffset);
         }
 
         return null;
@@ -443,13 +457,13 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
     public boolean cleanup(final long currentRef) {
         if (this.isAvailable()) {
             log.error("this file[REF:" + currentRef + "] " + this.fileName
-                + " have not shutdown, stop unmapping.");
+                    + " have not shutdown, stop unmapping.");
             return false;
         }
 
         if (this.isCleanupOver()) {
             log.error("this file[REF:" + currentRef + "] " + this.fileName
-                + " have cleanup, do not do it again.");
+                    + " have cleanup, do not do it again.");
             return true;
         }
 
@@ -494,12 +508,12 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
     @Override
     public int getWrotePosition() {
-        return wrotePosition.get();
+        return WROTE_POSITION_UPDATER.get(this);
     }
 
     @Override
     public void setWrotePosition(int pos) {
-        this.wrotePosition.set(pos);
+        WROTE_POSITION_UPDATER.set(this, pos);
     }
 
     /**
@@ -507,12 +521,12 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
      */
     @Override
     public int getReadPosition() {
-        return this.writeBuffer == null ? this.wrotePosition.get() : 
this.committedPosition.get();
+        return this.writeBuffer == null ? WROTE_POSITION_UPDATER.get(this) : 
COMMITTED_POSITION_UPDATER.get(this);
     }
 
     @Override
     public void setCommittedPosition(int pos) {
-        this.committedPosition.set(pos);
+        COMMITTED_POSITION_UPDATER.set(this, pos);
     }
 
     @Override
@@ -548,11 +562,11 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
         // force flush when prepare load finished
         if (type == FlushDiskType.SYNC_FLUSH) {
             log.info("mapped file warm-up done, force to disk, mappedFile={}, 
costTime={}",
-                this.getFileName(), System.currentTimeMillis() - beginTime);
+                    this.getFileName(), System.currentTimeMillis() - 
beginTime);
             mappedByteBuffer.force();
         }
         log.info("mapped file warm-up done. mappedFile={}, costTime={}", 
this.getFileName(),
-            System.currentTimeMillis() - beginTime);
+                System.currentTimeMillis() - beginTime);
 
         this.mlock();
     }

Reply via email to