Thanks @Mattison. Yes, It's a long term fix solution, the protocol changes need to upgrade the broker and client and we can only apply the protocol changes to a major release.
I think Zixuan is trying to find a short-term fix without protocol changes. Penghui On Mon, Mar 14, 2022 at 5:59 PM mattison chao <mattisonc...@gmail.com> wrote: > Hi, Zixuan, > > Very glad you can notice this problem. I am preparing to submit a PIP to > resolve this issue. > > This pip is to add a new field to the CommandSubscribe to let the broker > know how to set the cursor position. > > Feel free to give me more good advice and let me know what you think. > > Best, > Mattison > > > On Mar 14, 2022, at 5:38 PM, PengHui Li <peng...@apache.org> wrote: > > > > Hi Zixuan, > > > > We are not able to add `hasMessageAvailable()` check before `readNext()` > > method. > > For some cases, they care about if they need to do something if no > messages > > are available, > > but some cases are not, just continues to read the messages from the > topic, > > so they don't need > > to check if there are messages int the topic, just waiting for new > messages > > coming. > > > > Thanks, > > Penghui > > > > On Mon, Mar 14, 2022 at 4:52 PM Zixuan Liu <node...@gmail.com> wrote: > > > >> Hi Pulsar community, > >> > >> Currently, the reader support sets the start message id and enables the > >> inclusive. > >> > >> I have a case that always read the latest message, set the > MessageId.latest > >> as start message id with enabling the inclusive, the code so like: > >> ``` > >> @Test > >> public void testReaderGetsLatestInclusiveMessage() throws Exception { > >> String key = "ReaderGetsLatestInclusiveMessageTest"; > >> final String subscriptionName = "my-ex-subscription-" + key; > >> final String messagePredicate = "my-message-" + key + "-"; > >> final int totalMessages = 10; > >> > >> final String topicName = "persistent://prop-xyz/ns1/topic-1-" + > >> key; > >> > >> Producer<byte[]> producer = > >> pulsarClient.newProducer().topic(topicName) > >> .enableBatching(false) > >> .messageRoutingMode(MessageRoutingMode.SinglePartition) > >> .create(); > >> > >> Reader<byte[]> reader = pulsarClient.newReader() > >> .topic(topicName) > >> .subscriptionName(subscriptionName) > >> .receiverQueueSize(100) > >> .startMessageId(MessageId.latest) > >> .startMessageIdInclusive() > >> .create(); > >> > >> List<Future<MessageId>> futures = Lists.newArrayList(); > >> for (int i = 0; i < totalMessages; i++) { > >> futures.add(producer.sendAsync((messagePredicate + > "producer-" > >> + i).getBytes())); > >> } > >> log.info("Waiting for async publish to complete : {}", > >> futures.size()); > >> for (Future<MessageId> future : futures) { > >> future.get(); > >> } > >> > >> int count = 0; > >> while (true) { > >> Message<byte[]> message = reader.readNext(1, > TimeUnit.SECONDS); > >> if (message == null) { > >> break; > >> } > >> count++; > >> } > >> assertEquals(count, 1); > >> reader.close(); > >> producer.close(); > >> } > >> ``` > >> > >> This test result: expected 1, but got 10. > >> > >> This issue can fix by call `reader.hasMessageAvailable();` before > >> `readNext()`, I consider if we can add the call `hasMessageAvailable()` > in > >> `readNext()`. > >> > >> The `hasMessageAvailable()` checks can be used by an application to scan > >> through a topic and stop when the reader reaches the current last > published > >> message. > >> > >> -------- > >> Thanks, > >> Zixuan > >> > >