jpountz commented on code in PR #13124:
URL: https://github.com/apache/lucene/pull/13124#discussion_r1501838905


##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +936,58 @@ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
           }
         });
   }
+
+  private class ScaledExecutor extends ThreadPoolExecutor {

Review Comment:
   nit: I usually have a preference for composition over inheritance, i.e. could 
we wrap the thread-pool executor instead of extending it?
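
   To illustrate what I mean by wrapping, here is a rough sketch, not a 
proposal for the final code. `WrappingScaledExecutor`, `setMaxThreadCount` and 
the two getters are hypothetical names; the sizing logic is copied from the 
diff above. The point is that callers only see the narrow `Executor` surface 
rather than every public method of `ThreadPoolExecutor`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: holds a ThreadPoolExecutor instead of extending it. */
final class WrappingScaledExecutor implements Executor {
  private final ThreadPoolExecutor delegate;

  WrappingScaledExecutor(int maxThreadCount) {
    // Same sizing as the ScaledExecutor constructor in the diff.
    this.delegate =
        new ThreadPoolExecutor(
            Math.max(0, maxThreadCount - 1),
            Math.max(1, maxThreadCount - 1),
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new SynchronousQueue<>());
  }

  /** Resizes the wrapped pool; the order of the two setters matters. */
  void setMaxThreadCount(int maxThreadCount) {
    int newCore = Math.max(0, maxThreadCount - 1);
    int newMax = Math.max(1, newCore);
    if (newCore > delegate.getCorePoolSize()) {
      // Growing: raise the maximum first so core never exceeds it.
      delegate.setMaximumPoolSize(newMax);
      delegate.setCorePoolSize(newCore);
    } else {
      // Shrinking: lower core first so the maximum never drops below it.
      delegate.setCorePoolSize(newCore);
      delegate.setMaximumPoolSize(newMax);
    }
  }

  @Override
  public void execute(Runnable command) {
    // With a SynchronousQueue this throws RejectedExecutionException when all
    // threads are busy; how to handle that is left out of this sketch.
    delegate.execute(command);
  }

  int corePoolSize() {
    return delegate.getCorePoolSize();
  }

  int maximumPoolSize() {
    return delegate.getMaximumPoolSize();
  }

  void shutdown() {
    delegate.shutdown();
  }
}
```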



##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +936,58 @@ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
           }
         });
   }
+
+  private class ScaledExecutor extends ThreadPoolExecutor {
+
+    AtomicInteger activeCount = new AtomicInteger(0);
+
+    public ScaledExecutor() {
+      super(
+          Math.max(0, maxThreadCount - 1),
+          Math.max(1, maxThreadCount - 1),
+          Long.MAX_VALUE,
+          TimeUnit.NANOSECONDS,
+          new SynchronousQueue<>());
+    }
+
+    private void updatePoolSize() {
+      int newMax = Math.max(0, maxThreadCount - 1);
+      if (newMax > getCorePoolSize()) {
+        setMaximumPoolSize(Math.max(newMax, 1));
+        setCorePoolSize(newMax);
+      } else {
+        setCorePoolSize(newMax);
+        setMaximumPoolSize(Math.max(newMax, 1));
+      }
+    }
+
+    boolean incrementUpTo(int max) {
+      while (true) {
+        int value = activeCount.get();
+        if (value >= max) {
+          return false;
+        }
+        if (activeCount.compareAndSet(value, value + 1)) {
+          return true;
+        }

Review Comment:
   If I read correctly, this tries to keep `activeCount <= maxThreadCount`. I 
was thinking we should try to keep `activeCount <= maxThreadCount - 
mergeThreadCount()`. Otherwise we're effectively using more than 
`maxThreadCount` for merging in total, which I find a bit surprising?
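
   Roughly what I have in mind, as a standalone sketch. `IntraMergeBudget` and 
`tryAcquire` are made-up names, and the two counts are passed in as plain ints 
here rather than read from the scheduler. The CAS loop is the one from the 
diff; only the bound passed to it changes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: caps intra-merge threads by what merge threads leave unused. */
final class IntraMergeBudget {
  private final AtomicInteger activeCount = new AtomicInteger(0);

  /** Same CAS loop as in the diff: increments only while below max. */
  boolean incrementUpTo(int max) {
    while (true) {
      int value = activeCount.get();
      if (value >= max) {
        return false;
      }
      if (activeCount.compareAndSet(value, value + 1)) {
        return true;
      }
    }
  }

  /** Bounds activeCount by maxThreadCount minus the running merge threads. */
  boolean tryAcquire(int maxThreadCount, int mergeThreadCount) {
    return incrementUpTo(maxThreadCount - mergeThreadCount);
  }

  /** Releases one slot once an intra-merge task has finished. */
  void release() {
    activeCount.decrementAndGet();
  }

  int active() {
    return activeCount.get();
  }
}
```

   With `maxThreadCount = 4` and two merge threads running, this lets two 
intra-merge tasks in and rejects the third, so merge threads plus intra-merge 
threads stay within `maxThreadCount`.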



##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +936,58 @@ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
           }
         });
   }
+
+  private class ScaledExecutor extends ThreadPoolExecutor {
+
+    AtomicInteger activeCount = new AtomicInteger(0);
+
+    public ScaledExecutor() {
+      super(
+          Math.max(0, maxThreadCount - 1),
+          Math.max(1, maxThreadCount - 1),
+          Long.MAX_VALUE,
+          TimeUnit.NANOSECONDS,
+          new SynchronousQueue<>());
+    }

Review Comment:
   Thinking out loud: for the case when tens of index writers are open in the 
same JVM, we may want to configure a timeout on threads in order to avoid 
spending too much heap on idle threads?
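
   One possible shape for this, as a sketch only. `IdleTimeoutPools` and 
`newPool` are hypothetical, and the keep-alive value is something a caller 
would pick; I am not suggesting a specific number. It uses a finite keep-alive 
plus `allowCoreThreadTimeOut(true)` so that core threads are also reclaimed 
when idle.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: same pool sizing as the diff, but idle threads expire. */
final class IdleTimeoutPools {
  private IdleTimeoutPools() {}

  /** keepAliveSeconds must be positive, otherwise allowCoreThreadTimeOut(true) throws. */
  static ThreadPoolExecutor newPool(int maxThreadCount, long keepAliveSeconds) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            Math.max(0, maxThreadCount - 1),
            Math.max(1, maxThreadCount - 1),
            keepAliveSeconds,
            TimeUnit.SECONDS,
            new SynchronousQueue<>());
    // Without this call only threads above the core size would time out.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
```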



##########
lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java:
##########
@@ -52,6 +56,14 @@ public Directory wrapForMerge(OneMerge merge, Directory in) {
     return in;
   }
 
+  /**
+   * Provides an executor for parallelism during a single merge operation. By default, this method
+   * returns an executor that runs tasks in the calling thread.
+   */
+  public Executor getInterMergeExecutor(OneMerge merge) {

Review Comment:
   Should it be get**Intra**MergeExecutor?



##########
lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java:
##########
@@ -52,4 +53,9 @@ public Directory wrapForMerge(OneMerge merge, Directory in) {
   public MergeScheduler clone() {
     return this;
   }
+
+  @Override
+  public Executor getInterMergeExecutor(OneMerge merge) {
+    return null;

Review Comment:
   nit: throw an exception instead of returning null?



##########
lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java:
##########
@@ -56,13 +58,19 @@ final class SegmentMerger {
       InfoStream infoStream,
       Directory dir,
       FieldInfos.FieldNumbers fieldNumbers,
-      IOContext context)
+      IOContext context,
+      Executor parallelMergeTaskExecutor)
       throws IOException {
     if (context.context != IOContext.Context.MERGE) {
       throw new IllegalArgumentException(
           "IOContext.context should be MERGE; got: " + context.context);
     }
-    mergeState = new MergeState(readers, segmentInfo, infoStream);
+    mergeState =
+        new MergeState(
+            readers,
+            segmentInfo,
+            infoStream,
+            parallelMergeTaskExecutor == null ? null : new TaskExecutor(parallelMergeTaskExecutor));

Review Comment:
   Nit: it's a bit weird to use a class from the search package for merging 
(TaskExecutor). Should merging get access to the raw Executor, which is a bit 
more flexible (I don't know if all formats will be able to split work into a 
list of tasks up-front)? Vectors could still wrap inside a `TaskExecutor` for 
convenience?
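
   To make the flexibility argument concrete, here is a sketch that uses only 
`java.util.concurrent`. `RawExecutorMerge`, `mergeAll` and the `int[]` chunks 
are stand-ins I invented for per-format merge work; none of this is Lucene 
API. With a raw `Executor`, work can be submitted as it is discovered while 
iterating, instead of having to build the full task list first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: merge-like work driven by a raw Executor. */
final class RawExecutorMerge {
  private RawExecutorMerge() {}

  /** Submits one task per chunk as chunks are encountered, then waits for all of them. */
  static int mergeAll(Iterable<int[]> chunks, Executor executor) {
    AtomicInteger total = new AtomicInteger();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (int[] chunk : chunks) {
      // Each task is handed off immediately; no up-front list of all tasks is needed.
      pending.add(
          CompletableFuture.runAsync(
              () -> {
                int sum = 0;
                for (int v : chunk) {
                  sum += v;
                }
                total.addAndGet(sum);
              },
              executor));
    }
    for (CompletableFuture<Void> future : pending) {
      future.join(); // rethrows task failures as unchecked CompletionException
    }
    return total.get();
  }
}
```

   Passing `Runnable::run` as the executor runs everything in the calling 
thread, which matches the documented default of the new `MergeScheduler` 
method.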



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

