Some users want to use this API to check whether there are messages to receive, as in this pseudocode:

if (consumer.hasMessage()) {
    .submit(() -> {
        consumer.pollMessagesAccordingToTheDistributedFlowControl()
    })
}

They don't want to remove the message from the queue.

PengHui Li <peng...@apache.org> wrote on Wed, Oct 27, 2021 at 7:43 PM:

> @ZhangJian He, as Matteo mentioned, using
> `consumer.receive(0, TimeUnit.SECONDS)` achieves the same purpose of
> checking whether there are messages in the local cache.
>
> Thanks
> Penghui
>
> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <shoot...@gmail.com> wrote:
>
> > If some users need the message content to do user-defined actions, we
> > need to ensure the user can't use `peekMessage` to do things like ack,
> > because the message is still in the blockingQueue. Return just a
> > content copy?
> >
> > Introducing `localBuffer` might be good? `hasMessagesInLocalBuffer`
> >
> > JiangHaiting <jianghait...@foxmail.com> wrote on Tue, Oct 26, 2021 at 2:20 PM:
> >
> > > I'm totally +1 for the feature to check whether we can get a
> > > message immediately from the consumer, that is to say, whether we
> > > have a message locally.
> > >
> > > In my understanding, it's useful for implementing a user-defined
> > > order in which to consume messages among different topics, in your
> > > case, the "distributed flow control ability".
> > > But in the past few years, I've met some users who define the
> > > consumption order of different topics by part of the message
> > > content, like some critical property value.
> > > In these situations, a `peek` method is more suitable.
> > >
> > > Furthermore, peek is not effectively equal to
> > > `consumer.receive(0, TimeUnit.SECONDS)`, as you will have to store
> > > the message somewhere else if you find that it's not the
> > > highest-priority message to process.
> > >
> > > One last thing: putting the concept of "receiverQueue" in the API
> > > of the consumer seems a little bit strange, IMHO.
> > >
> > > ------------------ Original ------------------
> > > From: "dev" <shoot...@gmail.com>;
> > > Date: Tue, Oct 26, 2021 12:54 PM
> > > To: "dev" <dev@pulsar.apache.org>;
> > > Subject: Re: [DISCUSSION] PIP-108: Add method to help user judge if
> > > consumer queue has message
> > >
> > > 3. Our solution implements the distributed flow control ability on
> > > the client side, so we don't use the listener approach.
> > > 2. Each customer has its own consumer, in different tenants and
> > > namespaces, and we need `flow-control` (some of our customers'
> > > machines can't handle high traffic), so `Multi-topic` can't be used.
> > > 1. We want to use this API to check whether there are messages to
> > > receive, as in this pseudocode:
> > > if (consumer.hasMessage()) {
> > >     .submit(() -> {
> > >         consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >     })
> > > }
> > >
> > > Matteo Merli <matteo.me...@gmail.com> wrote on Tue, Oct 26, 2021 at 12:15 PM:
> > >
> > > > I'm a bit hesitant about this because I think there are already at
> > > > least 3 different ways to handle similar scenarios.
> > > >
> > > > 1. Use a listener and avoid calling receive directly
> > > > 2. Use a multi-topic consumer, so there's a single `Consumer`
> > > >    instance exposed
> > > > 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for a
> > > >    message
> > > >
> > > > --
> > > > Matteo Merli
> > > > <matteo.me...@gmail.com>
> > > >
> > > > On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoot...@gmail.com>
> > > > wrote:
> > > > >
> > > > > I think it's better to add the method to the Consumer interface
> > > > > instead of letting users cast it to `ConsumerBase`.
> > > > > `peek` is the most complex option, because I could use the
> > > > > peeked object to ack or negative ack, but when would it be
> > > > > removed from the `BlockingQueue`?
> > > > > IMHO, people who use this API just want to check whether there
> > > > > is a message; otherwise, they can just use `receive(0, TimeUnit)`.
> > > > >
> > > > > JiangHaiting <jianghait...@foxmail.com> wrote on Tue, Oct 26, 2021 at 10:19 AM:
> > > > >
> > > > > > Can this method
> > > > > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > > > > > do the trick? Though you have to change the type to
> > > > > > ConsumerBase.
> > > > > >
> > > > > > And maybe `peek` is more suitable and useful to add to the
> > > > > > Consumer interface?
> > > > > >
> > > > > > ------------------ Original ------------------
> > > > > > From: "dev" <shoot...@gmail.com>;
> > > > > > Date: Mon, Oct 25, 2021 07:24 PM
> > > > > > To: "dev" <dev@pulsar.apache.org>;
> > > > > > Subject: [DISCUSSION] PIP-108: Add method to help user judge
> > > > > > if consumer queue has message
> > > > > >
> > > > > > https://github.com/apache/pulsar/issues/12479
> > > > > >
> > > > > > --- Pasted here for quoting convenience ---
> > > > > >
> > > > > > ## Motivation
> > > > > > Currently, I have an application that manages ten thousand
> > > > > > consumers and has logic to schedule the consumers' receive
> > > > > > calls. It would be helpful to know whether one of the
> > > > > > consumers has a message to receive.
> > > > > >
> > > > > > ## Goal
> > > > > > Enable `Consumer` to tell whether there are unreceived
> > > > > > messages.
> > > > > >
> > > > > > ## API Changes
> > > > > >
> > > > > > Add `hasMessageInReceiverQueue` to the `Consumer` interface.
> > > > > >
> > > > > > ## Implementation
> > > > > >
> > > > > > For `ZeroQueueConsumerImpl`, return false. For other
> > > > > > consumers, check whether `receiveQueueSize` is greater than
> > > > > > zero.
> > > > > >
> > > > > > ## Rejected Alternatives
> > > > > >
> > > > > > No alternatives yet.
> > > > > >
> > > > > > ---
> > > > > > Thanks,
> > > > > > Haiting Jiang (Github: Jason918)
> > >
> > > ---
> > > Thanks,
> > > Haiting Jiang (Github: Jason918)
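
For readers following the thread, here is a minimal Java sketch of the distinction being debated: a non-destructive check (`hasMessage` / `peek`) versus a zero-timeout receive that removes the message. It assumes a plain `java.util.concurrent.LinkedBlockingQueue` as a stand-in for the consumer's local receiver queue. The class `LocalBufferProbe` and its methods `deliver`, `hasMessage`, `peek`, `receiveNow`, and `size` are hypothetical names made up for illustration; this is not the Pulsar client's implementation or API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer's local receiver queue; not Pulsar code.
class LocalBufferProbe {
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();

    // Simulates a message arriving in the local buffer.
    public void deliver(String message) {
        incoming.add(message);
    }

    // Non-destructive check: the message stays in the queue.
    public boolean hasMessage() {
        return !incoming.isEmpty();
    }

    // Non-destructive look at the head; returns null when the buffer is empty.
    public String peek() {
        return incoming.peek();
    }

    // Destructive probe with a zero timeout: removes and returns the head,
    // or returns null immediately when the buffer is empty.
    public String receiveNow() {
        try {
            return incoming.poll(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public int size() {
        return incoming.size();
    }

    public static void main(String[] args) {
        LocalBufferProbe consumer = new LocalBufferProbe();
        consumer.deliver("m1");

        System.out.println(consumer.hasMessage()); // true
        System.out.println(consumer.peek());       // m1
        System.out.println(consumer.size());       // 1, checking removed nothing

        System.out.println(consumer.receiveNow()); // m1
        System.out.println(consumer.size());       // 0, the probe consumed it
    }
}
```

With the zero-timeout probe, a scheduler that decides not to process the message right away has to hold it somewhere itself, which is the concern raised above about `consumer.receive(0, TimeUnit.SECONDS)` not being equivalent to a peek.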