This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 95b88ff8fc [ISSUE #8442][RIP-70-3] Extract adaptive lock mechanism 
(#8663)
95b88ff8fc is described below

commit 95b88ff8fcfecbd1942e1a35460f7417ee620673
Author: hqbfz <125714719+3424672...@users.noreply.github.com>
AuthorDate: Wed Oct 23 19:17:37 2024 +0800

    [ISSUE #8442][RIP-70-3] Extract adaptive lock mechanism (#8663)
    
    * extract the adaptive lock
    
    * extract the adaptive lock
    
    * feat(): perfect the adaptive lock
    
    * feat(): perfect the adaptive lock
    
    * Optimized code type
    
    * Optimized code type
    
    * Optimized code type
    
    * fix fail test
    
    * Optimize the adaptive locking mechanism logic
    
    * Optimize the adaptive locking mechanism logic
    
    * feat:Adaptive locking mechanism adjustment
    
    * feat:Adaptive locking mechanism adjustment
    
    * feat:Adaptive locking mechanism adjustment
    
    * Optimize the adaptive locking mechanism logic
    
    * Optimize the adaptive locking mechanism logic
    
    * Optimize the adaptive locking mechanism logic
    
    * feat:Supports the hot activation of ABS locks
    
    * feat:Supports the hot activation of ABS locks
    
    * feat:Supports the hot activation of ABS locks
    
    * feat:Supports the hot activation of ABS locks
    
    * Optimize code style
    
    * Optimize code style
    
    * Optimize code style
    
    * Optimize code style
    
    * Optimize code style
    
    * Optimize code style
    
    * Updated the locking mechanism name
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * delete unused import
    
    * Optimize the logic of switching to spin locks
    
    * Revert "Optimize the logic of switching to spin locks"
    
    This reverts commit 1d7bac5c2fea0531af01d4c57c843084ba4fea61.
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimize the logic of switching to spin locks
    
    * Optimized locking logic
    
    * Optimized locking logic
    
    * Optimized locking logic
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix test
    
    * Optimize code style
    
    * Optimize code style
    
    * fix test
    
    * fix test
    
    * optimize client rebalancing logic
    
    ---------
    
    Co-authored-by: wanghuaiyuan <wanghuaiy...@xiaomi.com>
---
 .../java/org/apache/rocketmq/store/CommitLog.java  |   7 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  27 +++
 .../store/lock/AdaptiveBackOffSpinLock.java        |  35 ++++
 .../store/lock/AdaptiveBackOffSpinLockImpl.java    | 207 +++++++++++++++++++++
 .../rocketmq/store/lock/BackOffReentrantLock.java  |  33 ++++
 .../rocketmq/store/lock/BackOffSpinLock.java       | 110 +++++++++++
 .../rocketmq/store/lock/AdaptiveLockTest.java      |  86 +++++++++
 7 files changed, 504 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 153215c98a..63022520e2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -62,6 +62,7 @@ import 
org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.exception.StoreException;
 import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.util.LibC;
 import org.rocksdb.RocksDBException;
@@ -130,7 +131,11 @@ public class CommitLog implements Swappable {
                 return new 
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
             }
         };
-        this.putMessageLock = 
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new 
PutMessageReentrantLock() : new PutMessageSpinLock();
+
+        PutMessageLock adaptiveBackOffSpinLock = new 
AdaptiveBackOffSpinLockImpl();
+
+        this.putMessageLock = 
messageStore.getMessageStoreConfig().getUseABSLock() ? adaptiveBackOffSpinLock :
+            
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new 
PutMessageReentrantLock() : new PutMessageSpinLock();
 
         this.flushDiskWatcher = new FlushDiskWatcher();
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 8effe35bab..e31c03dd22 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -445,6 +445,17 @@ public class MessageStoreConfig {
      */
     private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
 
+    /**
+     * Number of spin attempts the back-off spin lock makes before it retreats
+     * Default is 1000
+     */
+    private int spinLockCollisionRetreatOptimalDegree = 1000;
+
+    /**
+     * Whether to use the adaptive back-off spin lock when putting messages
+     */
+    private boolean useABSLock = false;
+
     public boolean isRocksdbCQDoubleWriteEnable() {
         return rocksdbCQDoubleWriteEnable;
     }
@@ -1898,4 +1909,20 @@ public class MessageStoreConfig {
     public void setBottomMostCompressionTypeForConsumeQueueStore(String 
bottomMostCompressionTypeForConsumeQueueStore) {
         this.bottomMostCompressionTypeForConsumeQueueStore = 
bottomMostCompressionTypeForConsumeQueueStore;
     }
+
+    public int getSpinLockCollisionRetreatOptimalDegree() {
+        return spinLockCollisionRetreatOptimalDegree;
+    }
+
+    public void setSpinLockCollisionRetreatOptimalDegree(int 
spinLockCollisionRetreatOptimalDegree) {
+        this.spinLockCollisionRetreatOptimalDegree = 
spinLockCollisionRetreatOptimalDegree;
+    }
+
+    public void setUseABSLock(boolean useABSLock) {
+        this.useABSLock = useABSLock;
+    }
+
+    public boolean getUseABSLock() {
+        return useABSLock;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLock.java
 
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLock.java
new file mode 100644
index 0000000000..96200bcc15
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.apache.rocketmq.store.PutMessageLock;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public interface AdaptiveBackOffSpinLock extends PutMessageLock {
+    /**
+     * Applies updated store configuration to this lock; does nothing by default.
+     * @param messageStoreConfig the configuration to apply
+     */
+    default void update(MessageStoreConfig messageStoreConfig) {
+    }
+
+    /**
+     * Switches the underlying locking mechanism; does nothing by default.
+     */
+    default void swap() {
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLockImpl.java
 
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLockImpl.java
new file mode 100644
index 0000000000..b4abb08271
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/lock/AdaptiveBackOffSpinLockImpl.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AdaptiveBackOffSpinLockImpl implements AdaptiveBackOffSpinLock {
+    private AdaptiveBackOffSpinLock adaptiveLock;
+    //state
+    private AtomicBoolean state = new AtomicBoolean(true);
+
+    // Used to determine the switchover between a mutex lock and a spin lock
+    private final static float SWAP_SPIN_LOCK_RATIO = 0.8f;
+
+    // It is used to adjust the spin number K of the back-off spin lock
+    // When (retreat number / TPS) <=
+    // 1 / (BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO), K is decreased
+    private final static int SPIN_LOCK_ADAPTIVE_RATIO = 4;
+
+    // It is used to adjust the spin number K of the back-off spin lock
+    // When (retreat number / TPS) >= (1 / BASE_SWAP_LOCK_RATIO), K is increased
+    private final static int BASE_SWAP_LOCK_RATIO = 320;
+
+    private final static String BACK_OFF_SPIN_LOCK = "SpinLock";
+
+    private final static String REENTRANT_LOCK = "ReentrantLock";
+
+    private Map<String, AdaptiveBackOffSpinLock> locks;
+
+    private final List<AtomicInteger> tpsTable;
+
+    private final List<Map<Thread, Byte>> threadTable;
+
+    private int swapCriticalPoint;
+
+    private AtomicInteger currentThreadNum = new AtomicInteger(0);
+
+    private AtomicBoolean isOpen = new AtomicBoolean(true);
+
+    public AdaptiveBackOffSpinLockImpl() {
+        this.locks = new HashMap<>();
+        this.locks.put(REENTRANT_LOCK, new BackOffReentrantLock());
+        this.locks.put(BACK_OFF_SPIN_LOCK, new BackOffSpinLock());
+
+        this.threadTable = new ArrayList<>(2);
+        this.threadTable.add(new ConcurrentHashMap<>());
+        this.threadTable.add(new ConcurrentHashMap<>());
+
+        this.tpsTable = new ArrayList<>(2);
+        this.tpsTable.add(new AtomicInteger(0));
+        this.tpsTable.add(new AtomicInteger(0));
+
+        adaptiveLock = this.locks.get(BACK_OFF_SPIN_LOCK);
+    }
+
+    @Override
+    public void lock() {
+        int slot = LocalTime.now().getSecond() % 2;
+        this.threadTable.get(slot).putIfAbsent(Thread.currentThread(), 
Byte.MAX_VALUE);
+        this.tpsTable.get(slot).getAndIncrement();
+        boolean state;
+        do {
+            state = this.state.get();
+        } while (!state);
+
+        currentThreadNum.incrementAndGet();
+        this.adaptiveLock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        this.adaptiveLock.unlock();
+        currentThreadNum.decrementAndGet();
+        if (isOpen.get()) {
+            swap();
+        }
+    }
+
+    @Override
+    public void update(MessageStoreConfig messageStoreConfig) {
+        this.adaptiveLock.update(messageStoreConfig);
+    }
+
+    @Override
+    public void swap() {
+        if (!this.state.get()) {
+            return;
+        }
+        boolean needSwap = false;
+        int slot = 1 - LocalTime.now().getSecond() % 2;
+        int tps = this.tpsTable.get(slot).get() + 1;
+        int threadNum = this.threadTable.get(slot).size();
+        this.tpsTable.get(slot).set(-1);
+        this.threadTable.get(slot).clear();
+        if (tps == 0) {
+            return;
+        }
+
+        if (this.adaptiveLock instanceof BackOffSpinLock) {
+            BackOffSpinLock lock = (BackOffSpinLock) this.adaptiveLock;
+            // Avoid frequent adjustment of K: K is left unchanged while the
+            // ratio stays inside a range chosen through experiments
+            // reasonable range : (retreat number / TPS) >
+            // 1 / (BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO) &&
+            // (retreat number / TPS) < (1 / BASE_SWAP_LOCK_RATIO)
+            if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO >= tps) {
+                if (lock.isAdapt()) {
+                    lock.adapt(true);
+                } else {
+                    // It is used to switch between mutex lock and spin lock
+                    this.swapCriticalPoint = tps * threadNum;
+                    needSwap = true;
+                }
+            } else if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO * 
SPIN_LOCK_ADAPTIVE_RATIO <= tps) {
+                lock.adapt(false);
+            }
+            lock.setNumberOfRetreat(slot, 0);
+        } else {
+            if (tps * threadNum <= this.swapCriticalPoint * 
SWAP_SPIN_LOCK_RATIO) {
+                needSwap = true;
+            }
+        }
+
+        if (needSwap) {
+            if (this.state.compareAndSet(true, false)) {
+                // Ensures that no threads are in contention locks as well as 
in critical zones
+                int currentThreadNum;
+                do {
+                    currentThreadNum = this.currentThreadNum.get();
+                } while (currentThreadNum != 0);
+
+                try {
+                    if (this.adaptiveLock instanceof BackOffSpinLock) {
+                        this.adaptiveLock = this.locks.get(REENTRANT_LOCK);
+                    } else {
+                        this.adaptiveLock = this.locks.get(BACK_OFF_SPIN_LOCK);
+                        ((BackOffSpinLock) this.adaptiveLock).adapt(false);
+                    }
+                } catch (Exception e) {
+                    //ignore
+                } finally {
+                    this.state.compareAndSet(false, true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns a snapshot of the registered lock implementations.
+     * <p>
+     * {@code Map.values()} is a plain {@code Collection} view, not a {@code List},
+     * so casting it throws {@code ClassCastException}; the values are copied
+     * into a new list instead.
+     *
+     * @return a new list holding the current values of the lock map
+     */
+    public List<AdaptiveBackOffSpinLock> getLocks() {
+        return new ArrayList<>(this.locks.values());
+    }
+
+    public void setLocks(Map<String, AdaptiveBackOffSpinLock> locks) {
+        this.locks = locks;
+    }
+
+    public boolean getState() {
+        return this.state.get();
+    }
+
+    public void setState(boolean state) {
+        this.state.set(state);
+    }
+
+    public AdaptiveBackOffSpinLock getAdaptiveLock() {
+        return adaptiveLock;
+    }
+
+    public List<AtomicInteger> getTpsTable() {
+        return tpsTable;
+    }
+
+    public void setSwapCriticalPoint(int swapCriticalPoint) {
+        this.swapCriticalPoint = swapCriticalPoint;
+    }
+
+    public int getSwapCriticalPoint() {
+        return swapCriticalPoint;
+    }
+
+    public boolean isOpen() {
+        return this.isOpen.get();
+    }
+
+    public void setOpen(boolean open) {
+        this.isOpen.set(open);
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/lock/BackOffReentrantLock.java 
b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffReentrantLock.java
new file mode 100644
index 0000000000..90e416419b
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffReentrantLock.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+public class BackOffReentrantLock implements AdaptiveBackOffSpinLock {
+    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // 
NonfairSync
+
+    @Override
+    public void lock() {
+        putMessageNormalLock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        putMessageNormalLock.unlock();
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/lock/BackOffSpinLock.java 
b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffSpinLock.java
new file mode 100644
index 0000000000..f754970a05
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/lock/BackOffSpinLock.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class BackOffSpinLock implements AdaptiveBackOffSpinLock {
+
+    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    private int optimalDegree;
+
+    private final static int INITIAL_DEGREE = 1000;
+
+    private final static int MAX_OPTIMAL_DEGREE = 10000;
+
+    private final List<AtomicInteger> numberOfRetreat;
+
+    public BackOffSpinLock() {
+        this.optimalDegree = INITIAL_DEGREE;
+
+        numberOfRetreat = new ArrayList<>(2);
+        numberOfRetreat.add(new AtomicInteger(0));
+        numberOfRetreat.add(new AtomicInteger(0));
+    }
+
+    @Override
+    public void lock() {
+        int spinDegree = this.optimalDegree;
+        while (true) {
+            for (int i = 0; i < spinDegree; i++) {
+                if (this.putMessageSpinLock.compareAndSet(true, false)) {
+                    return;
+                }
+            }
+            numberOfRetreat.get(LocalTime.now().getSecond() % 
2).getAndIncrement();
+            try {
+                Thread.sleep(0);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public void unlock() {
+        this.putMessageSpinLock.compareAndSet(false, true);
+    }
+
+    @Override
+    public void update(MessageStoreConfig messageStoreConfig) {
+        this.optimalDegree = 
messageStoreConfig.getSpinLockCollisionRetreatOptimalDegree();
+    }
+
+    public int getOptimalDegree() {
+        return this.optimalDegree;
+    }
+
+    public void setOptimalDegree(int optimalDegree) {
+        this.optimalDegree = optimalDegree;
+    }
+
+    public boolean isAdapt() {
+        return optimalDegree < MAX_OPTIMAL_DEGREE;
+    }
+
+    public synchronized void adapt(boolean isRise) {
+        if (isRise) {
+            if (optimalDegree * 2 <= MAX_OPTIMAL_DEGREE) {
+                optimalDegree *= 2;
+            } else {
+                if (optimalDegree + INITIAL_DEGREE <= MAX_OPTIMAL_DEGREE) {
+                    optimalDegree += INITIAL_DEGREE;
+                }
+            }
+        } else {
+            if (optimalDegree >= 2 * INITIAL_DEGREE) {
+                optimalDegree -= INITIAL_DEGREE;
+            }
+        }
+    }
+
+    public int getNumberOfRetreat(int pos) {
+        return numberOfRetreat.get(pos).get();
+    }
+
+    public void setNumberOfRetreat(int pos, int size) {
+        this.numberOfRetreat.get(pos).set(size);
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/lock/AdaptiveLockTest.java 
b/store/src/test/java/org/apache/rocketmq/store/lock/AdaptiveLockTest.java
new file mode 100644
index 0000000000..ac1b3c60cc
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/lock/AdaptiveLockTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.lock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AdaptiveLockTest {
+
+    AdaptiveBackOffSpinLockImpl adaptiveLock;
+
+    @Before
+    public void init() {
+        adaptiveLock = new AdaptiveBackOffSpinLockImpl();
+    }
+
+    @Test
+    public void testAdaptiveLock() throws InterruptedException {
+        assertTrue(adaptiveLock.getAdaptiveLock() instanceof BackOffSpinLock);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        adaptiveLock.lock();
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                adaptiveLock.lock();
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                    //ignore
+                }
+                adaptiveLock.unlock();
+                countDownLatch.countDown();
+            }
+        }).start();
+        Thread.sleep(1000);
+        adaptiveLock.unlock();
+        assertEquals(2000, ((BackOffSpinLock) 
adaptiveLock.getAdaptiveLock()).getOptimalDegree());
+        countDownLatch.await();
+
+        for (int i = 0; i <= 5; i++) {
+            CountDownLatch countDownLatch1 = new CountDownLatch(1);
+            adaptiveLock.lock();
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    adaptiveLock.lock();
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        //ignore
+                    }
+                    adaptiveLock.unlock();
+                    countDownLatch1.countDown();
+                }
+            }).start();
+            Thread.sleep(1000);
+            adaptiveLock.unlock();
+            countDownLatch1.await();
+        }
+        assertTrue(adaptiveLock.getAdaptiveLock() instanceof 
BackOffReentrantLock);
+
+        adaptiveLock.lock();
+        Thread.sleep(1000);
+        adaptiveLock.unlock();
+        assertTrue(adaptiveLock.getAdaptiveLock() instanceof BackOffSpinLock);
+    }
+}

Reply via email to