Hi Zixuan,

We are not able to add `hasMessageAvailable()` check before `readNext()`
method.
For some cases, they care about if they need to do something if no messages
are available,
but some cases are not, just continues to read the messages from the topic,
so they don't need
to check if there are messages int the topic, just waiting for new messages
coming.
Thanks,
Penghui

On Mon, Mar 14, 2022 at 4:52 PM Zixuan Liu <node...@gmail.com> wrote:

> Hi Pulsar community,
>
> Currently, the reader support sets the start message id and enables the
> inclusive.
>
> I have a case that always read the latest message, set the MessageId.latest
> as start message id with enabling the inclusive, the code so like:
> ```
> @Test
> public void testReaderGetsLatestInclusiveMessage() throws Exception {
>         String key = "ReaderGetsLatestInclusiveMessageTest";
>         final String subscriptionName = "my-ex-subscription-" + key;
>         final String messagePredicate = "my-message-" + key + "-";
>         final int totalMessages = 10;
>
>         final String topicName = "persistent://prop-xyz/ns1/topic-1-" +
> key;
>
>         Producer<byte[]> producer =
> pulsarClient.newProducer().topic(topicName)
>                 .enableBatching(false)
>                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
>                 .create();
>
>         Reader<byte[]> reader = pulsarClient.newReader()
>                 .topic(topicName)
>                 .subscriptionName(subscriptionName)
>                 .receiverQueueSize(100)
>                 .startMessageId(MessageId.latest)
>                 .startMessageIdInclusive()
>                 .create();
>
>         List<Future<MessageId>> futures = Lists.newArrayList();
>         for (int i = 0; i < totalMessages; i++) {
>             futures.add(producer.sendAsync((messagePredicate + "producer-"
> + i).getBytes()));
>         }
>         log.info("Waiting for async publish to complete : {}",
> futures.size());
>         for (Future<MessageId> future : futures) {
>             future.get();
>         }
>
>         int count = 0;
>         while (true) {
>             Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
>             if (message == null) {
>                 break;
>             }
>             count++;
>         }
>         assertEquals(count, 1);
>         reader.close();
>         producer.close();
>     }
> ```
>
> This test result: expected 1, but got 10.
>
> This issue can fix by call `reader.hasMessageAvailable();` before
> `readNext()`, I consider if we can add the call `hasMessageAvailable()` in
> `readNext()`.
>
> The `hasMessageAvailable()` checks can be used by an application to scan
> through a topic and stop when the reader reaches the current last published
> message.
>
> --------
> Thanks,
> Zixuan
>

Reply via email to