This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch essobedo/CAMEL-21614/prevent-cache-change-miss-on-swap in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6d27d74d65bac6bb3b7685a77dcc544680288cfd Author: Nicolas Filotto <[email protected]> AuthorDate: Tue Jan 14 19:29:17 2025 +0100 CAMEL-21614: camel-core - Prevent cache change miss on queue swap --- .../apache/camel/support/cache/SimpleLRUCache.java | 33 ++++++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index b6996a3f215..5ddd57b9c43 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -49,6 +50,14 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { * The flag indicating that an eviction process is in progress. */ private final AtomicBoolean eviction = new AtomicBoolean(); + /** + * The flag indicating that a swap process is in progress. + */ + private final AtomicBoolean swap = new AtomicBoolean(); + /** + * The number of cache change additions in progress. + */ + private final AtomicInteger additions = new AtomicInteger(); /** * The maximum cache size. */ @@ -84,7 +93,15 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { if (value == null) { return null; } - lastChanges.get().add(Map.entry(key, value)); + while (swap.get()) { + Thread.yield(); + } + additions.incrementAndGet(); + try { + lastChanges.get().add(Map.entry(key, value)); + } finally { + additions.decrementAndGet(); + } return value; } @@ -268,8 +285,18 @@ public class SimpleLRUCache<K, V> extends ConcurrentHashMap<K, V> { * Removes duplicates from the queue of changes. */ private void compressChanges() { - Deque<Entry<K, V>> newChanges = new ConcurrentLinkedDeque<>(); - Deque<Entry<K, V>> currentChanges = lastChanges.getAndSet(newChanges); + Deque<Entry<K, V>> newChanges; + Deque<Entry<K, V>> currentChanges; + try { + swap.set(true); + while (additions.get() > 0) { + Thread.yield(); + } + newChanges = new ConcurrentLinkedDeque<>(); + currentChanges = lastChanges.getAndSet(newChanges); + } finally { + swap.set(false); + } Set<K> keys = new HashSet<>(); Entry<K, V> entry; while ((entry = currentChanges.pollLast()) != null) {
