HI folks, I found your email from the apache pulsar website. I need your help with the following problem that we are facing
Lets say I have a publisher which ran normally first and produced 20 messages to the topic import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; public class TestPub { public static void main(String[] args) throws PulsarClientException, InterruptedException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build(); Producer<byte[]> producer = client.newProducer() .topic("example-topic") .create(); for (int i = 0; i < 20; i++) { String msg = "my-message"+System.currentTimeMillis(); producer.newMessage() .value(msg.getBytes()) .send(); System.out.println("Sent:"+msg); Thread.sleep(1000); } } } Now I need to launch the subscriber but I only care about the last message that was saved to the topic - I thought that specifying latest in consumer or reader should help but it looks like it doesn't work for me, looks like latest means next message that will be sent to the topic when consumer or reader subscribed, is there a way then for me to find the message before the latest? import org.apache.pulsar.client.api.*; public class TestSub { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build(); read(client); consume(client); } static void consume(PulsarClient client) throws PulsarClientException { Consumer consumer = client.newConsumer() .topic("example-topic") .subscriptionName("my-subscription-consumer") .subscriptionType(SubscriptionType.Exclusive) .subscriptionInitialPosition(SubscriptionInitialPosition.Latest) .startMessageIdInclusive() .subscribe(); //consumer.seek(MessageId.latest); while (true) { // Wait for a message Message msg = consumer.receive(); try { System.out.println("Message consumed: " + new String(msg.getData())); //consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } } static void read(PulsarClient client) throws PulsarClientException { Reader consumer = client.newReader() .topic("example-topic") .subscriptionName("my-subscription-reader") .startMessageIdInclusive() .startMessageId(MessageId.latest) .create() ; while (true) { // Wait for a message Message msg = consumer.readNext(); try { System.out.println("Message read: " + new String(msg.getData())); //consumer.acknowledge(msg); } catch (Exception e) { System.out.println(e); } } } } Thanks, Andrey This e-mail, including any attachments, is confidential and may be privileged and is for the intended recipient(s) only. If received in error, please immediately delete this email and any attachments and contact the sender. Unauthorized copying, use or disclosure of this email or its content or attachments is prohibited. For full email disclaimer, click here. To unsubscribe from receiving commercial electronic messages from The Bank of Nova Scotia, or from certain of its affiliates, including Scotia iTRADE and the Scotia Wealth Management businesses, please click here. Pour obtenir la traduction en français, cliquez ici. Haga clic aquí para ver la traducción al español.