Thanks @Mattison and @Penghui, Good idea! I consider fixing this in client, then I will improve this in the broker.
> This pip is to add a new field to the CommandSubscribe to let the broker know how to set the cursor position. I am also interested in this. PengHui Li <peng...@apache.org> 于2022年3月14日周一 23:33写道: > Thanks @Mattison. > > Yes, It's a long term fix solution, the protocol changes need to upgrade > the broker and client > and we can only apply the protocol changes to a major release. > > I think Zixuan is trying to find a short-term fix without protocol changes. > > Penghui > > On Mon, Mar 14, 2022 at 5:59 PM mattison chao <mattisonc...@gmail.com> > wrote: > > > Hi, Zixuan, > > > > Very glad you can notice this problem. I am preparing to submit a PIP to > > resolve this issue. > > > > This pip is to add a new field to the CommandSubscribe to let the broker > > know how to set the cursor position. > > > > Feel free to give me more good advice and let me know what you think. > > > > Best, > > Mattison > > > > > On Mar 14, 2022, at 5:38 PM, PengHui Li <peng...@apache.org> wrote: > > > > > > Hi Zixuan, > > > > > > We are not able to add `hasMessageAvailable()` check before > `readNext()` > > > method. > > > For some cases, they care about if they need to do something if no > > messages > > > are available, > > > but some cases are not, just continues to read the messages from the > > topic, > > > so they don't need > > > to check if there are messages int the topic, just waiting for new > > messages > > > coming. > > > > > > Thanks, > > > Penghui > > > > > > On Mon, Mar 14, 2022 at 4:52 PM Zixuan Liu <node...@gmail.com> wrote: > > > > > >> Hi Pulsar community, > > >> > > >> Currently, the reader support sets the start message id and enables > the > > >> inclusive. > > >> > > >> I have a case that always read the latest message, set the > > MessageId.latest > > >> as start message id with enabling the inclusive, the code so like: > > >> ``` > > >> @Test > > >> public void testReaderGetsLatestInclusiveMessage() throws Exception { > > >> String key = "ReaderGetsLatestInclusiveMessageTest"; > > >> final String subscriptionName = "my-ex-subscription-" + key; > > >> final String messagePredicate = "my-message-" + key + "-"; > > >> final int totalMessages = 10; > > >> > > >> final String topicName = "persistent://prop-xyz/ns1/topic-1-" + > > >> key; > > >> > > >> Producer<byte[]> producer = > > >> pulsarClient.newProducer().topic(topicName) > > >> .enableBatching(false) > > >> .messageRoutingMode(MessageRoutingMode.SinglePartition) > > >> .create(); > > >> > > >> Reader<byte[]> reader = pulsarClient.newReader() > > >> .topic(topicName) > > >> .subscriptionName(subscriptionName) > > >> .receiverQueueSize(100) > > >> .startMessageId(MessageId.latest) > > >> .startMessageIdInclusive() > > >> .create(); > > >> > > >> List<Future<MessageId>> futures = Lists.newArrayList(); > > >> for (int i = 0; i < totalMessages; i++) { > > >> futures.add(producer.sendAsync((messagePredicate + > > "producer-" > > >> + i).getBytes())); > > >> } > > >> log.info("Waiting for async publish to complete : {}", > > >> futures.size()); > > >> for (Future<MessageId> future : futures) { > > >> future.get(); > > >> } > > >> > > >> int count = 0; > > >> while (true) { > > >> Message<byte[]> message = reader.readNext(1, > > TimeUnit.SECONDS); > > >> if (message == null) { > > >> break; > > >> } > > >> count++; > > >> } > > >> assertEquals(count, 1); > > >> reader.close(); > > >> producer.close(); > > >> } > > >> ``` > > >> > > >> This test result: expected 1, but got 10. > > >> > > >> This issue can fix by call `reader.hasMessageAvailable();` before > > >> `readNext()`, I consider if we can add the call > `hasMessageAvailable()` > > in > > >> `readNext()`. > > >> > > >> The `hasMessageAvailable()` checks can be used by an application to > scan > > >> through a topic and stop when the reader reaches the current last > > published > > >> message. > > >> > > >> -------- > > >> Thanks, > > >> Zixuan > > >> > > > > >