I'm totally +1 for the feature to check if we can get message immediately
from consumer, this is to say we have message locally.
In my understanding, it's useful to implement some user-defined order to
consume messages among different topics, in your case, the "distributed flow
control ability".
But in the past few years, I've met some users have defined the consume order
of different topics by part of the message content, like some critical
property value.
In these situations, a `peek` method is more suitable.
Further more, peek is not effectively equals to `consumer.receive(0,
TimeUnit.SECONDS)`. As you will have to store the message somewhere else if you
find that it's not the most priority message to process.
One last thing, put the concept of "receiverQueue" in the api of consumer,
seems a little bit strange, IMHO.
------------------ Original ------------------
From:
"dev"
<[email protected]>;
Date: Tue, Oct 26, 2021 12:54 PM
To: "dev"<[email protected]>;
Subject: Re: [DISCUSSION] PIP-108: Add method to help user judge if
consumer queue has message
3. Our solution implements the distributed flow control ability at client
side, so we don't use the listener way.
2. Per customer per consumer in different tenants and namespace, and the
`flow-control` need(Some of our customer's machines can't work on high
traffic), So `Multi-topic` can't use.
1. We want to use this api to judge if there's messages to receive, like
that pseudo code
if (consumer.hasMessage()) {
.submit(() -> {
consumer.pollMessagesAccordingToTheDistributedFlowControl()
})
}
Matteo Merli <[email protected]> ??2021??10??26?????? ????12:15??????
> I'm a bit hesitant about this because I think there are already at
> least 3 different ways to handle similar scenarios.
>
> 1. Using listener and avoid calling receive directly
> 2. Use multi-topic consumer, so there's a single `Consumer` instance
> exposed
> 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for message
>
>
> --
> Matteo Merli
> <[email protected]>
>
> On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <[email protected]> wrote:
> >
> > I think it's better to add the method to Consumer interface instead of
> let
> > user casting it to `ConsumerBase`.
> > `peek` is most complexly, for the reason, I can use the `peek`
object to
> > ack??negative ack, but when to remove from the `BlockingQueue`?
> > IMHO, people use this api are just to judge if has the message,
> otherwise,
> > they can just use `receive(0,TimeUnit)
> >
> > JiangHaiting <[email protected]> ??2021??10??26??????
????10:19??????
> >
> > > Can this method
> > >
"org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> do
> > > the trick? Though you have to change the type to ConsumerBase.
> > >
> > >
> > > And maybe `peek` is more suitable and useful to add to the
Consumer
> > > interface??
> > >
> > >
> > >
> > >
> > >
> > >
> > > ------------------&nbsp;Original&nbsp;------------------
> > > From:
> >
>
"dev"
> >
>
<
> > > [email protected]&gt;;
> > > Date:&nbsp;Mon, Oct 25, 2021 07:24 PM
> > > To:&nbsp;"dev"<[email protected]&gt;;
> > >
> > > Subject:&nbsp;[DISCUSSION] PIP-108: Add method to help user
judge if
> > > consumer queue has message
> > >
> > >
> > >
> > > https://github.com/apache/pulsar/issues/12479
> > >
> > > --- Pasted here for quoting convenience ---
> > >
> > > ## Motivation
> > > Currently, I have an application that manages ten thousand of
> consumers,
> > > and a logic to schedule consumers's receive. It would be helpful
to
> know if
> > > one of the consumers have message to recive.
> > >
> > > ## Goal
> > > To make `Consumer` can judge if there are unreceiving messages
> > >
> > > ## API Changes
> > >
> > > Add `hasMessageInReceiverQueue` on the `Consumer` interface.
> > >
> > > ## Implementation
> > >
> > > For `ZeroQueueConsumerImpl` return false, Others, judge the
> > > `receiveQueueSize` greater than zero.
> > >
> > >
> > > ## Reject Alternatives
> > >
> > > No alternatives yet.
> > >
> > >
> > >
> > >
> > > ---
> > > Thanks,
> > > Haiting Jiang (Github: Jason918)
>
---
Thanks,
Haiting Jiang (Github: Jason918)