nfsantos commented on code in PR #2978:
URL: https://github.com/apache/jackrabbit-oak/pull/2978#discussion_r3497533549


##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);

Review Comment:
   Writing to disk is limited by the performance of the IO system and does not scale much with the number of threads that are writing. In fact, a single thread writing efficiently will likely saturate the disk. The max queue size should be a function of the disk's write throughput and of the memory we want to devote to buffering pending writes; it should not depend on the number of write threads. We also don't want to cause an OOME just by increasing the number of threads. The number of writers and the size of the queue should be separate settings.
   
   Since segments are around 256KB, a queue size of 100 would take up roughly 25MB when full. I'd say this is enough: writes to a local SSD should be faster than reads from the network, so a few write threads should be able to keep up with whatever we can get from Azure.
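The arithmetic behind the 25MB figure, as a quick sketch assuming every queued task retains one full 256 KiB segment (smaller segments make this an upper bound). `QueueMemoryEstimate` is an illustrative name only.

```java
// Back-of-the-envelope estimate of the memory held by a full write queue,
// assuming each pending write task retains one 256 KiB segment buffer.
class QueueMemoryEstimate {
    static final long SEGMENT_BYTES = 256L * 1024; // assumed maximum segment size

    static long worstCaseBytes(int queueSize) {
        return queueSize * SEGMENT_BYTES; // int * long is computed as long
    }

    public static void main(String[] args) {
        // 100 * 262,144 bytes = 26,214,400 bytes = 25 MiB
        System.out.println("100 segments -> " + worstCaseBytes(100) / (1024 * 1024) + " MiB");
        // THREADS * 100 with THREADS = 10 gives 1,000 segments = 250 MiB
        System.out.println("1000 segments -> " + worstCaseBytes(1000) / (1024 * 1024) + " MiB");
    }
}
```

With the PR's current default of `THREADS * 100` and the default of 10 threads, the same estimate is 1,000 segments, or about 250 MiB, and it grows linearly with the thread count.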



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set<String> writesPending;
+    protected AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";
+
+    /**
+     * Whether the bounded write queue introduced in OAK-12282 is active.
+     * <p>
+     * When {@code true} (default), the executor uses a queue bounded to
+     * {@link #WRITE_QUEUE_SIZE} and silently discards write tasks when full,
+     * preventing OOM under high write load. This is safe because the disk
+     * cache is an optimisation only — a dropped write means the segment is
+     * fetched from remote storage on the next read.
+     * <p>
+     * Set to {@code false} via the {@link org.apache.jackrabbit.oak.spi.toggle.FeatureToggle}
+     * registered with the Whiteboard to revert to the pre-fix unbounded queue.
+     * <strong>Note:</strong> changing this flag requires a process restart, as
+     * the executor is created at startup.
+     */
+    public static final AtomicBoolean FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED = new AtomicBoolean(true);
+
     public AbstractPersistentCache() {
-        executor = Executors.newFixedThreadPool(THREADS);
+        // Formerly Executors.newFixedThreadPool() was used here, which creates an unbounded
+        // LinkedBlockingQueue — allowing unlimited segment buffers to pile up in memory under
+        // high write load. The bounded queue (gated by FT_OAK_12282) prevents OOM by dropping
+        // write tasks when full; this is safe because the disk cache is an optimisation only.
+        BlockingQueue<Runnable> writeQueue = FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED.get()
+                ? new LinkedBlockingQueue<>(WRITE_QUEUE_SIZE)
+                : new LinkedBlockingQueue<>();
+        executor = new ThreadPoolExecutor(
+                THREADS, THREADS,
+                0L, TimeUnit.MILLISECONDS,
+                writeQueue,
+                (r, e) -> {
+                    discardCount.incrementAndGet();
+                    logger.debug("Segment write task discarded: write queue full (capacity={})", WRITE_QUEUE_SIZE);

Review Comment:
   This log should be throttled; otherwise we risk flooding the logs if, for some reason, the write threads cannot keep up with the reads from Azure. See the LogSilencer class.
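A hypothetical, self-contained stand-in showing the throttling idea (it does not use or reproduce the actual `LogSilencer` API): the handler always counts the discard but emits at most one log line per interval, reporting the running total. `ThrottledDiscardHandler`, the injectable clock and the interval are illustrative assumptions, and `java.util.logging` is used only to keep the sketch dependency-free, whereas Oak itself logs via SLF4J.

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.logging.Logger;

// Hypothetical stand-in for a throttled discard log: every discard is
// counted, but at most one log message is written per interval.
class ThrottledDiscardHandler implements RejectedExecutionHandler {
    private static final Logger LOG = Logger.getLogger(ThrottledDiscardHandler.class.getName());

    private final AtomicLong discardCount = new AtomicLong();
    private final AtomicLong logged = new AtomicLong(); // messages actually emitted
    private final AtomicLong lastLogNanos;
    private final long intervalNanos;
    private final LongSupplier clock; // e.g. System::nanoTime
    private final int capacity;

    ThrottledDiscardHandler(int capacity, long interval, TimeUnit unit, LongSupplier clock) {
        this.capacity = capacity;
        this.intervalNanos = unit.toNanos(interval);
        this.clock = clock;
        // Start one interval in the past so the very first discard is logged.
        this.lastLogNanos = new AtomicLong(clock.getAsLong() - intervalNanos);
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        long total = discardCount.incrementAndGet();
        long now = clock.getAsLong();
        long last = lastLogNanos.get();
        // Only the thread that wins the CAS logs; concurrent discards stay silent.
        if (now - last >= intervalNanos && lastLogNanos.compareAndSet(last, now)) {
            logged.incrementAndGet();
            LOG.fine(() -> "Segment write tasks discarded: write queue full (capacity="
                    + capacity + ", total discarded=" + total + ")");
        }
    }

    long discardCount() { return discardCount.get(); }
    long loggedCount() { return logged.get(); }
}
```

It would take the place of the lambda as the last `ThreadPoolExecutor` constructor argument, e.g. `new ThrottledDiscardHandler(WRITE_QUEUE_SIZE, 1, TimeUnit.MINUTES, System::nanoTime)`. The exact count stays available for `SegmentCacheStats` even while most messages are suppressed.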
   



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java:
##########
@@ -31,27 +31,68 @@
 import java.io.Closeable;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
 
     public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
+    public static final int WRITE_QUEUE_SIZE = Integer.getInteger("oak.segment.cache.writeQueueSize", THREADS * 100);
 
     protected ExecutorService executor;
     protected AtomicLong cacheSize = new AtomicLong(0);
     protected PersistentCache nextCache;
     protected final Set<String> writesPending;
+    protected AtomicLong discardCount = new AtomicLong();
 
     protected SegmentCacheStats segmentCacheStats;
 
+    /**
+     * Name of the feature toggle for the OAK-12282 bounded write-queue fix.
+     * See {@link #FT_OAK_12282_BOUNDED_WRITE_QUEUE_ENABLED}.
+     */
+    public static final String FT_OAK_12282 = "FT_OAK-12282";

Review Comment:
   Personally, I think this is unnecessary. The change is quite safe, and if having a bound on the queue causes trouble for some reason, we can increase its size to be effectively unlimited.
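The "effectively unlimited" fallback needs no extra code: the JDK documents the no-argument `LinkedBlockingQueue` constructor as creating a queue with a capacity of `Integer.MAX_VALUE`, so `-Doak.segment.cache.writeQueueSize=2147483647` would give the same capacity as the old unbounded queue. A quick JDK-only check; the class name `UnboundedQueueCheck` and the fallback of 100 are illustrative assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Shows that an "unbounded" LinkedBlockingQueue is really bounded at
// Integer.MAX_VALUE, the same capacity a system property could request.
class UnboundedQueueCheck {
    static int writeQueueSize() {
        // Same property as in the PR; the fallback of 100 is an assumption.
        return Integer.getInteger("oak.segment.cache.writeQueueSize", 100);
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<Runnable> defaultQueue = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<Runnable> maxQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
        // Both report the same remaining capacity while empty.
        System.out.println(defaultQueue.remainingCapacity());
        System.out.println(maxQueue.remainingCapacity());
        System.out.println("configured=" + writeQueueSize());
    }
}
```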



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
