gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1277731284
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -51,16 +51,30 @@ public class ReceivingMailbox {
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+ @Nullable
+ private volatile Reader _reader;
public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
_id = id;
_receiveMailCallback = receiveMailCallback;
}
+ public void registeredReader(Reader reader) {
Review Comment:
This is my main change relative to @walterddr's code. Instead of keeping a
map in the scheduler from op chains to their blocking objects (in his case,
a blocking list of 1 element), I register the ReceivingOperator directly on
the mailbox. That decouples the receive operation from the scheduler.
The ReceivingMailbox notifies the registered reader (if any) whenever a
message is received. The older `_receiveMailCallback` is kept because some
tests still use it, but we may change it so that it no longer needs to
allocate an OpChainId for each message sent.
See `BaseMailboxReceiveOperator.onData` for how this callback is
implemented.
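To make the idea concrete, here is a minimal sketch of the pattern, not the
code in this PR. The names `MailboxSketch`, `Mailbox`, `registerReader`,
`offer` and `poll` are hypothetical and chosen for illustration only; the
`Reader` interface and its `onData` method are assumed shapes based on the
names mentioned above. The mailbox holds an optional reader in a volatile
field and calls it after a block has been queued, so no scheduler-side map
is needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of a mailbox that notifies a registered reader. */
class MailboxSketch {

  /** Assumed callback shape: invoked when new data is available. */
  interface Reader {
    void onData();
  }

  static class Mailbox<T> {
    // Bounded queue, so a full mailbox rejects new blocks (back pressure).
    private final BlockingQueue<T> _blocks;
    // volatile: a reader registered on one thread must be visible to senders.
    private volatile Reader _reader;

    Mailbox(int maxPendingBlocks) {
      _blocks = new ArrayBlockingQueue<>(maxPendingBlocks);
    }

    /** Registers the reader to notify; replaces any previous reader. */
    void registerReader(Reader reader) {
      _reader = reader;
    }

    /** Queues a block and notifies the reader; returns false if full. */
    boolean offer(T block) {
      if (!_blocks.offer(block)) {
        return false;
      }
      // Read the field once so the null check and the call see the same value.
      Reader reader = _reader;
      if (reader != null) {
        reader.onData();
      }
      return true;
    }

    /** Returns the next block, or null if the mailbox is empty. */
    T poll() {
      return _blocks.poll();
    }
  }
}
```

In this sketch a block offered before any reader is registered produces no
notification, so a reader would presumably need to poll once right after
registering to avoid missing data that arrived earlier.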
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]