Hi everyone,

What is the best practice for launching multiple Kafka Consumers that will
run forever?

So far I have a single class that is called from a command-line (.sh) launcher
executed at OS boot time, and that class's main(String[] args) method starts
many Consumers.

Should I make each Consumer implement Runnable (or extend Thread) and start
each consumer in its own Thread, or is there another approach?

Here is the code example:

----------
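import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class MyConsumer extends Thread {

    // Instance fields, not static: KafkaConsumer is not thread-safe,
    // so each thread needs its own consumer.
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private final KafkaConsumer<String, String> consumer =
            KafkaUtil.getNewConsumer("CONSUMER_GROUP_NAME");

    public MyConsumer() {
        attachShutDownHook();
    }

    @Override
    public void run() {
        consume();
    }

    private void consume() {
        try {
            // Subscribe once, before the poll loop.
            consumer.subscribe(Arrays.asList("TOPIC_NAME"));

            while (!closed.get()) {
                // Newer clients take a Duration: poll(Duration.ofMillis(100)).
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    // do stuff with the message
                }

                consumer.commitSync();
            } // end while

        } catch (WakeupException e) {
            // Expected during shutdown; rethrow only if we were not closing.
            if (!closed.get())
                throw e;
        } finally {
            // close() runs on the consumer's own thread.
            consumer.close();
        }
    }

    // wakeup() is the only KafkaConsumer method that is safe to call
    // from another thread, so nothing else touches the consumer here.
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    // The shutdown hook interrupts poll() and waits for the consumer
    // thread to close cleanly when a shutdown happens.
    private void attachShutDownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutdown();
                try {
                    MyConsumer.this.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } // end run
        });
    }

}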
----------
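
On the launcher side, one alternative to extending Thread might be to make each consumer loop a Runnable and hand it to an ExecutorService. The sketch below is only an assumption about how that could look, not a tested setup: ConsumerLoop and ConsumerLauncher are hypothetical names, and the sleep stands in for the Kafka poll/process/commit calls so that it runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for one consumer loop; a real version would own
// its own KafkaConsumer and call poll() where this one sleeps.
class ConsumerLoop implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger finished;

    ConsumerLoop(AtomicInteger finished) {
        this.finished = finished;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                Thread.sleep(10); // placeholder for poll + process + commit
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            finished.incrementAndGet(); // placeholder for consumer.close()
        }
    }

    void shutdown() {
        closed.set(true); // a real loop would also call consumer.wakeup()
    }
}

// Hypothetical launcher: one pool thread per consumer loop.
class ConsumerLauncher {
    private final List<ConsumerLoop> loops = new ArrayList<>();
    private final AtomicInteger finished = new AtomicInteger();
    private final ExecutorService executor;

    ConsumerLauncher(int consumerCount) {
        executor = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            loops.add(new ConsumerLoop(finished));
        }
    }

    void start() {
        for (ConsumerLoop loop : loops) {
            executor.submit(loop);
        }
    }

    // Signals every loop, then waits up to timeoutSeconds for them to exit.
    boolean stop(long timeoutSeconds) {
        for (ConsumerLoop loop : loops) {
            loop.shutdown();
        }
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int finishedCount() {
        return finished.get();
    }

    public static void main(String[] args) {
        ConsumerLauncher launcher = new ConsumerLauncher(3);
        launcher.start();
        // A long-running service would instead call stop() from one shutdown hook:
        // Runtime.getRuntime().addShutdownHook(new Thread(() -> launcher.stop(10)));
        boolean clean = launcher.stop(10); // stopped right away to keep the demo finite
        System.out.println("clean shutdown: " + clean
                + ", loops finished: " + launcher.finishedCount());
    }
}
```

With this layout there would be a single shutdown hook for the whole process rather than one per consumer, and the pool's awaitTermination gives each loop time to close its consumer before the JVM exits.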

--
IPVP
