Jackie-Jiang commented on code in PR #11411:
URL: https://github.com/apache/pinot/pull/11411#discussion_r1303409129


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java:
##########
@@ -62,70 +55,55 @@ public Map<Integer, List<TransferableBlock>> getResultMap() {
   }
 
   @Nullable
+  public TransferableBlock getErrorBlock() {
+    return _errorBlock;
+  }
+
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    // Poll from every mailbox operator:
-    // - Return the first content block
-    // - If no content block found but there are mailboxes not finished, try again
-    // - If all content blocks are already returned, return end-of-stream block
-    while (!_workerEntries.isEmpty()) {
-      if (_finalBlock != null) {
-        return _finalBlock;
-      }
-      if (System.currentTimeMillis() > _context.getDeadlineMs()) {
-        _finalBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-        constructErrorResponse(_finalBlock);
-        return _finalBlock;
-      }
-
-      Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.getLast();
-      TransferableBlock block = worker.getValue().nextBlock();
-
-      if (block == null) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("==[PB]== Null block on " + _context.getId() + " worker " + worker.getKey());
+    if (_errorBlock != null) {
+      return _errorBlock;
+    }
+    // NOTE: Put an empty list for each worker in case there is no data block returned from that worker
+    if (_workerMap.size() == 1) {
+      Map.Entry<Integer, Operator<TransferableBlock>> entry = _workerMap.entrySet().iterator().next();
+      List<TransferableBlock> dataBlocks = new ArrayList<>();
+      _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks);
+      Operator<TransferableBlock> operator = entry.getValue();
+      TransferableBlock block = operator.nextBlock();
+      while (!block.isSuccessfulEndOfStreamBlock()) {
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          return block;
         }
-        continue;
+        dataBlocks.add(block);
+        block = operator.nextBlock();
       }
-
-      // Release the mailbox worker when the block is end-of-stream
-      if (block.isSuccessfulEndOfStreamBlock()) {
-        _workerEntries.removeLast();
-        continue;
+    } else {
+      _resultMap = new HashMap<>();
+      for (int workerKey : _workerMap.keySet()) {
+        _resultMap.put(workerKey, new ArrayList<>());
       }
-
-      if (block.isErrorBlock()) {
-        _finalBlock = block;
-      }
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("==[PB]== Returned block from : " + _context.getId() + " block: " + block);
-      }
-      _resultMap.get(worker.getKey()).add(block);
-      return block;
-    }
-    if (System.currentTimeMillis() > _context.getDeadlineMs()) {
-      _finalBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-      return _finalBlock;
-    } else if (_finalBlock == null) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("==[PB]== Finished : " + _context.getId());
+      // Keep polling from every operator in round-robin fashion
+      Queue<Map.Entry<Integer, Operator<TransferableBlock>>> entries = new ArrayDeque<>(_workerMap.entrySet());
+      while (!entries.isEmpty()) {
+        Map.Entry<Integer, Operator<TransferableBlock>> entry = entries.poll();
+        TransferableBlock block = entry.getValue().nextBlock();
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          return block;
+        }
+        if (block.isDataBlock()) {
+          _resultMap.get(entry.getKey()).add(block);
+          entries.offer(entry);
+        }

Review Comment:
   It is tracked within the op-chain



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to