Thanks @Mattison.

Yes, It's a long term fix solution, the protocol changes need to upgrade
the broker and client
and we can only apply the protocol changes to a major release.

I think Zixuan is trying to find a short-term fix without protocol changes.

Penghui

On Mon, Mar 14, 2022 at 5:59 PM mattison chao <mattisonc...@gmail.com>
wrote:

> Hi, Zixuan,
>
> Very glad you can notice this problem. I am preparing to submit a PIP to
> resolve this issue.
>
> This pip is to add a new field to the CommandSubscribe to let the broker
> know how to set the cursor position.
>
> Feel free to give me more good advice and let me know what you think.
>
> Best,
> Mattison
>
> > On Mar 14, 2022, at 5:38 PM, PengHui Li <peng...@apache.org> wrote:
> >
> > Hi Zixuan,
> >
> > We are not able to add `hasMessageAvailable()` check before `readNext()`
> > method.
> > For some cases, they care about if they need to do something if no
> messages
> > are available,
> > but some cases are not, just continues to read the messages from the
> topic,
> > so they don't need
> > to check if there are messages int the topic, just waiting for new
> messages
> > coming.
> >
> > Thanks,
> > Penghui
> >
> > On Mon, Mar 14, 2022 at 4:52 PM Zixuan Liu <node...@gmail.com> wrote:
> >
> >> Hi Pulsar community,
> >>
> >> Currently, the reader support sets the start message id and enables the
> >> inclusive.
> >>
> >> I have a case that always read the latest message, set the
> MessageId.latest
> >> as start message id with enabling the inclusive, the code so like:
> >> ```
> >> @Test
> >> public void testReaderGetsLatestInclusiveMessage() throws Exception {
> >>        String key = "ReaderGetsLatestInclusiveMessageTest";
> >>        final String subscriptionName = "my-ex-subscription-" + key;
> >>        final String messagePredicate = "my-message-" + key + "-";
> >>        final int totalMessages = 10;
> >>
> >>        final String topicName = "persistent://prop-xyz/ns1/topic-1-" +
> >> key;
> >>
> >>        Producer<byte[]> producer =
> >> pulsarClient.newProducer().topic(topicName)
> >>                .enableBatching(false)
> >>                .messageRoutingMode(MessageRoutingMode.SinglePartition)
> >>                .create();
> >>
> >>        Reader<byte[]> reader = pulsarClient.newReader()
> >>                .topic(topicName)
> >>                .subscriptionName(subscriptionName)
> >>                .receiverQueueSize(100)
> >>                .startMessageId(MessageId.latest)
> >>                .startMessageIdInclusive()
> >>                .create();
> >>
> >>        List<Future<MessageId>> futures = Lists.newArrayList();
> >>        for (int i = 0; i < totalMessages; i++) {
> >>            futures.add(producer.sendAsync((messagePredicate +
> "producer-"
> >> + i).getBytes()));
> >>        }
> >>        log.info("Waiting for async publish to complete : {}",
> >> futures.size());
> >>        for (Future<MessageId> future : futures) {
> >>            future.get();
> >>        }
> >>
> >>        int count = 0;
> >>        while (true) {
> >>            Message<byte[]> message = reader.readNext(1,
> TimeUnit.SECONDS);
> >>            if (message == null) {
> >>                break;
> >>            }
> >>            count++;
> >>        }
> >>        assertEquals(count, 1);
> >>        reader.close();
> >>        producer.close();
> >>    }
> >> ```
> >>
> >> This test result: expected 1, but got 10.
> >>
> >> This issue can fix by call `reader.hasMessageAvailable();` before
> >> `readNext()`, I consider if we can add the call `hasMessageAvailable()`
> in
> >> `readNext()`.
> >>
> >> The `hasMessageAvailable()` checks can be used by an application to scan
> >> through a topic and stop when the reader reaches the current last
> published
> >> message.
> >>
> >> --------
> >> Thanks,
> >> Zixuan
> >>
>
>

Reply via email to