Hello, Could someone reply me if I made a mistake here or not. Did I make a rookie mistake here ? Is this not the correct forum here to put my question ? It's a bit strange to have this error : Exception in thread "main" java.lang.IllegalStateException: This consumer has already been closed On the net there is also not much feedback on this error. Kind regards, Koen
On Sun, Oct 23, 2016 at 6:10 PM, Koen Vantomme <koen.vanto...@gmail.com> wrote: > Hello, > > I'm creating a simple consumer in JAVA, the first time I run the consumer > it works fine. > I stop the application. When I want to rerun the consumer I get error > message "This consumer has already been closed" > > Any suggestions ? > Regards, > Koen > > 2016-10-23 17:17:34,261 [main] INFO AppInfoParser - Kafka commitId : > 23c69d62a0cabf06 > Exception in thread "main" java.lang.IllegalStateException: This consumer > has already been closed. > at org.apache.kafka.clients.consumer.KafkaConsumer. > ensureNotClosed(KafkaConsumer.java:1310) > at org.apache.kafka.clients.consumer.KafkaConsumer. > acquire(KafkaConsumer.java:1321) > at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:844) > > > The code : > > String topic ="testmetrics"; > String group ="cg1"; > > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", group); > props.put("enable.auto.commit", "true"); > props.put("auto.commit.interval.ms", "1000"); > props.put("session.timeout.ms", "30000"); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > > KafkaConsumer<String, String> consumer = new KafkaConsumer<String, > String>(props); > > > consumer.subscribe(Arrays.asList(topic)); > System.out.println("Subscribed to topic " + topic); > int i = 0; > > while (true) { > ConsumerRecords<String, String> records = consumer.poll(100); > for (ConsumerRecord<String, String> record : records) > { > System.out.printf("offset = %d, key = %s, value = %s\n", > record.offset(), record.key(), record.value()); > } > c > >