Hi, I'm on 8.2.2 and can't use the recent Consumer API described in "Kafka: The Definitive Guide". My version of kafka-clients only has stubs for the consumer methods. That is why I'm using the "old" high-level consumer API, and it doesn't work well for me. I have a single actor that tries to read messages from a single Kafka topic with a single partition.
Here is my code:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.serializer.DefaultDecoder
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = {
      val consumerProperties = new Properties()
      consumerProperties.put("zookeeper.connect", kafkaConfig.zooKeeperConnectString)
      consumerProperties.put("group.id", consumerGroup)
      consumerProperties.put("auto.offset.reset", "smallest")

      // A new connector is created on every call
      val consumer = Consumer.create(new ConsumerConfig(consumerProperties))
      try {
        val messageStreams = consumer.createMessageStreams(
          Predef.Map(kafkaConfig.topic -> 1),
          new DefaultDecoder,
          new MessageEnvelopeDecoder)

        val receiveMessageFuture = Future[List[MessageEnvelope]] {
          messageStreams(kafkaConfig.topic)
            .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
        }
        Await.result(receiveMessageFuture, await)
      } finally {
        // ...and shut down again after every call
        consumer.shutdown()
      }
    }

It works fine, but I suppose I should reuse the ConsumerConnector instance

    val consumer = Consumer.create(new ConsumerConfig(consumerProperties))

rather than create it for each "message poll".

First I tried to keep a single instance of ConsumerConnector and a single messageStreams for my singleton consumer actor. It didn't go well. This exception is thrown:

    2017-04-17_20:02:44.236 WARN MessageEnvelopeConsumer - Error while consuming messages
    kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
        at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
        at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)

at this line:

    messageStreams(kafkaConfig.topic)
      .flatMap(stream => stream.take(numberOfEvents).map(_.message()))

Then I tried to reuse only the consumer and create the message streams each time I poll for messages.
That didn't go well either. The exception is:

    2017-04-17_20:02:44.236 WARN MessageEnvelopeConsumer - Error while consuming messages
    kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
        at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
        at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)

The exception itself is obvious to me, but I don't create two consumer instances. I have loggers and counters in my test, and I'm 100% sure I do not call Consumer.create(new ConsumerConfig(consumerProperties)) twice during the test.
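
To make the intended reuse pattern concrete, here is a minimal, self-contained sketch. It does not use Kafka at all: `OneShotConnector` and `ReusableConsumer` are hypothetical stand-ins introduced only for illustration, and the only behaviour copied from the real connector is the rule quoted in the exception message (streams can be created at most once per connector instance). The idea is to create the streams once, keep the resulting iterator, and draw from that same iterator on every poll.

```scala
// Hypothetical stand-in for the connector; this is NOT the Kafka API.
// It only mimics the rule that streams can be created at most once per instance.
final class OneShotConnector(messages: Seq[String]) {
  private var streamsCreated = false

  def createMessageStreams(): Iterator[String] = {
    if (streamsCreated)
      throw new IllegalStateException("can create message streams at most once")
    streamsCreated = true
    messages.iterator
  }
}

// Hypothetical wrapper: creates the stream exactly once and reuses its iterator.
final class ReusableConsumer(connector: OneShotConnector) {
  // lazy val: evaluated on the first poll only, then cached for later polls
  private lazy val iterator: Iterator[String] = connector.createMessageStreams()

  // Pull up to numberOfEvents messages, continuing where the previous poll stopped.
  def poll(numberOfEvents: Int): List[String] = {
    val buffer = List.newBuilder[String]
    var taken = 0
    while (taken < numberOfEvents && iterator.hasNext) {
      buffer += iterator.next()
      taken += 1
    }
    buffer.result()
  }
}
```

With a connector holding `Seq("a", "b", "c")`, a first `poll(2)` returns `List("a", "b")` and a second `poll(2)` returns `List("c")`, while calling `createMessageStreams()` twice on the same `OneShotConnector` throws. In the old high-level consumer the analogous objects would presumably be the ConsumerConnector and the iterator of the KafkaStream it returns; that mapping is an assumption and has not been tested against 8.2.2.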