nfsantos commented on code in PR #2410:
URL: https://github.com/apache/jackrabbit-oak/pull/2410#discussion_r2250359188


##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java:
##########
@@ -415,31 +466,72 @@ private void scan() {
                 LOG.trace("Kicking new search after query {}", searchReq);
 
                 searchStartTime = System.currentTimeMillis();
-                indexNode.getConnection().getAsyncClient()
+                ongoingRequest = indexNode.getConnection().getAsyncClient()
                         .search(searchReq, ObjectNode.class)
-                        .whenComplete(((searchResponse, throwable) -> {
-                            if (throwable != null) {
-                                onFailure(throwable);
-                            } else onSuccess(searchResponse);
-                        }));
+                        .whenComplete(this::handleResponse);
                 metricHandler.markQuery(indexNode.getDefinition().getIndexPath(), false);
             } else {
                 LOG.trace("Scanner is closing or still processing data from the previous scan");
             }
         }
 
+        private void handleResponse(SearchResponse<ObjectNode> searchResponse, Throwable throwable) {
+            ongoingRequest = null;
+            if (isClosed.get()) {
+                LOG.info("Scanner is closed, not processing search response");
+                return;
+            }
+            try {
+                if (throwable == null) {
+                    onSuccess(searchResponse);
+                } else {
+                    onFailure(throwable);
+                }
+            } catch (Throwable t) {
+                LOG.warn("Error processing search response", t);
+                Throwable prevValue = systemErrorRef.getAndSet(t);
+                if (prevValue != null) {
+                    LOG.warn("System error reference was previously set to {}. It has now been reset to new error {}", prevValue.getMessage(), t.getMessage());
+                }
+                try {
+                    if (!queue.offer(POISON_PILL, enqueueTimeoutMs, TimeUnit.MILLISECONDS)) {
+                        LOG.warn("Timeout waiting to enqueue poison pill after error processing search response. The iterator might not be closed properly.");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();  // restore interrupt status
+                    LOG.warn("Interrupted while trying to enqueue poison pill after error processing search response", e);
+                }
+                throw t;

Review Comment:
   I changed the method so that it no longer throws, to respect the contract of
`whenComplete`.
   I want to preserve the semantics that the consumer sees every result that
was received before the error occurred. Any results already on the queue should
therefore be delivered to the consumer before we close the iterator because of
the error. That is why this error handler enqueues the poison pill: it schedules
the error to be thrown to the consumer once it has finished reading the queued
results.
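
   To illustrate the intended semantics, here is a minimal, self-contained sketch. It is not the code from this PR: the class `PoisonPillSketch`, its fields, and the `drain` helper are hypothetical names chosen for the example, and it assumes an unbounded queue so that enqueueing never blocks. The callback records the error and enqueues a sentinel instead of throwing, and the consumer only sees the error after it has read everything that was queued earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

class PoisonPillSketch {
    // Hypothetical sentinel, compared by identity so it can never collide with a real row.
    static final String POISON_PILL = new String("POISON_PILL");

    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final AtomicReference<Throwable> errorRef = new AtomicReference<>();

    // Callback passed to whenComplete: it never lets an exception escape.
    void handleResponse(List<String> page, Throwable failure) {
        try {
            if (failure != null) {
                throw failure;
            }
            queue.addAll(page);
        } catch (Throwable t) {
            // Remember the error and mark the end of the stream.
            // Rows enqueued earlier stay ahead of the sentinel.
            errorRef.set(t);
            queue.add(POISON_PILL);
        }
    }

    // Consumer side: returns the next row, or throws once the sentinel is reached.
    String next() throws InterruptedException {
        String item = queue.take();
        if (item == POISON_PILL) {
            throw new IllegalStateException("Search failed", errorRef.get());
        }
        return item;
    }

    // Reads rows until the sentinel surfaces the error, recording what the consumer saw.
    static List<String> drain(PoisonPillSketch it) throws InterruptedException {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                seen.add(it.next());
            }
        } catch (IllegalStateException e) {
            seen.add("error: " + e.getCause().getMessage());
        }
        return seen;
    }

    static List<String> demo() throws InterruptedException {
        PoisonPillSketch it = new PoisonPillSketch();
        // First request succeeds with two rows, second request fails.
        CompletableFuture.completedFuture(List.of("a", "b"))
                .whenComplete(it::handleResponse);
        CompletableFuture.<List<String>>failedFuture(new RuntimeException("boom"))
                .whenComplete(it::handleResponse);
        return drain(it);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints [a, b, error: boom]
    }
}
```

   In this sketch the consumer reads `a` and `b` first and only then receives the failure, which is the ordering described above. A bounded queue, as in the PR, would additionally need a timed `offer` for the sentinel.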


