joerghoh commented on code in PR #2978:
URL: https://github.com/apache/jackrabbit-oak/pull/2978#discussion_r3490929387


##########
oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java:
##########
@@ -120,6 +127,11 @@ private PersistentCache createPersistentCache(Configuration configuration, Close
                     configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
             closer.register(redisCache);
 
+            // OAK-12282: expose the bounded write-queue kill switch. Requires a restart to take effect.

Review Comment:
   ```suggestion
               // OAK-12282: expose the bounded write-queue kill switch. 
   ```
   
   That second sentence is a bit misleading.
   



##########
oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java:
##########
@@ -112,6 +112,7 @@ public PersistentDiskCache(File directory, int cacheMaxSizeMB, DiskCacheIOMonito
                 () -> Long.valueOf(directory.listFiles().length),
                 () -> FileUtils.sizeOfDirectory(directory),
                 () -> evictionCount.get());
+        segmentCacheStats.setWriteDiscardCountSupplier(() -> discardCount.get());

Review Comment:
   This is inconsistent: the other suppliers are passed via the constructor. I would extend the constructor of ``SegmentCacheStats`` by one more parameter instead of adding a setter.
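
   A minimal sketch of the constructor-based variant, for illustration only. `CacheStatsSketch` and its parameter names are hypothetical stand-ins, not the real `SegmentCacheStats` signature; the point is only that the discard-count supplier is final and set once, like the other suppliers.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for a stats holder; NOT the real SegmentCacheStats API.
final class CacheStatsSketch {
    private final Supplier<Long> elementCount;
    private final Supplier<Long> evictionCount;
    private final Supplier<Long> writeDiscardCount;

    // All suppliers arrive through the constructor, so no setter is needed
    // and the object is fully initialised once construction finishes.
    CacheStatsSketch(Supplier<Long> elementCount,
                     Supplier<Long> evictionCount,
                     Supplier<Long> writeDiscardCount) {
        this.elementCount = Objects.requireNonNull(elementCount);
        this.evictionCount = Objects.requireNonNull(evictionCount);
        this.writeDiscardCount = Objects.requireNonNull(writeDiscardCount);
    }

    long getElementCount() {
        return elementCount.get();
    }

    long getEvictionCount() {
        return evictionCount.get();
    }

    long getWriteDiscardCount() {
        return writeDiscardCount.get();
    }

    public static void main(String[] args) {
        AtomicLong discardCount = new AtomicLong();
        CacheStatsSketch stats = new CacheStatsSketch(() -> 0L, () -> 0L, discardCount::get);
        discardCount.incrementAndGet();
        System.out.println("writeDiscardCount=" + stats.getWriteDiscardCount());
    }
}
```

   With this shape the caller passes `() -> discardCount.get()` as one more constructor argument, next to the existing lambdas.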



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, 
Closeable {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = 
Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = 
Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set<String> writesPending;
+    protected AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";
+
+    /**
+     * Whether the bounded write queue introduced in OAK-12282 is active.
+     * <p>
+     * When {@code true} (default), the executor uses a queue bounded to
+     * {@link #WRITE_QUEUE_SIZE} and silently discards write tasks when full,
+     * preventing OOM under high write load. This is safe because the disk
+     * cache is an optimisation only — a dropped write means the segment is
+     * fetched from remote storage on the next read.
+     * <p>
+     * Set to {@code false} via the {@link 
org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
+     * registered with the Whiteboard to revert to the pre-fix unbounded queue.
+     * <strong>Note:</strong> changing this flag requires a process restart, as
+     * the executor is created at startup.
+     */

Review Comment:
   I would remove this for consistency reasons (or we would need to annotate 
every configuration parameter for the cache with the same).



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, 
Closeable {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = 
Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = 
Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set<String> writesPending;
+    protected AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";
+
+    /**
+     * Whether the bounded write queue introduced in OAK-12282 is active.
+     * <p>
+     * When {@code true} (default), the executor uses a queue bounded to
+     * {@link #WRITE_QUEUE_SIZE} and silently discards write tasks when full,
+     * preventing OOM under high write load. This is safe because the disk
+     * cache is an optimisation only — a dropped write means the segment is
+     * fetched from remote storage on the next read.
+     * <p>
+     * Set to {@code false} via the {@link 
org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
+     * registered with the Whiteboard to revert to the pre-fix unbounded queue.
+     * <strong>Note:</strong> changing this flag requires a process restart, as
+     * the executor is created at startup.
+     */
+    public static final AtomicBoolean FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED 
= new AtomicBoolean(true);
+
     public AbstractPersistentCache() {
-        executor = Executors.newFixedThreadPool(THREADS);
+        // Formerly Executors.newFixedThreadPool() was used here, which 
creates an unbounded
+        // LinkedBlockingQueue — allowing unlimited segment buffers to pile up 
in memory under
+        // high write load. The bounded queue (gated by FT_OAK_12282) prevents 
OOM by dropping
+        // write tasks when full; this is safe because the disk cache is an 
optimisation only.

Review Comment:
   This comment reads like typical AI output, and it won't help someone who looks at this code a year from now. (If I want to understand what was here before, I can check out the right git commit ...)
   
   
   
   ```suggestion
           // Limit the number of Runnables in the queue to avoid memory issues. Discarding elements from
           // the queue is not a problem, as caching is just an optimization; if a write is dropped, the
           // segment will be downloaded again and caching will be retried.
   ```
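
   To make the discard behaviour concrete, here is a small standalone sketch (not the PR's code): a `ThreadPoolExecutor` with a bounded `LinkedBlockingQueue` and a rejection handler that only counts. The class name `BoundedWriteQueueSketch` and the `run` helper are made up for this illustration. All tasks block on a latch, so the split between executed and discarded tasks is deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only; names are hypothetical.
final class BoundedWriteQueueSketch {

    // Submits `tasks` blocking tasks and returns {executed, discarded}.
    static long[] run(int threads, int queueSize, int tasks) throws InterruptedException {
        AtomicLong executed = new AtomicLong();
        AtomicLong discarded = new AtomicLong();
        CountDownLatch release = new CountDownLatch(1);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueSize),
                // Called when all threads are busy and the queue is full:
                // the task is dropped and only counted.
                (task, pool) -> discarded.incrementAndGet());

        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keep workers busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                executed.incrementAndGet();
            });
        }

        release.countDown();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return new long[] {executed.get(), discarded.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        // 1 worker + 1 queue slot accept two tasks; the other two are dropped.
        long[] result = run(1, 1, 4);
        System.out.println("executed=" + result[0] + ", discarded=" + result[1]);
    }
}
```

   In the cache, a dropped task just means the segment is not written to the cache this time; the submitter is never blocked and memory use stays bounded by the queue capacity.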



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java:
##########
@@ -78,6 +81,14 @@ protected CacheStats getCurrentStats() {
         );
     }
 
+    public void setWriteDiscardCountSupplier(@NotNull Supplier<Long> supplier) {

Review Comment:
   As mentioned above, I would add this to the constructor for consistency reasons.



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, 
Closeable {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = 
Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = 
Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set<String> writesPending;
+    protected AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";
+
+    /**
+     * Whether the bounded write queue introduced in OAK-12282 is active.
+     * <p>
+     * When {@code true} (default), the executor uses a queue bounded to
+     * {@link #WRITE_QUEUE_SIZE} and silently discards write tasks when full,
+     * preventing OOM under high write load. This is safe because the disk
+     * cache is an optimisation only — a dropped write means the segment is
+     * fetched from remote storage on the next read.
+     * <p>
+     * Set to {@code false} via the {@link 
org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
+     * registered with the Whiteboard to revert to the pre-fix unbounded queue.
+     * <strong>Note:</strong> changing this flag requires a process restart, as
+     * the executor is created at startup.
+     */
+    public static final AtomicBoolean FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED 
= new AtomicBoolean(true);
+
     public AbstractPersistentCache() {
-        executor = Executors.newFixedThreadPool(THREADS);
+        // Formerly Executors.newFixedThreadPool() was used here, which 
creates an unbounded
+        // LinkedBlockingQueue — allowing unlimited segment buffers to pile up 
in memory under
+        // high write load. The bounded queue (gated by FT_OAK_12282) prevents 
OOM by dropping
+        // write tasks when full; this is safe because the disk cache is an 
optimisation only.
+        BlockingQueue<Runnable> writeQueue = 
FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED.get()
+                ? new LinkedBlockingQueue<>(WRITE_QUEUE_SIZE)
+                : new LinkedBlockingQueue<>();
+        executor = new ThreadPoolExecutor(
+                THREADS, THREADS,
+                0L, TimeUnit.MILLISECONDS,
+                writeQueue,
+                (r, e) -> {
+                    discardCount.incrementAndGet();
+                    logger.debug("Segment write task discarded: write queue full (capacity={})", WRITE_QUEUE_SIZE);

Review Comment:
   I would also log the number of discarded items from the queue
   
   ```suggestion
                       long discarded = discardCount.incrementAndGet();
                       logger.debug("Segment write task discarded: write queue full (capacity={}, already discarded={})", WRITE_QUEUE_SIZE, discarded);
   ```



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, 
Closeable {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = 
Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = 
Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set<String> writesPending;
+    protected AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";
+
+    /**
+     * Whether the bounded write queue introduced in OAK-12282 is active.
+     * <p>
+     * When {@code true} (default), the executor uses a queue bounded to
+     * {@link #WRITE_QUEUE_SIZE} and silently discards write tasks when full,
+     * preventing OOM under high write load. This is safe because the disk
+     * cache is an optimisation only — a dropped write means the segment is
+     * fetched from remote storage on the next read.
+     * <p>
+     * Set to {@code false} via the {@link 
org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
+     * registered with the Whiteboard to revert to the pre-fix unbounded queue.
+     * <strong>Note:</strong> changing this flag requires a process restart, as
+     * the executor is created at startup.
+     */
+    public static final AtomicBoolean FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED 
= new AtomicBoolean(true);
+
     public AbstractPersistentCache() {
-        executor = Executors.newFixedThreadPool(THREADS);
+        // Formerly Executors.newFixedThreadPool() was used here, which 
creates an unbounded
+        // LinkedBlockingQueue — allowing unlimited segment buffers to pile up 
in memory under
+        // high write load. The bounded queue (gated by FT_OAK_12282) prevents 
OOM by dropping
+        // write tasks when full; this is safe because the disk cache is an 
optimisation only.
+        BlockingQueue<Runnable> writeQueue = 
FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED.get()
+                ? new LinkedBlockingQueue<>(WRITE_QUEUE_SIZE)
+                : new LinkedBlockingQueue<>();
+        executor = new ThreadPoolExecutor(

Review Comment:
   Right now these 10 threads get generic default names and are therefore hard to identify; can you use a ThreadFactory to name them accordingly?
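
   Something along these lines would do, as a rough sketch. `NamedThreadFactorySketch` and the `oak-segment-cache-writer` prefix are only placeholders I made up here; any existing naming helper in the code base would be just as good.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; class name and prefix are hypothetical.
final class NamedThreadFactorySketch implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Threads are named <prefix>-1, <prefix>-2, ... in creation order.
        return new Thread(r, prefix + "-" + counter.incrementAndGet());
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                new NamedThreadFactorySketch("oak-segment-cache-writer"),
                (task, pool) -> { /* drop the task when the queue is full */ });

        executor.execute(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

   `ThreadPoolExecutor` has a constructor overload that takes both a `ThreadFactory` and a `RejectedExecutionHandler`, so the factory can be added without changing the discard handling.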


