nfsantos commented on code in PR #2183: URL: https://github.com/apache/jackrabbit-oak/pull/2183#discussion_r2038864642
########## oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java: ########## @@ -192,72 +258,222 @@ public void update(String id, ElasticDocument document) throws IOException { } // Add the update operation with the script - add(BulkOperation.of(op -> op.update(uf -> uf.index(indexName).id(id) - .action(uaf -> uaf.script(s -> s.source(script.toString()).params("document", JsonData.of(document))) - .upsert(document)))), id); + add(BulkOperation.of(op -> op.update(uf -> + uf.index(indexName).id(id).action(uaf -> + uaf.script(s -> s.source(script.toString()).params("document", JsonData.of(document))) + .upsert(document)))), + context); } } - public void delete(String id) throws IOException { - add(BulkOperation.of(op -> op.delete(idx -> idx.index(indexName).id(id))), id); - } - - private void add(BulkOperation operation, String context) throws IOException { - // fail fast: we don't want to wait until the processor gets closed to fail - checkFailures(); - bulkIngester.add(operation, context); - totalOperations++; + public void delete(String indexName, String id) throws IOException { + checkOpen(); + IndexInfo indexInfo = getIndexInfoOrFail(indexName); + indexInfo.deleteOperations++; + add(BulkOperation.of(op -> op.delete(idx -> idx.index(indexName).id(id))), new OperationContext(indexInfo, id)); } /** - * Closes the bulk ingester and waits for all the bulk requests to return. + * Closes an index. The underlying bulk ingestor will be flushed, to ensure that all pending operations for this + * index are sent to the server. If this index was registered with waitForESAcknowledgement set to true, then this + * method will wait until we receive an acknowledgement from the server for all the operations up to when this + * method was called. + * <p> + * Note: Closing an index will have the side-effect of flushing all pending operations for all indexes registered + * with the bulk processor. This should be transparent for the user, but it may mean that this method would take + * longer to return than if it was flushing only the operations for the index being closed. * * @return {@code true} if at least one update was performed, {@code false} otherwise * @throws IOException if an error happened while processing the bulk requests */ - public boolean close() throws IOException { - LOG.trace("Calling close on bulk ingester {}", bulkIngester); - bulkIngester.close(); - LOG.trace("Bulk Ingester {} closed", bulkIngester); - - // de-register main controller - int phase = phaser.arriveAndDeregister(); - - if (totalOperations == 0) { // no need to invoke phaser await if we already know there were no operations - LOG.debug("No operations executed in this processor. Close immediately"); - return false; + public boolean closeIndex(String indexName) throws IOException { Review Comment: I renamed to `flushIndex`. Indeed it is a better description of what the method does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: oak-dev-unsubscr...@jackrabbit.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org