gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1285457564
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -51,16 +51,30 @@ public class ReceivingMailbox {
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+ @Nullable
+ private volatile Reader _reader;
public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
_id = id;
_receiveMailCallback = receiveMailCallback;
}
+ public void registeredReader(Reader reader) {
Review Comment:
> At that moment the reader is not registered but there are contents within the mailbox

Is that something that can only happen in `InMemorySendingMailbox`? I thought it could also happen in the gRPC one. Either way, this class supports the following pattern:
1. If the reader is created before the writer, there is no problem: it blocks on the queue and is registered as a listener.
2. Otherwise, the writer adds blocks to the queue and notifies nobody. It keeps producing messages until the stream is closed or the buffer is full (the same conditions we had before).
3. Once the reader is registered, it won't see the earlier notifications (because the sender never sent them), but it first reads whatever is already in the queue.
4. Once the queue is completely consumed, instead of blocking on the queue, we block on an operator-defined structure (which is also a queue, but that is not relevant). That structure is shared by the operator, which may read from several mailboxes, and it unblocks whenever any of them has data. This part is in fact very similar to what we had before, but instead of storing that structure in a map in the dispatcher/scheduler (which sounds strange now that there is nothing to schedule), it is stored in the operator.
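The four steps above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `Mailbox`, `OperatorReader`, `registerReader`, `awaitData` and `nextBlock` are hypothetical names, blocks are plain `String`s instead of `TransferableBlock`, and the operator-side structure is assumed to be a `LinkedBlockingQueue` of notifications (the comment only says it is a queue).

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LateReaderSketch {
  /** Hypothetical operator-side structure: one instance shared by every mailbox the operator reads. */
  static final class OperatorReader {
    // Each element means "this mailbox received data"; it is only a hint and may be stale.
    private final BlockingQueue<Mailbox> _notifications = new LinkedBlockingQueue<>();

    void notifyDataAvailable(Mailbox mailbox) {
      _notifications.offer(mailbox);
    }

    /** Blocks until some registered mailbox signals data; returns null on timeout. */
    Mailbox awaitData(long timeout, TimeUnit unit) throws InterruptedException {
      return _notifications.poll(timeout, unit);
    }
  }

  /** Hypothetical stand-in for ReceivingMailbox: a bounded queue plus a reader that may register late. */
  static final class Mailbox {
    private final BlockingQueue<String> _blocks = new ArrayBlockingQueue<>(5);
    private volatile OperatorReader _reader; // null until the operator registers

    void registerReader(OperatorReader reader) {
      _reader = reader;
    }

    /** Writer side: enqueue first, then notify only if a reader is already registered. */
    boolean offer(String block) {
      if (!_blocks.offer(block)) {
        return false; // buffer full: back pressure, with or without a reader
      }
      OperatorReader reader = _reader;
      if (reader != null) {
        reader.notifyDataAvailable(this);
      }
      return true;
    }

    /** Reader side: never blocks; returns null if the queue is empty. */
    String poll() {
      return _blocks.poll();
    }
  }

  /**
   * Hypothetical helper for steps 3 and 4: read whatever is already queued, and block on the
   * shared reader only when every mailbox is empty. Returns null on timeout or interruption.
   */
  static String nextBlock(List<Mailbox> mailboxes, OperatorReader reader, long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      for (Mailbox mailbox : mailboxes) { // fixed scan order; not fair across mailboxes
        String block = mailbox.poll();
        if (block != null) {
          return block;
        }
      }
      long remaining = deadline - System.nanoTime();
      try {
        if (remaining <= 0 || reader.awaitData(remaining, TimeUnit.NANOSECONDS) == null) {
          return null; // no mailbox produced data before the deadline
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        return null;
      }
      // Woken up (the notification may be stale), so poll the queues again.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Mailbox left = new Mailbox();
    Mailbox right = new Mailbox();
    left.offer("early-1"); // step 2: no reader yet, so nobody is notified

    OperatorReader reader = new OperatorReader(); // one reader shared by the whole operator
    left.registerReader(reader);
    right.registerReader(reader);
    List<Mailbox> inputs = List.of(left, right);

    System.out.println(nextBlock(inputs, reader, 1000)); // early-1 (step 3: read from the queue)

    Thread writer = new Thread(() -> right.offer("late-1"));
    writer.start();
    System.out.println(nextBlock(inputs, reader, 5000)); // late-1 (step 4: woken via the shared reader)
    writer.join();
    System.out.println(nextBlock(inputs, reader, 100)); // null (every queue is empty)
  }
}
```

Because the writer enqueues before it reads `_reader`, and the reader registers before it drains, either the writer sees the registered reader and notifies it, or the block is already queued when the reader drains it in step 3, so no wakeup is lost. Notifications can be stale (the block may already have been drained), which is why `nextBlock` polls again after waking instead of trusting the notification.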
> Our use pattern is not standard, so not sure if we should put a general framework.
That is not correct. First, it is quite a standard pattern. In fact, reactive-streams implements the same feature (in an even more generic fashion, letting you choose between cold and hot streams). I would even suggest using reactive patterns in the future to make the code simpler on our side.
It is also not correct within our own project. Right now, in master, both `MailboxReceiveOperator` and `SortedMailboxReceiveOperator` share this code (in the algorithmic form used in master, with no-ops and delegating to the dispatcher/scheduler to block). Also, as shown in the previous commits, the pipeline breaker could be implemented using the same pattern (but with blocking streams instead of non-blocking ones). I removed that because you had some concerns, but we use this pattern in at least two places. Given how hard it is to reason about concurrency, I think it is much better to use a single, generic structure that can be tested in isolation and then used in different places.
If you are really concerned about the lack of reuse, we can roll back the changes introduced in c936badf3f8f158568d51793ba59fe657acf7a13, where I added this class in order to share the code between the receiving mailbox and the pipeline breaker, and go back to the version we had in 650e4a88c032c1e9e0345cf82c577474fdd357e8, where mostly the same code lived in `BaseMailboxReceiveOperator` and was therefore shared only by `MailboxReceiveOperator` and `SortedMailboxReceiveOperator`.
Alternatively, you may be asking to go even further back and restore the algorithm where the blocking code lived in the dispatcher/scheduler. We can discuss that, but I think it is an incorrect pattern that introduces dependencies between the scheduler and the operators. We can go back there if you think there is a correctness issue with the proposed new structure, but I don't think reusability is a strong enough reason to recover that code.
--
This is an automated message from the Apache Git Service.