I'm totally +1 for the feature to check if we can get message immediately from consumer, this is to say we have message locally.
In my understanding, it's useful to implement some user-defined order to consume messages among different topics, in your case, the "distributed flow control ability". But in the past few years, I've met some users have defined the consume order of different topics by part of the message content, like some critical property value. In these situations, a `peek` method is more suitable. Further more, peek is not effectively equals to `consumer.receive(0, TimeUnit.SECONDS)`. As you will have to store the message somewhere else if you find that it's not the most priority message to process. One last thing, put the concept of "receiverQueue" in the api of consumer, seems a little bit strange, IMHO. ------------------ Original ------------------ From: "dev" <shoot...@gmail.com>; Date: Tue, Oct 26, 2021 12:54 PM To: "dev"<dev@pulsar.apache.org>; Subject: Re: [DISCUSSION] PIP-108: Add method to help user judge if consumer queue has message 3. Our solution implements the distributed flow control ability at client side, so we don't use the listener way. 2. Per customer per consumer in different tenants and namespace, and the `flow-control` need(Some of our customer's machines can't work on high traffic), So `Multi-topic` can't use. 1. We want to use this api to judge if there's messages to receive, like that pseudo code if (consumer.hasMessage()) { .submit(() -> { consumer.pollMessagesAccordingToTheDistributedFlowControl() }) } Matteo Merli <matteo.me...@gmail.com> ??2021??10??26?????? ????12:15?????? > I'm a bit hesitant about this because I think there are already at > least 3 different ways to handle similar scenarios. > > 1. Using listener and avoid calling receive directly > 2. Use multi-topic consumer, so there's a single `Consumer` instance > exposed > 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for message > > > -- > Matteo Merli > <matteo.me...@gmail.com> > > On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoot...@gmail.com> wrote: > > > > I think it's better to add the method to Consumer interface instead of > let > > user casting it to `ConsumerBase`. > > `peek` is most complexly, for the reason, I can use the `peek` object to > > ack??negative ack, but when to remove from the `BlockingQueue`? > > IMHO, people use this api are just to judge if has the message, > otherwise, > > they can just use `receive(0,TimeUnit) > > > > JiangHaiting <jianghait...@foxmail.com> ??2021??10??26?????? ????10:19?????? > > > > > Can this method > > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages" > do > > > the trick? Though you have to change the type to ConsumerBase. > > > > > > > > > And maybe `peek` is more suitable and useful to add to the Consumer > > > interface?? > > > > > > > > > > > > > > > > > > > > > ------------------&nbsp;Original&nbsp;------------------ > > > From: > > > "dev" > > > < > > > shoot...@gmail.com&gt;; > > > Date:&nbsp;Mon, Oct 25, 2021 07:24 PM > > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;; > > > > > > Subject:&nbsp;[DISCUSSION] PIP-108: Add method to help user judge if > > > consumer queue has message > > > > > > > > > > > > https://github.com/apache/pulsar/issues/12479 > > > > > > --- Pasted here for quoting convenience --- > > > > > > ## Motivation > > > Currently, I have an application that manages ten thousand of > consumers, > > > and a logic to schedule consumers's receive. It would be helpful to > know if > > > one of the consumers have message to recive. > > > > > > ## Goal > > > To make `Consumer` can judge if there are unreceiving messages > > > > > > ## API Changes > > > > > > Add `hasMessageInReceiverQueue` on the `Consumer` interface. > > > > > > ## Implementation > > > > > > For `ZeroQueueConsumerImpl` return false, Others, judge the > > > `receiveQueueSize` greater than zero. > > > > > > > > > ## Reject Alternatives > > > > > > No alternatives yet. > > > > > > > > > > > > > > > --- > > > Thanks, > > > Haiting Jiang (Github: Jason918) > --- Thanks, Haiting Jiang (Github: Jason918)