Hi, Zixuan,

Very glad you can notice this problem. I am preparing to submit a PIP to 
resolve this issue.

This pip is to add a new field to the CommandSubscribe to let the broker know 
how to set the cursor position.

Feel free to give me more good advice and let me know what you think.

Best,
Mattison

> On Mar 14, 2022, at 5:38 PM, PengHui Li <peng...@apache.org> wrote:
> 
> Hi Zixuan,
> 
> We are not able to add `hasMessageAvailable()` check before `readNext()`
> method.
> For some cases, they care about if they need to do something if no messages
> are available,
> but some cases are not, just continues to read the messages from the topic,
> so they don't need
> to check if there are messages int the topic, just waiting for new messages
> coming.
> 
> Thanks,
> Penghui
> 
> On Mon, Mar 14, 2022 at 4:52 PM Zixuan Liu <node...@gmail.com> wrote:
> 
>> Hi Pulsar community,
>> 
>> Currently, the reader support sets the start message id and enables the
>> inclusive.
>> 
>> I have a case that always read the latest message, set the MessageId.latest
>> as start message id with enabling the inclusive, the code so like:
>> ```
>> @Test
>> public void testReaderGetsLatestInclusiveMessage() throws Exception {
>>        String key = "ReaderGetsLatestInclusiveMessageTest";
>>        final String subscriptionName = "my-ex-subscription-" + key;
>>        final String messagePredicate = "my-message-" + key + "-";
>>        final int totalMessages = 10;
>> 
>>        final String topicName = "persistent://prop-xyz/ns1/topic-1-" +
>> key;
>> 
>>        Producer<byte[]> producer =
>> pulsarClient.newProducer().topic(topicName)
>>                .enableBatching(false)
>>                .messageRoutingMode(MessageRoutingMode.SinglePartition)
>>                .create();
>> 
>>        Reader<byte[]> reader = pulsarClient.newReader()
>>                .topic(topicName)
>>                .subscriptionName(subscriptionName)
>>                .receiverQueueSize(100)
>>                .startMessageId(MessageId.latest)
>>                .startMessageIdInclusive()
>>                .create();
>> 
>>        List<Future<MessageId>> futures = Lists.newArrayList();
>>        for (int i = 0; i < totalMessages; i++) {
>>            futures.add(producer.sendAsync((messagePredicate + "producer-"
>> + i).getBytes()));
>>        }
>>        log.info("Waiting for async publish to complete : {}",
>> futures.size());
>>        for (Future<MessageId> future : futures) {
>>            future.get();
>>        }
>> 
>>        int count = 0;
>>        while (true) {
>>            Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
>>            if (message == null) {
>>                break;
>>            }
>>            count++;
>>        }
>>        assertEquals(count, 1);
>>        reader.close();
>>        producer.close();
>>    }
>> ```
>> 
>> This test result: expected 1, but got 10.
>> 
>> This issue can fix by call `reader.hasMessageAvailable();` before
>> `readNext()`, I consider if we can add the call `hasMessageAvailable()` in
>> `readNext()`.
>> 
>> The `hasMessageAvailable()` checks can be used by an application to scan
>> through a topic and stop when the reader reaches the current last published
>> message.
>> 
>> --------
>> Thanks,
>> Zixuan
>> 

Reply via email to