Hello,
Could someone tell me whether I made a mistake here? Is it a rookie
mistake, or is this not the right forum for my question?
It seems strange to get this error: Exception in thread "main"
java.lang.IllegalStateException: This consumer has already been closed
There is also not much feedback about this error on the net.
Kind regards,
Koen


On Sun, Oct 23, 2016 at 6:10 PM, Koen Vantomme <koen.vanto...@gmail.com>
wrote:

> Hello,
>
> I'm creating a simple consumer in Java. The first time I run the consumer,
> it works fine.
> I then stop the application. When I try to rerun the consumer, I get the
> error message "This consumer has already been closed".
>
> Any suggestions?
> Regards,
> Koen
>
> 2016-10-23 17:17:34,261 [main] INFO   AppInfoParser - Kafka commitId : 23c69d62a0cabf06
> Exception in thread "main" java.lang.IllegalStateException: This consumer has already been closed.
>     at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1310)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1321)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:844)
>
>
> The code:
>
> String topic = "testmetrics";
> String group = "cg1";
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", group);
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms", "1000");
> props.put("session.timeout.ms", "30000");
> props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>
>
> consumer.subscribe(Arrays.asList(topic));
> System.out.println("Subscribed to topic " + topic);
> int i = 0;
>
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     for (ConsumerRecord<String, String> record : records) {
>         System.out.printf("offset = %d, key = %s, value = %s\n",
>                 record.offset(), record.key(), record.value());
>     }
> c
>
>
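
For context on the message itself: the frames in the stack trace above
(ensureNotClosed, acquire, poll) suggest that poll() checks a "closed" flag
before doing any work, so the exception would appear whenever poll() is
reached after close() has been called on the same consumer object. The posted
code is cut off, so whether that happens there cannot be confirmed from this
message. The sketch below only models that kind of guard with the Java
standard library. It is not Kafka's implementation, and GuardedClient and
ClosedGuardDemo are hypothetical names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; models a close() guard, not Kafka's real code.
class ClosedGuardDemo {

    // Stand-in for a client object that refuses to work once closed.
    static final class GuardedClient implements AutoCloseable {
        private boolean closed = false;

        // Analogous in spirit to the ensureNotClosed frame in the stack trace.
        private void ensureNotClosed() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
        }

        // Returns an empty batch; a real client would fetch records here.
        List<String> poll() {
            ensureNotClosed();
            return new ArrayList<>();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Polls once, closes the client, then polls again and reports the outcome.
    static String pollAfterClose() {
        GuardedClient client = new GuardedClient();
        client.poll();   // fine: the client is still open
        client.close();  // marks the client as closed
        try {
            client.poll();  // the guard rejects this call
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollAfterClose());
    }
}
```

Running main prints the message caught on the second poll(). A flag like this
lives only in memory, so under this model it would not carry over from a
previous run of the application. The close() call would have to happen in the
same run, before the poll() that fails.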
