Hi Pulsar community,
Currently, the reader supports setting a start message ID and enabling
inclusive mode.
I have a use case that always reads the latest message: I set MessageId.latest
as the start message ID with inclusive enabled. The code looks like this:
```
@Test
public void testReaderGetsLatestInclusiveMessage() throws Exception {
    String key = "ReaderGetsLatestInclusiveMessageTest";
    final String subscriptionName = "my-ex-subscription-" + key;
    final String messagePredicate = "my-message-" + key + "-";
    final int totalMessages = 10;
    final String topicName = "persistent://prop-xyz/ns1/topic-1-" + key;

    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .receiverQueueSize(100)
            .startMessageId(MessageId.latest)
            .startMessageIdInclusive()
            .create();

    List<Future<MessageId>> futures = Lists.newArrayList();
    for (int i = 0; i < totalMessages; i++) {
        futures.add(producer.sendAsync((messagePredicate + "producer-" + i).getBytes()));
    }
    log.info("Waiting for async publish to complete : {}", futures.size());
    for (Future<MessageId> future : futures) {
        future.get();
    }

    int count = 0;
    while (true) {
        Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        count++;
    }
    assertEquals(count, 1);

    reader.close();
    producer.close();
}
```
The test fails: expected 1, but got 10.
The issue can be fixed by calling `reader.hasMessageAvailable();` before
`readNext()`, so I am wondering whether we could add the
`hasMessageAvailable()` call inside `readNext()`.
The `hasMessageAvailable()` check can be used by an application to scan
through a topic and stop when the reader reaches the current last published
message.
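
To make the scan pattern concrete, here is a minimal sketch of the
"check `hasMessageAvailable()`, then `readNext()`" loop. It is only an
illustration: `MessageSource`, `InMemorySource` and `ReaderScan` are
hypothetical names I made up so the example runs without a broker. They mirror
just the two reader methods discussed here and do not reproduce how the broker
resolves MessageId.latest with inclusive enabled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two Reader methods used in this thread.
// The real Pulsar Reader has more methods; this interface only keeps the
// sketch self-contained.
interface MessageSource<T> {
    boolean hasMessageAvailable() throws Exception;

    T readNext(int timeout, TimeUnit unit) throws Exception;
}

// In-memory fake (an assumption for the demo, not part of Pulsar).
final class InMemorySource<T> implements MessageSource<T> {
    private final Queue<T> pending = new ArrayDeque<>();

    InMemorySource(List<T> messages) {
        pending.addAll(messages);
    }

    @Override
    public boolean hasMessageAvailable() {
        return !pending.isEmpty();
    }

    @Override
    public T readNext(int timeout, TimeUnit unit) {
        // Returns null when nothing is left, like a timed-out read.
        return pending.poll();
    }
}

final class ReaderScan {
    // Reads messages until hasMessageAvailable() reports that the source
    // has been fully consumed, or until a read returns null.
    static <T> List<T> drain(MessageSource<T> source) throws Exception {
        List<T> out = new ArrayList<>();
        while (source.hasMessageAvailable()) {
            T message = source.readNext(1, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            out.add(message);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        MessageSource<String> source =
                new InMemorySource<>(Arrays.asList("m-0", "m-1", "m-2"));
        System.out.println(drain(source)); // prints [m-0, m-1, m-2]
    }
}
```

With a real reader, the same loop shape would presumably apply: call
`hasMessageAvailable()` first, and only then `readNext()`.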
--------
Thanks,
Zixuan