This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch 
essobedo/CAMEL-21614/prevent-cache-change-miss-on-swap
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6d27d74d65bac6bb3b7685a77dcc544680288cfd
Author: Nicolas Filotto <[email protected]>
AuthorDate: Tue Jan 14 19:29:17 2025 +0100

    CAMEL-21614: camel-core - Prevent cache change miss on queue swap
---
 .../apache/camel/support/cache/SimpleLRUCache.java | 33 ++++++++++++++++++++--
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
index b6996a3f215..5ddd57b9c43 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -49,6 +50,14 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
      * The flag indicating that an eviction process is in progress.
      */
     private final AtomicBoolean eviction = new AtomicBoolean();
+    /**
+     * The flag indicating that a swap process is in progress.
+     */
+    private final AtomicBoolean swap = new AtomicBoolean();
+    /**
+     * The number of cache change additions in progress.
+     */
+    private final AtomicInteger additions = new AtomicInteger();
     /**
      * The maximum cache size.
      */
@@ -84,7 +93,15 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
         if (value == null) {
             return null;
         }
-        lastChanges.get().add(Map.entry(key, value));
+        while (swap.get()) {
+            Thread.yield();
+        }
+        additions.incrementAndGet();
+        try {
+            lastChanges.get().add(Map.entry(key, value));
+        } finally {
+            additions.decrementAndGet();
+        }
         return value;
     }
 
@@ -268,8 +285,18 @@ public class SimpleLRUCache<K, V> extends 
ConcurrentHashMap<K, V> {
      * Removes duplicates from the queue of changes.
      */
     private void compressChanges() {
-        Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>();
-        Deque<Entry<K, V>> currentChanges = lastChanges.getAndSet(newChanges);
+        Deque<Entry<K, V>> newChanges;
+        Deque<Entry<K, V>> currentChanges;
+        try {
+            swap.set(true);
+            while (additions.get() > 0) {
+                Thread.yield();
+            }
+            newChanges = new ConcurrentLinkedDeque<>();
+            currentChanges = lastChanges.getAndSet(newChanges);
+        } finally {
+            swap.set(false);
+        }
         Set<K> keys = new HashSet<>();
         Entry<K, V> entry;
         while ((entry = currentChanges.pollLast()) != null) {

Reply via email to