jpountz commented on code in PR #13190:
URL: https://github.com/apache/lucene/pull/13190#discussion_r1531735553


##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -446,7 +454,13 @@ private static String rateToString(double mbPerSec) {
 
   @Override
   public void close() {
-    sync();
+    try {

Review Comment:
   Should we call `super.close()` as well, to close `MergeScheduler`'s 
`SameThreadExecutorService`, which `ConcurrentMergeScheduler` uses for small 
merges?
   
   Skipping it is not a big deal, but closing it would help us catch any tasks 
sent to this executor after the scheduler has been closed.
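
   To illustrate the point, here is a minimal, self-contained sketch. It does 
not use Lucene's classes: `BaseScheduler`, `ConcurrentScheduler`, and 
`runSmallMerge` are hypothetical stand-ins, and the rejecting executor is an 
assumption about how a closed same-thread executor might behave.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical stand-in for a base scheduler that owns a same-thread executor. */
class BaseScheduler implements AutoCloseable {
  private volatile boolean closed;

  /** Runs tasks on the calling thread and rejects them once the scheduler is closed. */
  protected final Executor sameThreadExecutor =
      task -> {
        if (closed) {
          throw new RejectedExecutionException("executor used after close()");
        }
        task.run();
      };

  @Override
  public void close() {
    closed = true;
  }
}

/** Hypothetical subclass with its own cleanup that also delegates to the parent. */
class ConcurrentScheduler extends BaseScheduler {
  void runSmallMerge(Runnable merge) {
    sameThreadExecutor.execute(merge);
  }

  @Override
  public void close() {
    try {
      // Subclass-specific cleanup (for example, waiting for running merges) would go here.
    } finally {
      super.close(); // Close the parent's executor even if the cleanup above throws.
    }
  }
}
```

   With this shape, a task submitted after `close()` fails fast with a 
`RejectedExecutionException` instead of silently running.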



##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(

Review Comment:
   >  the output isn't really throttled until a single thread exceeds the limit.
   
   This limitation feels OK to me; let's just add a comment about it. 
Intuitively, the write rate at merge time is rather bursty, so if the sum of 
the bytes written by all threads running this merge exceeds the limit, there 
will often be one thread that exceeds the limit on its own as well.



##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(
+        last -> {
+          // Time we should sleep until; this is purely instantaneous
+          // rate (just adds seconds onto the last time we had paused to);
+          // maybe we should also offer decayed recent history one?
+          long targetNS = last + (long) (1000000000 * secondsToPause);
+          long curPauseNS = targetNS - curNS;
+          // We don't bother with thread pausing if the pause is smaller than 2 msec.
+          if (curPauseNS <= MIN_PAUSE_NS) {
+            // Set to curNS, not targetNS, to enforce the instant rate, not
+            // the "averaged over all history" rate:
+            curPauseNSSetter.set(0);
+            return curNS;
+          }

Review Comment:
   It's hard for me to reason about this as well.
   
   What about keeping `maybePause` as-is and making `MergeRateLimiter#pause` 
synchronized? That would make the pausing logic behave as if threads were 
writing bytes sequentially rather than in parallel. (I'm suggesting that we 
synchronize `pause` rather than `maybePause` so that `System.nanoTime()` is 
read within the lock and the pausing logic accounts for any time already spent 
waiting on the lock.)
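
   A rough sketch of that idea, under stated assumptions: 
`SynchronizedLimiterSketch` is a hypothetical, simplified class and not the 
real `MergeRateLimiter`. The injected `clock` stands in for 
`System.nanoTime()` so the behavior is deterministic, and instead of sleeping 
the sketch only reports the pause and advances `lastNS` to the target time.

```java
import java.util.function.LongSupplier;

/** Hypothetical simplified limiter; only the locking shape mirrors the suggestion. */
class SynchronizedLimiterSketch {
  static final long MIN_PAUSE_NS = 2_000_000L; // 2 msec threshold, as in the diff's comment

  private final double mbPerSec;
  private final LongSupplier clock; // assumption: stands in for System.nanoTime()
  private long lastNS; // guarded by "this", so a plain long is enough

  SynchronizedLimiterSketch(double mbPerSec, LongSupplier clock) {
    this.mbPerSec = mbPerSec;
    this.clock = clock;
    this.lastNS = clock.getAsLong();
  }

  /** Returns the nanoseconds the caller should pause for, or 0 if below the threshold. */
  synchronized long pause(long bytes) {
    // The clock is read inside the lock, so time spent waiting for the lock counts as elapsed.
    long curNS = clock.getAsLong();
    return maybePause(bytes, curNS);
  }

  /** Same arithmetic as the unsynchronized version in the diff; the caller holds the lock. */
  private long maybePause(long bytes, long curNS) {
    double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    long targetNS = lastNS + (long) (1_000_000_000L * secondsToPause);
    long curPauseNS = targetNS - curNS;
    if (curPauseNS <= MIN_PAUSE_NS) {
      lastNS = curNS; // enforce the instantaneous rate, not the all-history average
      return 0;
    }
    lastNS = targetNS; // assumption: simulate having slept until the target time
    return curPauseNS;
  }
}
```

   Because all state changes happen under one lock, `lastNS` does not need to 
be an `AtomicLong`, and concurrent writers are accounted for one after another.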



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

