jpountz commented on code in PR #13124:
URL: https://github.com/apache/lucene/pull/13124#discussion_r1525217090
##########
lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java:
##########
@@ -130,19 +135,31 @@ MergeState merge() throws IOException {
IOContext.READ,
segmentWriteState.segmentSuffix);
+ TaskExecutor taskExecutor = new
TaskExecutor(mergeState.intraMergeTaskExecutor);
+ List<Callable<Void>> mergingTasks = new ArrayList<>();
+
if (mergeState.mergeFieldInfos.hasNorms()) {
mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState,
"norms", numMerged);
}
mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState,
"postings", numMerged);
Review Comment:
Can we have a parallel task that handles norms + terms so that the order is
respected?
##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -902,12 +932,57 @@ private static String getSegmentName(MergePolicy.OneMerge
merge) {
}
static {
- TestSecrets.setConcurrentMergeSchedulerAccess(
- new ConcurrentMergeSchedulerAccess() {
- @Override
- public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
- cms.setSuppressExceptions();
+
TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
+ }
+
+ private class ScaledExecutor implements Executor {
+
+ private final AtomicInteger activeCount = new AtomicInteger(0);
+ private final ThreadPoolExecutor executor;
+
+ public ScaledExecutor() {
+ this.executor =
+ new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new
SynchronousQueue<>());
+ }
+
+ void shutdown() {
+ executor.shutdown();
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ assert mergeThreads.contains(Thread.currentThread()) : "caller is not a
merge thread";
Review Comment:
I'd expect this assertion to no longer be valid, since SegmentMerger may
fork into this executor for vectors, and then the task for vectors may want to
further fork into a separate thread? So the caller may be either a MergeThread,
or a thread from the wrapper thread pool?
##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +936,55 @@ public void setSuppressExceptions(ConcurrentMergeScheduler
cms) {
}
});
}
+
+ private class ScaledExecutor implements Executor {
Review Comment:
nit: should it be called `CachedExecutor` now?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]