Thanks @Mattison and @Penghui,

Good idea! I consider fixing this in client, then I will improve this in
the broker.

> This pip is to add a new field to the CommandSubscribe to let the broker
know how to set the cursor position.
I am also interested in this.

PengHui Li <peng...@apache.org> 于2022年3月14日周一 23:33写道:

> Thanks @Mattison.
>
> Yes, It's a long term fix solution, the protocol changes need to upgrade
> the broker and client
> and we can only apply the protocol changes to a major release.
>
> I think Zixuan is trying to find a short-term fix without protocol changes.
>
> Penghui
>
> On Mon, Mar 14, 2022 at 5:59 PM mattison chao <mattisonc...@gmail.com>
> wrote:
>
> > Hi, Zixuan,
> >
> > Very glad you can notice this problem. I am preparing to submit a PIP to
> > resolve this issue.
> >
> > This pip is to add a new field to the CommandSubscribe to let the broker
> > know how to set the cursor position.
> >
> > Feel free to give me more good advice and let me know what you think.
> >
> > Best,
> > Mattison
> >
> > > On Mar 14, 2022, at 5:38 PM, PengHui Li <peng...@apache.org> wrote:
> > >
> > > Hi Zixuan,
> > >
> > > We are not able to add `hasMessageAvailable()` check before
> `readNext()`
> > > method.
> > > For some cases, they care about if they need to do something if no
> > messages
> > > are available,
> > > but some cases are not, just continues to read the messages from the
> > topic,
> > > so they don't need
> > > to check if there are messages int the topic, just waiting for new
> > messages
> > > coming.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Mon, Mar 14, 2022 at 4:52 PM Zixuan Liu <node...@gmail.com> wrote:
> > >
> > >> Hi Pulsar community,
> > >>
> > >> Currently, the reader support sets the start message id and enables
> the
> > >> inclusive.
> > >>
> > >> I have a case that always read the latest message, set the
> > MessageId.latest
> > >> as start message id with enabling the inclusive, the code so like:
> > >> ```
> > >> @Test
> > >> public void testReaderGetsLatestInclusiveMessage() throws Exception {
> > >>        String key = "ReaderGetsLatestInclusiveMessageTest";
> > >>        final String subscriptionName = "my-ex-subscription-" + key;
> > >>        final String messagePredicate = "my-message-" + key + "-";
> > >>        final int totalMessages = 10;
> > >>
> > >>        final String topicName = "persistent://prop-xyz/ns1/topic-1-" +
> > >> key;
> > >>
> > >>        Producer<byte[]> producer =
> > >> pulsarClient.newProducer().topic(topicName)
> > >>                .enableBatching(false)
> > >>                .messageRoutingMode(MessageRoutingMode.SinglePartition)
> > >>                .create();
> > >>
> > >>        Reader<byte[]> reader = pulsarClient.newReader()
> > >>                .topic(topicName)
> > >>                .subscriptionName(subscriptionName)
> > >>                .receiverQueueSize(100)
> > >>                .startMessageId(MessageId.latest)
> > >>                .startMessageIdInclusive()
> > >>                .create();
> > >>
> > >>        List<Future<MessageId>> futures = Lists.newArrayList();
> > >>        for (int i = 0; i < totalMessages; i++) {
> > >>            futures.add(producer.sendAsync((messagePredicate +
> > "producer-"
> > >> + i).getBytes()));
> > >>        }
> > >>        log.info("Waiting for async publish to complete : {}",
> > >> futures.size());
> > >>        for (Future<MessageId> future : futures) {
> > >>            future.get();
> > >>        }
> > >>
> > >>        int count = 0;
> > >>        while (true) {
> > >>            Message<byte[]> message = reader.readNext(1,
> > TimeUnit.SECONDS);
> > >>            if (message == null) {
> > >>                break;
> > >>            }
> > >>            count++;
> > >>        }
> > >>        assertEquals(count, 1);
> > >>        reader.close();
> > >>        producer.close();
> > >>    }
> > >> ```
> > >>
> > >> This test result: expected 1, but got 10.
> > >>
> > >> This issue can fix by call `reader.hasMessageAvailable();` before
> > >> `readNext()`, I consider if we can add the call
> `hasMessageAvailable()`
> > in
> > >> `readNext()`.
> > >>
> > >> The `hasMessageAvailable()` checks can be used by an application to
> scan
> > >> through a topic and stop when the reader reaches the current last
> > published
> > >> message.
> > >>
> > >> --------
> > >> Thanks,
> > >> Zixuan
> > >>
> >
> >
>

Reply via email to