nfsantos commented on code in PR #2183: URL: https://github.com/apache/jackrabbit-oak/pull/2183#discussion_r2007171388
########## oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java: ########## @@ -38,148 +40,212 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; +import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -class ElasticBulkProcessorHandler { +public class ElasticBulkProcessorHandler { private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorHandler.class); - private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = Integer.getInteger("oak.failedDocStatusLimit", 10000); - - private static final int BULK_PROCESSOR_CONCURRENCY = - Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 1); - private static final String SYNC_MODE_PROPERTY = "sync-mode"; - private static final String SYNC_RT_MODE = "rt"; - - protected final ElasticConnection elasticConnection; - protected final String indexName; - protected final ElasticIndexDefinition indexDefinition; - private final NodeBuilder definitionBuilder; - protected final BulkIngester<String> bulkIngester; - private final boolean waitForESAcknowledgement; /** - * Coordinates communication between bulk processes. It has a main controller registered at creation time and - * de-registered on {@link ElasticIndexWriter#close(long)}. 
Each bulk request register a new party in - * this Phaser in {@link OakBulkListener#beforeBulk(long, BulkRequest, List)} and de-register itself when - * the request returns. + * Keeps information about an index that is being written by the bulk processor */ - private final Phaser phaser = new Phaser(1); // register main controller - - /** - * Exceptions occurred while trying to update index in elasticsearch - */ - private final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = new ConcurrentLinkedQueue<>(); + static class IndexInfo { + public final String indexName; + public final ElasticIndexDefinition indexDefinition; + public final NodeBuilder definitionBuilder; + public final boolean waitForESAcknowledgement; + public final boolean isRealTime; + /** + * Exceptions occurred while trying to update index in elasticsearch + */ + public final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = new ConcurrentLinkedQueue<>(); + + long indexOperations = 0; + long deleteOperations = 0; + long updateOperations = 0; + boolean indexModified = false; + + IndexInfo(String indexName, ElasticIndexDefinition indexDefinition, NodeBuilder definitionBuilder, boolean waitForESAcknowledgement, boolean isRealTime) { + this.indexName = indexName; + this.indexDefinition = indexDefinition; + this.definitionBuilder = definitionBuilder; + this.waitForESAcknowledgement = waitForESAcknowledgement; + this.isRealTime = isRealTime; + } + } /** - * Key-value structure to keep the history of bulk requests. Keys are the bulk execution ids, the boolean - * value is {@code true} when at least an update is performed, otherwise {@code false}. 
+ * Context object associated with each operation passed to the bulk processor */ - private final ConcurrentHashMap<Long, Boolean> updatesMap = new ConcurrentHashMap<>(); + public final static class OperationContext { + final IndexInfo indexInfo; + final String documentId; - protected long totalOperations; + OperationContext(IndexInfo indexInfo, String documentId) { + this.indexInfo = indexInfo; + this.documentId = documentId; + } - private ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, - @NotNull String indexName, - @NotNull ElasticIndexDefinition indexDefinition, - @NotNull NodeBuilder definitionBuilder, - boolean waitForESAcknowledgement) { - this.elasticConnection = elasticConnection; - this.indexName = indexName; - this.indexDefinition = indexDefinition; - this.definitionBuilder = definitionBuilder; - this.waitForESAcknowledgement = waitForESAcknowledgement; - this.bulkIngester = initBulkIngester(); + @Override + public String toString() { + return "OperationContext{" + + "indexInfo=" + indexInfo.indexName + + ", documentId='" + documentId + '\'' + + '}'; + } } - /** - * Returns an ElasticBulkProcessorHandler instance based on the index definition configuration. - * <p> - * The `sync-mode` property can be set to `rt` (real-time). In this case the returned handler will be real-time. - * This option is available for sync index definitions only. 
- */ - public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull ElasticConnection elasticConnection, - @NotNull String indexName, - @NotNull ElasticIndexDefinition indexDefinition, - @NotNull NodeBuilder definitionBuilder, CommitInfo commitInfo, - boolean waitForESAcknowledgement) { - PropertyState async = indexDefinition.getDefinitionNodeState().getProperty("async"); - - if (async != null) { - return new ElasticBulkProcessorHandler(elasticConnection, indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement); - } + public static final String BULK_ACTIONS_PROP = "oak.indexer.elastic.bulkProcessor.maxBulkOperations"; + public static final int BULK_ACTIONS_DEFAULT = 8192; + public static final String BULK_SIZE_BYTES_PROP = "oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes"; + public static final int BULK_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024; // 8MB + public static final String BULK_FLUSH_INTERVAL_MS_PROP = "oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs"; + public static final int BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000; + public static final String BULK_MAX_CONCURRENT_REQUESTS_PROP = "oak.indexer.elastic.bulkProcessor.maxConcurrentRequests"; + private static final int BULK_MAX_CONCURRENT_REQUESTS_DEFAULT = 1; + // when true, fails indexing in case of bulk failures + public static final String FAIL_ON_ERROR_PROP = "oak.indexer.elastic.bulkProcessor.failOnError"; + public static final boolean FAIL_ON_ERROR_DEFAULT = true; - // commit-info has priority over configuration in index definition - String syncMode = null; - if (commitInfo != null) { - syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY); - } + private static final String SYNC_MODE_PROPERTY = "sync-mode"; + private static final String SYNC_RT_MODE = "rt"; + private static final int MAX_SUPPRESSED_ERROR_CAUSES = 50; - if (syncMode == null) { - PropertyState syncModeProp = indexDefinition.getDefinitionNodeState().getProperty("sync-mode"); - if (syncModeProp != 
null) { - syncMode = syncModeProp.getValue(Type.STRING); - } - } + private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = ConfigHelper.getSystemPropertyAsInt("oak.failedDocStatusLimit", 10000); + private final int BULK_MAX_OPERATIONS = ConfigHelper.getSystemPropertyAsInt(BULK_ACTIONS_PROP, BULK_ACTIONS_DEFAULT); + private final int BULK_MAX_SIZE_BYTES = ConfigHelper.getSystemPropertyAsInt(BULK_SIZE_BYTES_PROP, BULK_SIZE_BYTES_DEFAULT); + private final int BULK_FLUSH_INTERVAL_MS = ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP, BULK_FLUSH_INTERVAL_MS_DEFAULT); + private final int BULK_MAX_CONCURRENT_REQUESTS = ConfigHelper.getSystemPropertyAsInt(BULK_MAX_CONCURRENT_REQUESTS_PROP, BULK_MAX_CONCURRENT_REQUESTS_DEFAULT); + private final boolean FAIL_ON_ERROR = ConfigHelper.getSystemPropertyAsBoolean(FAIL_ON_ERROR_PROP, FAIL_ON_ERROR_DEFAULT); - if (SYNC_RT_MODE.equals(syncMode)) { - return new RealTimeBulkProcessorHandler(elasticConnection, indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement); - } + private final ElasticConnection elasticConnection; + private final BulkIngester<OperationContext> bulkIngester; - return new ElasticBulkProcessorHandler(elasticConnection, indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement); - } + // Used to keep track of the sequence number of the batches that are currently being processed. + // This is used to wait until all operations for a writer are processed before closing it. 
+ private final ReentrantLock lock = new ReentrantLock(); + private final Condition bulkProcessedCondition = lock.newCondition(); + private final HashSet<Long> pendingBulks = new HashSet<>(); + + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ConcurrentHashMap<String, IndexInfo> registeredIndexes = new ConcurrentHashMap<>(); + private final ConcurrentLinkedQueue<ErrorCause> globalSuppressedErrorCauses = new ConcurrentLinkedQueue<>(); - private BulkIngester<String> initBulkIngester() { + // Time blocked waiting to add operations to the bulk processor. + private final long startTime = System.nanoTime(); + private long totalWaitTimeNanos = 0; + + public ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection) { + this.elasticConnection = elasticConnection; // BulkIngester does not support retry policies. Some retries though are already implemented in the transport layer. // More details here: https://github.com/elastic/elasticsearch-java/issues/478 - return BulkIngester.of(b -> { + LOG.info("Creating bulk ingester [maxActions: {}, maxSizeBytes: {} flushInterval {}, concurrency {}]", + BULK_MAX_OPERATIONS, BULK_MAX_SIZE_BYTES, BULK_FLUSH_INTERVAL_MS, BULK_MAX_CONCURRENT_REQUESTS_PROP); + this.bulkIngester = BulkIngester.of(b -> { b = b.client(elasticConnection.getAsyncClient()) .listener(new OakBulkListener()); - if (indexDefinition.bulkActions > 0) { - b = b.maxOperations(indexDefinition.bulkActions); + if (BULK_MAX_OPERATIONS > 0) { + b = b.maxOperations(BULK_MAX_OPERATIONS); } - if (indexDefinition.bulkSizeBytes > 0) { - b = b.maxSize(indexDefinition.bulkSizeBytes); + if (BULK_MAX_SIZE_BYTES > 0) { + b = b.maxSize(BULK_MAX_SIZE_BYTES); } - if (indexDefinition.bulkFlushIntervalMs > 0) { - b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, TimeUnit.MILLISECONDS); + if (BULK_FLUSH_INTERVAL_MS > 0) { + b = b.flushInterval(BULK_FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS); } - - return 
b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY); + if (BULK_MAX_CONCURRENT_REQUESTS > 0) { + b = b.maxConcurrentRequests(BULK_MAX_CONCURRENT_REQUESTS); + } + return b; }); } - private void checkFailures() throws IOException { - if (!suppressedErrorCauses.isEmpty()) { - IOException ioe = new IOException("Exception while indexing. See suppressed for details"); - suppressedErrorCauses.stream().map(ec -> new IllegalStateException(ec.reason())).forEach(ioe::addSuppressed); - throw ioe; + /** + * Registers an ElasticIndex with the given index definition configuration. + * <p> + * The `sync-mode` property can be set to `rt` (real-time). In this case the returned handler will be real-time. + * This option is available for sync index definitions only. + */ + public void registerIndex(String indexName, ElasticIndexDefinition indexDefinition, NodeBuilder definitionBuilder, CommitInfo commitInfo, boolean waitForESAcknowledgement) { + checkOpen(); + if (registeredIndexes.containsKey(indexName)) { + LOG.warn("Index already registered: {}", indexName); Review Comment: For each update cycle, writers are registered at the start and unregistered at the end. And, if my understanding is correct, the index definition is read again whenever a writer is created. I'll try to explain the process below; could you please check whether it makes sense? Incremental indexing is managed by `AsyncIndexUpdate` and initiated by the method `runWhenPermitted()`: https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java#L488 This calls `updateIndex()`, which creates a new `IndexUpdate` instance: https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java#L810 This `IndexUpdate` instance is what actually performs the index update.
Each of these instances creates new writers at the start, in the `enter()` method: https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java#L175 If I understand the code correctly, new writers read the definition from the NodeStore and should therefore pick up changes: https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java#L320 https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java#L69 The writers are closed in `FulltextIndexEditor` by the `leave()` method, when leaving the root node: https://github.com/apache/jackrabbit-oak/blob/55bc3e17a6c1fcff3e570935ae028f1977a626a1/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/spi/editor/FulltextIndexEditor.java#L153 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: oak-dev-unsubscr...@jackrabbit.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org