HI folks,

I found your email from the apache pulsar website. I need your help with the 
following problem that we are facing

Lets say I have a publisher which ran normally first and produced 20 messages 
to the topic

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class TestPub {
    public static void main(String[] args) throws PulsarClientException, 
InterruptedException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<byte[]> producer = client.newProducer()
                .topic("example-topic")
                .create();

        for (int i = 0; i < 20; i++)
         {
            String msg = "my-message"+System.currentTimeMillis();
            producer.newMessage()
                    .value(msg.getBytes())
                    .send();
            System.out.println("Sent:"+msg);
            Thread.sleep(1000);
        }
    }
}

Now I need to launch the subscriber but I only care about the last message that 
was saved to the topic - I thought that specifying latest in consumer or reader 
should help but it looks like it doesn't work for me, looks like latest means 
next message that will be sent to the topic when consumer or reader subscribed, 
is there a way then for me to find the message before the latest?


import org.apache.pulsar.client.api.*;

public class TestSub {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        read(client);
        consume(client);
    }

    static void consume(PulsarClient client) throws PulsarClientException  {
        Consumer consumer = client.newConsumer()
                .topic("example-topic")
                .subscriptionName("my-subscription-consumer")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .startMessageIdInclusive()
                .subscribe();
        //consumer.seek(MessageId.latest);
        while (true) {
        // Wait for a message
            Message msg = consumer.receive();
            try {
                System.out.println("Message consumed: " +
                        new String(msg.getData()));
                //consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }

    static void read(PulsarClient client) throws PulsarClientException  {
        Reader consumer = client.newReader()
                .topic("example-topic")
                .subscriptionName("my-subscription-reader")
                .startMessageIdInclusive()
                .startMessageId(MessageId.latest)
                .create()
                ;
        while (true) {
            // Wait for a message
            Message msg = consumer.readNext();
            try {
                System.out.println("Message read: " +
                        new String(msg.getData()));
                //consumer.acknowledge(msg);
            } catch (Exception e) {
                System.out.println(e);
            }
        }

    }

}

Thanks,
Andrey


This e-mail, including any attachments, is confidential and may be privileged 
and is for the intended recipient(s) only. If received in error, please 
immediately delete this email and any attachments and contact the sender. 
Unauthorized copying, use or disclosure of this email or its content or 
attachments is prohibited. For full email disclaimer, click here.
To unsubscribe from receiving commercial electronic messages from The Bank of 
Nova Scotia, or from certain of its affiliates, including Scotia iTRADE and the 
Scotia Wealth Management businesses, please click here.
Pour obtenir la traduction en français, cliquez ici.
Haga clic aquí para ver la traducción al español.

Reply via email to