Hi everyone, What is the best practice to launch multiple Kafka Consumers that will be running forever ?
So far I have a single class that is called from a command line (.sh) launcher executed at the OS boot time, and that class main(String[] args) method will be starting many Consumers. Should I make each Consumer to implement Runnable/extend Thread) and start each consumer within a Thread or is there another approach ? here is the code example: ---------- public class MyConsumer extends Thread { private static AtomicBoolean closed = new AtomicBoolean(false); private static KafkaConsumer<String, String> consumer = KafkaUtil.getNewConsumer("CONSUMER_GROUP_NAME"); static { attachShutDownHook(); } public void run() { consume(); } public static void consume() { try { while (true) { consumer.subscribe(Arrays.asList("TOPIC_NAME")); ConsumerRecords < String, String > records = consumer.poll(100); for (ConsumerRecord < String, String > record: records) { // do stuff with the message } consumer.commitSync(); } //end while } catch (WakeupException e) { if (!closed.get()) throw e; } finally { consumer.close(); } } public static void shutdown() { consumer.unsubscribe(); closed.set(true); consumer.wakeup(); } // the shutdown Hook will properly close and clean stuff when a shutdown happens private static void attachShutDownHook() { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutdown(); } // end run }); } } ---------- -- IPVP