Hello, matteo, About my PIP 108, the reason I want to have method `hasMessageInReceiverQueue` is that We need to control consumer at different rate. For example, consumer A 10msg/s, consumer B 100 msg/s so I can't use the `listener` mode. the `receive(0)` method will remove the message from queue. currently my work flow is 1) check if has message 2) apply for flow quota 3) receive messages. I can not put the mssage back to the queue, can't use the `receive(0)`. And `apply for flow quota` is a costly action. And discussed with PengHui Li and Hang Chen, we think `hasLocalMessages` is better than hasMessageInReceiverQueue
ZhangJian He <shoot...@gmail.com> 于2021年10月27日周三 下午7:47写道: > Some users want to use this api to judge if there's messages to receive, > like that pseudo code: > if (consumer.hasMessage()) { > .submit(() -> { > consumer.pollMessagesAccordingToTheDistributedFlowControl() > }) > } > > don't want to remove the message from queue. > > PengHui Li <peng...@apache.org> 于2021年10月27日周三 下午7:43写道: > >> @ZhangJian He, as Matteo mentioned, Use `consumer.receive(0, >> TimeUnit.SECONDS)` can achieve the same purpose for checking if there are >> messages in the local cache. >> >> Thanks >> Penghui >> >> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <shoot...@gmail.com> wrote: >> >> > If some users need the message content to do user-defined actions, we >> need >> > to ensure the user can't use the `peekMessage` to do things like ack >> > because the message are still in the blockingQueue, return just a >> > content-copy? >> > >> > introduced `localBuffer` might be good ? `hasMessagesInLocalBuffer` >> > >> > JiangHaiting <jianghait...@foxmail.com> 于2021年10月26日周二 下午2:20写道: >> > >> > > I'm totally +1 for the feature to check if we can get >> > > message immediately from consumer, this is to say we have message >> > > locally. >> > > >> > > >> > > In my understanding, it's useful to implement some user-defined order >> to >> > > consume messages among different topics, in your case, the >> "distributed >> > > flow control ability". >> > > But in the past few years, I've met some users have defined the >> consume >> > > order of different topics by part of the message content, like >> > > some critical property value. >> > > In these situations, a `peek` method is more suitable. >> > > >> > > >> > > Further more, peek is not effectively equals to `consumer.receive(0, >> > > TimeUnit.SECONDS)`. As you will have to store the message somewhere >> else >> > if >> > > you find that it's not the most priority message to process. >> > > >> > > >> > > One last thing, put the concept of "receiverQueue" in the api of >> > consumer, >> > > seems a little bit strange, IMHO. >> > > >> > > >> > > >> > > >> > > ------------------ Original ------------------ >> > > From: >> > > "dev" >> > > < >> > > shoot...@gmail.com>; >> > > Date: Tue, Oct 26, 2021 12:54 PM >> > > To: "dev"<dev@pulsar.apache.org>; >> > > >> > > Subject: Re: [DISCUSSION] PIP-108: Add method to help user judge >> if >> > > consumer queue has message >> > > >> > > >> > > >> > > 3. Our solution implements the distributed flow control ability at >> client >> > > side, so we don't use the listener way. >> > > 2. Per customer per consumer in different tenants and namespace, and >> the >> > > `flow-control` need(Some of our customer's machines can't work on high >> > > traffic), So `Multi-topic` can't use. >> > > 1. We want to use this api to judge if there's messages to receive, >> like >> > > that pseudo code >> > > if (consumer.hasMessage()) { >> > > .submit(() -> { >> > > consumer.pollMessagesAccordingToTheDistributedFlowControl() >> > > }) >> > > } >> > > >> > > Matteo Merli <matteo.me...@gmail.com> 于2021年10月26日周二 下午12:15写道: >> > > >> > > > I'm a bit hesitant about this because I think there are already >> at >> > > > least 3 different ways to handle similar scenarios. >> > > > >> > > > 1. Using listener and avoid calling receive directly >> > > > 2. Use multi-topic consumer, so there's a single `Consumer` >> > > instance >> > > > exposed >> > > > 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for >> > > message >> > > > >> > > > >> > > > -- >> > > > Matteo Merli >> > > > <matteo.me...@gmail.com> >> > > > >> > > > On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoot...@gmail.com >> > > >> > > wrote: >> > > > > >> > > > > I think it's better to add the method to Consumer interface >> > > instead of >> > > > let >> > > > > user casting it to `ConsumerBase`. >> > > > > `peek` is most complexly, for the reason, I can use >> the >> > > `peek` object to >> > > > > ack、negative ack, but when to remove from the >> `BlockingQueue`? >> > > > > IMHO, people use this api are just to judge if has the >> message, >> > > > otherwise, >> > > > > they can just use `receive(0,TimeUnit) >> > > > > >> > > > > JiangHaiting <jianghait...@foxmail.com> 于2021年10月26日周二 >> > > 上午10:19写道: >> > > > > >> > > > > > Can this method >> > > > > > >> > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages" >> > > > do >> > > > > > the trick? Though you have to change the type to >> > > ConsumerBase. >> > > > > > >> > > > > > >> > > > > > And maybe `peek` is more suitable and useful to add to >> the >> > > Consumer >> > > > > > interface? >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > ------------------&nbsp;Original&nbsp;------------------ >> > > > > > From: >> > > > > >> > > >> > >> > >> > > "dev" >> > > > > >> > > >> > >> > >> > > < >> > > > > > shoot...@gmail.com&gt;; >> > > > > > Date:&nbsp;Mon, Oct 25, 2021 07:24 PM >> > > > > > To:&nbsp;"dev"<dev@pulsar.apache.org&gt;; >> > > > > > >> > > > > > Subject:&nbsp;[DISCUSSION] PIP-108: Add method to >> help >> > > user judge if >> > > > > > consumer queue has message >> > > > > > >> > > > > > >> > > > > > >> > > > > > https://github.com/apache/pulsar/issues/12479 >> > > > > > >> > > > > > --- Pasted here for quoting convenience --- >> > > > > > >> > > > > > ## Motivation >> > > > > > Currently, I have an application that manages ten >> thousand >> > > of >> > > > consumers, >> > > > > > and a logic to schedule consumers's receive. It would >> be >> > > helpful to >> > > > know if >> > > > > > one of the consumers have message to recive. >> > > > > > >> > > > > > ## Goal >> > > > > > To make `Consumer` can judge if there are unreceiving >> > > messages >> > > > > > >> > > > > > ## API Changes >> > > > > > >> > > > > > Add `hasMessageInReceiverQueue` on the `Consumer` >> > interface. >> > > > > > >> > > > > > ## Implementation >> > > > > > >> > > > > > For `ZeroQueueConsumerImpl` return false, Others, judge >> > the >> > > > > > `receiveQueueSize` greater than zero. >> > > > > > >> > > > > > >> > > > > > ## Reject Alternatives >> > > > > > >> > > > > > No alternatives yet. >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > --- >> > > > > > Thanks, >> > > > > > Haiting Jiang (Github: Jason918) >> > > > >> > > >> > > >> > > >> > > >> > > --- >> > > Thanks, >> > > Haiting Jiang (Github: Jason918) >> > >> >