Hi Pulsar community,

Currently, the reader supports setting a start message ID and enabling
inclusive mode for it.

I have a use case that should always read only the latest message, so I set
MessageId.latest as the start message ID and enable inclusive mode. The code
looks like this:
```
@Test
public void testReaderGetsLatestInclusiveMessage() throws Exception {
    String key = "ReaderGetsLatestInclusiveMessageTest";
    final String subscriptionName = "my-ex-subscription-" + key;
    final String messagePredicate = "my-message-" + key + "-";
    final int totalMessages = 10;

    final String topicName = "persistent://prop-xyz/ns1/topic-1-" + key;

    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    // The reader is created before any message is published.
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .receiverQueueSize(100)
            .startMessageId(MessageId.latest)
            .startMessageIdInclusive()
            .create();

    // Publish all messages and wait for every send to complete.
    List<Future<MessageId>> futures = Lists.newArrayList();
    for (int i = 0; i < totalMessages; i++) {
        futures.add(producer.sendAsync((messagePredicate + "producer-" + i).getBytes()));
    }
    log.info("Waiting for async publish to complete : {}", futures.size());
    for (Future<MessageId> future : futures) {
        future.get();
    }

    // Count messages until readNext() times out.
    int count = 0;
    while (true) {
        Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        count++;
    }
    assertEquals(count, 1);
    reader.close();
    producer.close();
}
```

The test fails: it expects 1 message, but the reader receives 10.

The issue can be fixed by calling `reader.hasMessageAvailable()` before
`readNext()`. I am wondering whether we could call `hasMessageAvailable()`
inside `readNext()`.

The `hasMessageAvailable()` checks can be used by an application to scan
through a topic and stop when the reader reaches the current last published
message.
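
To illustrate the scan-and-stop pattern described above, here is a minimal,
self-contained sketch. It does not use the Pulsar client: `SketchReader` and
`InMemoryReader` are hypothetical stand-ins that only mimic the shape of
`hasMessageAvailable()` and `readNext()`, and the in-memory list is an
assumption standing in for a topic. Unlike the real reader, this `readNext()`
never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the two reader methods discussed above.
// This is NOT the Pulsar Reader interface.
interface SketchReader<T> {
    boolean hasMessageAvailable();
    T readNext();
}

// In-memory "topic": a list of messages plus a read cursor.
final class InMemoryReader<T> implements SketchReader<T> {
    private final List<T> log;
    private int cursor;

    InMemoryReader(List<T> log, int startIndex) {
        this.log = log;
        this.cursor = startIndex;
    }

    @Override
    public boolean hasMessageAvailable() {
        // True while the cursor is before the last published message.
        return cursor < log.size();
    }

    @Override
    public T readNext() {
        if (cursor >= log.size()) {
            throw new NoSuchElementException("no message available");
        }
        return log.get(cursor++);
    }
}

final class ScanSketch {
    // Check availability before every read, and stop once the reader has
    // caught up with the last published message.
    static <T> List<T> scanToEnd(SketchReader<T> reader) {
        List<T> out = new ArrayList<>();
        while (reader.hasMessageAvailable()) {
            out.add(reader.readNext());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>(List.of("m0", "m1", "m2"));
        SketchReader<String> reader = new InMemoryReader<>(topic, 0);
        System.out.println(scanToEnd(reader));
    }
}
```

With the real client, the same loop shape would be
`while (reader.hasMessageAvailable()) { reader.readNext(); }`.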

--------
Thanks,
Zixuan
