xiangfu0 commented on code in PR #11411:
URL: https://github.com/apache/pinot/pull/11411#discussion_r1303392649
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java:
##########
@@ -62,70 +55,55 @@ public Map<Integer, List<TransferableBlock>> getResultMap()
{
}
@Nullable
+ public TransferableBlock getErrorBlock() {
+ return _errorBlock;
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
}
@Override
protected TransferableBlock getNextBlock() {
- // Poll from every mailbox operator:
- // - Return the first content block
- // - If no content block found but there are mailboxes not finished, try
again
- // - If all content blocks are already returned, return end-of-stream block
- while (!_workerEntries.isEmpty()) {
- if (_finalBlock != null) {
- return _finalBlock;
- }
- if (System.currentTimeMillis() > _context.getDeadlineMs()) {
- _finalBlock =
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
- constructErrorResponse(_finalBlock);
- return _finalBlock;
- }
-
- Map.Entry<Integer, Operator<TransferableBlock>> worker =
_workerEntries.getLast();
- TransferableBlock block = worker.getValue().nextBlock();
-
- if (block == null) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("==[PB]== Null block on " + _context.getId() + " worker
" + worker.getKey());
+ if (_errorBlock != null) {
+ return _errorBlock;
+ }
+ // NOTE: Put an empty list for each worker in case there is no data block
returned from that worker
+ if (_workerMap.size() == 1) {
+ Map.Entry<Integer, Operator<TransferableBlock>> entry =
_workerMap.entrySet().iterator().next();
+ List<TransferableBlock> dataBlocks = new ArrayList<>();
+ _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks);
+ Operator<TransferableBlock> operator = entry.getValue();
+ TransferableBlock block = operator.nextBlock();
+ while (!block.isSuccessfulEndOfStreamBlock()) {
+ if (block.isErrorBlock()) {
+ _errorBlock = block;
+ return block;
}
- continue;
+ dataBlocks.add(block);
+ block = operator.nextBlock();
}
-
- // Release the mailbox worker when the block is end-of-stream
- if (block.isSuccessfulEndOfStreamBlock()) {
- _workerEntries.removeLast();
- continue;
+ } else {
+ _resultMap = new HashMap<>();
+ for (int workerKey : _workerMap.keySet()) {
+ _resultMap.put(workerKey, new ArrayList<>());
}
-
- if (block.isErrorBlock()) {
- _finalBlock = block;
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("==[PB]== Returned block from : " + _context.getId() + "
block: " + block);
- }
- _resultMap.get(worker.getKey()).add(block);
- return block;
- }
- if (System.currentTimeMillis() > _context.getDeadlineMs()) {
- _finalBlock =
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
- return _finalBlock;
- } else if (_finalBlock == null) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("==[PB]== Finished : " + _context.getId());
+ // Keep polling from every operator in round-robin fashion
+ Queue<Map.Entry<Integer, Operator<TransferableBlock>>> entries = new
ArrayDeque<>(_workerMap.entrySet());
+ while (!entries.isEmpty()) {
+ Map.Entry<Integer, Operator<TransferableBlock>> entry = entries.poll();
+ TransferableBlock block = entry.getValue().nextBlock();
+ if (block.isErrorBlock()) {
+ _errorBlock = block;
+ return block;
+ }
+ if (block.isDataBlock()) {
+ _resultMap.get(entry.getKey()).add(block);
+ entries.offer(entry);
+ }
Review Comment:
Do we check for statsMap of endOfStream data block?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]