Hello, I'm creating a simple consumer in Java. The first time I run the consumer it works fine. I then stop the application. When I rerun the consumer, I get the error message "This consumer has already been closed".
Any suggestions?

Regards,
Koen

2016-10-23 17:17:34,261 [main] INFO AppInfoParser - Kafka commitId : 23c69d62a0cabf06
Exception in thread "main" java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1310)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1321)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:844)

The code:

    String topic = "testmetrics";
    String group = "cg1";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", group);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(topic));
    System.out.println("Subscribed to topic " + topic);
    int i = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
        c