Hi Pulsar community,

Currently, the reader supports setting a start message ID and enabling inclusive mode for it.
I have a use case that should always read the latest message, so I set `MessageId.latest` as the start message ID and enable inclusive mode. The code looks like this:

```
@Test
public void testReaderGetsLatestInclusiveMessage() throws Exception {
    String key = "ReaderGetsLatestInclusiveMessageTest";
    final String subscriptionName = "my-ex-subscription-" + key;
    final String messagePredicate = "my-message-" + key + "-";
    final int totalMessages = 10;
    final String topicName = "persistent://prop-xyz/ns1/topic-1-" + key;

    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    // The reader is created before any message is published.
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .receiverQueueSize(100)
            .startMessageId(MessageId.latest)
            .startMessageIdInclusive()
            .create();

    List<Future<MessageId>> futures = Lists.newArrayList();
    for (int i = 0; i < totalMessages; i++) {
        futures.add(producer.sendAsync((messagePredicate + "producer-" + i).getBytes()));
    }

    log.info("Waiting for async publish to complete : {}", futures.size());
    for (Future<MessageId> future : futures) {
        future.get();
    }

    // Read until readNext() times out, counting the messages received.
    int count = 0;
    while (true) {
        Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        count++;
    }

    // Only the latest message is expected.
    assertEquals(count, 1);

    reader.close();
    producer.close();
}
```

The test fails: expected 1, but got 10.

The issue can be worked around by calling `reader.hasMessageAvailable()` before `readNext()`. I am wondering whether we could add the `hasMessageAvailable()` call inside `readNext()` itself.

The `hasMessageAvailable()` check can be used by an application to scan through a topic and stop when the reader reaches the current last published message.

--------
Thanks,
Zixuan
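
P.S. To make the behaviour described above easier to discuss, here is a toy in-memory model of it. This is only a sketch under my own assumptions, not Pulsar code: `ToyReader`, `ToyLatestInclusiveReader` and `LatestInclusiveSketch` are hypothetical names, and the rule that "latest + inclusive" is resolved only when `hasMessageAvailable()` is called is a simplification chosen to match the result reported above (10 without the call, 1 with it).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two Reader methods discussed in this thread; NOT the Pulsar API.
interface ToyReader {
    boolean hasMessageAvailable();
    String readNext(); // returns null when nothing is left (models the readNext timeout)
}

// Toy model (an assumption, not client or broker code): the "latest, inclusive"
// start position is resolved to the last published message only when
// hasMessageAvailable() is called for the first time.
class ToyLatestInclusiveReader implements ToyReader {
    private final List<String> log; // shared in-memory "topic"
    private int cursor;             // index of the next message to deliver
    private boolean resolved = false;

    ToyLatestInclusiveReader(List<String> log) {
        this.log = log;
        this.cursor = log.size();   // position at reader creation time
    }

    @Override
    public boolean hasMessageAvailable() {
        if (!resolved) {
            // Resolve "latest, inclusive" to the last message currently in the log.
            cursor = Math.max(log.size() - 1, 0);
            resolved = true;
        }
        return cursor < log.size();
    }

    @Override
    public String readNext() {
        return cursor < log.size() ? log.get(cursor++) : null;
    }
}

class LatestInclusiveSketch {
    // Counts the messages read; optionally applies the workaround first.
    static int drain(ToyReader reader, boolean checkFirst) {
        if (checkFirst) {
            reader.hasMessageAvailable();
        }
        int count = 0;
        while (reader.readNext() != null) {
            count++;
        }
        return count;
    }

    // Mirrors the test flow: create the reader, publish 10 messages, then read.
    static int run(boolean checkFirst) {
        List<String> log = new ArrayList<>();
        ToyReader reader = new ToyLatestInclusiveReader(log);
        for (int i = 0; i < 10; i++) {
            log.add("my-message-" + i);
        }
        return drain(reader, checkFirst);
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // without the workaround
        System.out.println(run(true));  // with hasMessageAvailable() called first
    }
}
```

In this model, `run(false)` counts all 10 messages and `run(true)` counts 1. Moving the `hasMessageAvailable()` call into `readNext()` would make the two paths behave the same, which is the change I am asking about.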