nfsantos commented on code in PR #2183:
URL: https://github.com/apache/jackrabbit-oak/pull/2183#discussion_r2007171388


##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -38,148 +40,212 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-class ElasticBulkProcessorHandler {
+public class ElasticBulkProcessorHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
-    private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
Integer.getInteger("oak.failedDocStatusLimit", 10000);
-
-    private static final int BULK_PROCESSOR_CONCURRENCY =
-            Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 
1);
-    private static final String SYNC_MODE_PROPERTY = "sync-mode";
-    private static final String SYNC_RT_MODE = "rt";
-
-    protected final ElasticConnection elasticConnection;
-    protected final String indexName;
-    protected final ElasticIndexDefinition indexDefinition;
-    private final NodeBuilder definitionBuilder;
-    protected final BulkIngester<String> bulkIngester;
-    private final boolean waitForESAcknowledgement;
 
     /**
-     * Coordinates communication between bulk processes. It has a main 
controller registered at creation time and
-     * de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk 
request register a new party in
-     * this Phaser in {@link OakBulkListener#beforeBulk(long, BulkRequest, 
List)} and de-register itself when
-     * the request returns.
+     * Keeps information about an index that is being written by the bulk 
processor
      */
-    private final Phaser phaser = new Phaser(1); // register main controller
-
-    /**
-     * Exceptions occurred while trying to update index in elasticsearch
-     */
-    private final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = 
new ConcurrentLinkedQueue<>();
+    static class IndexInfo {
+        public final String indexName;
+        public final ElasticIndexDefinition indexDefinition;
+        public final NodeBuilder definitionBuilder;
+        public final boolean waitForESAcknowledgement;
+        public final boolean isRealTime;
+        /**
+         * Exceptions occurred while trying to update index in elasticsearch
+         */
+        public final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = 
new ConcurrentLinkedQueue<>();
+
+        long indexOperations = 0;
+        long deleteOperations = 0;
+        long updateOperations = 0;
+        boolean indexModified = false;
+
+        IndexInfo(String indexName, ElasticIndexDefinition indexDefinition, 
NodeBuilder definitionBuilder, boolean waitForESAcknowledgement, boolean 
isRealTime) {
+            this.indexName = indexName;
+            this.indexDefinition = indexDefinition;
+            this.definitionBuilder = definitionBuilder;
+            this.waitForESAcknowledgement = waitForESAcknowledgement;
+            this.isRealTime = isRealTime;
+        }
+    }
 
     /**
-     * Key-value structure to keep the history of bulk requests. Keys are the 
bulk execution ids, the boolean
-     * value is {@code true} when at least an update is performed, otherwise 
{@code false}.
+     * Context object associated with each operation passed to the bulk 
processor
      */
-    private final ConcurrentHashMap<Long, Boolean> updatesMap = new 
ConcurrentHashMap<>();
+    public final static class OperationContext {
+        final IndexInfo indexInfo;
+        final String documentId;
 
-    protected long totalOperations;
+        OperationContext(IndexInfo indexInfo, String documentId) {
+            this.indexInfo = indexInfo;
+            this.documentId = documentId;
+        }
 
-    private ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
-                                        @NotNull String indexName,
-                                        @NotNull ElasticIndexDefinition 
indexDefinition,
-                                        @NotNull NodeBuilder definitionBuilder,
-                                        boolean waitForESAcknowledgement) {
-        this.elasticConnection = elasticConnection;
-        this.indexName = indexName;
-        this.indexDefinition = indexDefinition;
-        this.definitionBuilder = definitionBuilder;
-        this.waitForESAcknowledgement = waitForESAcknowledgement;
-        this.bulkIngester = initBulkIngester();
+        @Override
+        public String toString() {
+            return "OperationContext{" +
+                    "indexInfo=" + indexInfo.indexName +
+                    ", documentId='" + documentId + '\'' +
+                    '}';
+        }
     }
 
-    /**
-     * Returns an ElasticBulkProcessorHandler instance based on the index 
definition configuration.
-     * <p>
-     * The `sync-mode` property can be set to `rt` (real-time). In this case 
the returned handler will be real-time.
-     * This option is available for sync index definitions only.
-     */
-    public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull 
ElasticConnection elasticConnection,
-                                                                      @NotNull 
String indexName,
-                                                                      @NotNull 
ElasticIndexDefinition indexDefinition,
-                                                                      @NotNull 
NodeBuilder definitionBuilder, CommitInfo commitInfo,
-                                                                      boolean 
waitForESAcknowledgement) {
-        PropertyState async = 
indexDefinition.getDefinitionNodeState().getProperty("async");
-
-        if (async != null) {
-            return new ElasticBulkProcessorHandler(elasticConnection, 
indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement);
-        }
+    public static final String BULK_ACTIONS_PROP = 
"oak.indexer.elastic.bulkProcessor.maxBulkOperations";
+    public static final int BULK_ACTIONS_DEFAULT = 8192;
+    public static final String BULK_SIZE_BYTES_PROP = 
"oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes";
+    public static final int BULK_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024; // 8MB
+    public static final String BULK_FLUSH_INTERVAL_MS_PROP = 
"oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs";
+    public static final int BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
+    public static final String BULK_MAX_CONCURRENT_REQUESTS_PROP = 
"oak.indexer.elastic.bulkProcessor.maxConcurrentRequests";
+    private static final int BULK_MAX_CONCURRENT_REQUESTS_DEFAULT = 1;
+    // when true, fails indexing in case of bulk failures
+    public static final String FAIL_ON_ERROR_PROP = 
"oak.indexer.elastic.bulkProcessor.failOnError";
+    public static final boolean FAIL_ON_ERROR_DEFAULT = true;
 
-        // commit-info has priority over configuration in index definition
-        String syncMode = null;
-        if (commitInfo != null) {
-            syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY);
-        }
+    private static final String SYNC_MODE_PROPERTY = "sync-mode";
+    private static final String SYNC_RT_MODE = "rt";
+    private static final int MAX_SUPPRESSED_ERROR_CAUSES = 50;
 
-        if (syncMode == null) {
-            PropertyState syncModeProp = 
indexDefinition.getDefinitionNodeState().getProperty("sync-mode");
-            if (syncModeProp != null) {
-                syncMode = syncModeProp.getValue(Type.STRING);
-            }
-        }
+    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
ConfigHelper.getSystemPropertyAsInt("oak.failedDocStatusLimit", 10000);
+    private final int BULK_MAX_OPERATIONS = 
ConfigHelper.getSystemPropertyAsInt(BULK_ACTIONS_PROP, BULK_ACTIONS_DEFAULT);
+    private final int BULK_MAX_SIZE_BYTES = 
ConfigHelper.getSystemPropertyAsInt(BULK_SIZE_BYTES_PROP, 
BULK_SIZE_BYTES_DEFAULT);
+    private final int BULK_FLUSH_INTERVAL_MS = 
ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP, 
BULK_FLUSH_INTERVAL_MS_DEFAULT);
+    private final int BULK_MAX_CONCURRENT_REQUESTS = 
ConfigHelper.getSystemPropertyAsInt(BULK_MAX_CONCURRENT_REQUESTS_PROP, 
BULK_MAX_CONCURRENT_REQUESTS_DEFAULT);
+    private final boolean FAIL_ON_ERROR = 
ConfigHelper.getSystemPropertyAsBoolean(FAIL_ON_ERROR_PROP, 
FAIL_ON_ERROR_DEFAULT);
 
-        if (SYNC_RT_MODE.equals(syncMode)) {
-            return new RealTimeBulkProcessorHandler(elasticConnection, 
indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement);
-        }
+    private final ElasticConnection elasticConnection;
+    private final BulkIngester<OperationContext> bulkIngester;
 
-        return new ElasticBulkProcessorHandler(elasticConnection, indexName, 
indexDefinition, definitionBuilder, waitForESAcknowledgement);
-    }
+    // Used to keep track of the sequence number of the batches that are 
currently being processed.
+    // This is used to wait until all operations for a writer are processed 
before closing it.
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition bulkProcessedCondition = lock.newCondition();
+    private final HashSet<Long> pendingBulks = new HashSet<>();
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ConcurrentHashMap<String, IndexInfo> registeredIndexes = new 
ConcurrentHashMap<>();
+    private final ConcurrentLinkedQueue<ErrorCause> 
globalSuppressedErrorCauses = new ConcurrentLinkedQueue<>();
 
-    private BulkIngester<String> initBulkIngester() {
+    // Time blocked waiting to add operations to the bulk processor.
+    private final long startTime = System.nanoTime();
+    private long totalWaitTimeNanos = 0;
+
+    public ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection) {
+        this.elasticConnection = elasticConnection;
         // BulkIngester does not support retry policies. Some retries though 
are already implemented in the transport layer.
         // More details here: 
https://github.com/elastic/elasticsearch-java/issues/478
-        return BulkIngester.of(b -> {
+        LOG.info("Creating bulk ingester [maxActions: {}, maxSizeBytes: {} 
flushInterval {}, concurrency {}]",
+                BULK_MAX_OPERATIONS, BULK_MAX_SIZE_BYTES, 
BULK_FLUSH_INTERVAL_MS, BULK_MAX_CONCURRENT_REQUESTS_PROP);
+        this.bulkIngester = BulkIngester.of(b -> {
             b = b.client(elasticConnection.getAsyncClient())
                     .listener(new OakBulkListener());
-            if (indexDefinition.bulkActions > 0) {
-                b = b.maxOperations(indexDefinition.bulkActions);
+            if (BULK_MAX_OPERATIONS > 0) {
+                b = b.maxOperations(BULK_MAX_OPERATIONS);
             }
-            if (indexDefinition.bulkSizeBytes > 0) {
-                b = b.maxSize(indexDefinition.bulkSizeBytes);
+            if (BULK_MAX_SIZE_BYTES > 0) {
+                b = b.maxSize(BULK_MAX_SIZE_BYTES);
             }
-            if (indexDefinition.bulkFlushIntervalMs > 0) {
-                b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, 
TimeUnit.MILLISECONDS);
+            if (BULK_FLUSH_INTERVAL_MS > 0) {
+                b = b.flushInterval(BULK_FLUSH_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
             }
-
-            return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
+            if (BULK_MAX_CONCURRENT_REQUESTS > 0) {
+                b = b.maxConcurrentRequests(BULK_MAX_CONCURRENT_REQUESTS);
+            }
+            return b;
         });
     }
 
-    private void checkFailures() throws IOException {
-        if (!suppressedErrorCauses.isEmpty()) {
-            IOException ioe = new IOException("Exception while indexing. See 
suppressed for details");
-            suppressedErrorCauses.stream().map(ec -> new 
IllegalStateException(ec.reason())).forEach(ioe::addSuppressed);
-            throw ioe;
+    /**
+     * Registers an ElasticIndex with the given index definition configuration.
+     * <p>
+     * The `sync-mode` property can be set to `rt` (real-time). In this case 
the returned handler will be real-time.
+     * This option is available for sync index definitions only.
+     */
+    public void registerIndex(String indexName, ElasticIndexDefinition 
indexDefinition, NodeBuilder definitionBuilder, CommitInfo commitInfo, boolean 
waitForESAcknowledgement) {
+        checkOpen();
+        if (registeredIndexes.containsKey(indexName)) {
+            LOG.warn("Index already registered: {}", indexName);

Review Comment:
   For each update cycle, writers are registered at the start and unregistered 
at the end. And if my understanding is correct, when a writer is created, the 
index definition is read again. I try to explain the process below; could you 
please check if it makes sense?
   
   The incremental indexing is managed by the `AsyncIndexUpdate` and initiated 
by the method `runWhenPermitted()`
   
https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java#L488
   
   This calls `updateIndex()`, which creates a new `IndexUpdate` instance:
   
https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java#L810
   
   This is what actually does the index update. Each of these instances 
creates new writers at the start, in the `enter()` method:
   
https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java#L175
   
   If I understand the code correctly, new writers will read the definition 
from the NodeStore and should pick up changes:
   
https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java#L320
   
https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java#L69
   
   The writers are closed here in FulltextIndexEditor, by the `leave` method 
when leaving the root node:
   
https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/editor/FulltextIndexEditor.java#L153
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: oak-dev-unsubscr...@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to