Note that I did work around this issue by including the full Kafka dependency:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.2</version>
</dependency>

and using the legacy Consumer API, instead of the kafka-clients dependency:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.2</version>
</dependency>

listed in the documentation:

http://kafka.apache.org/documentation.html#producerapi

Since this dependency is called out right before the documentation outlines
the Consumer API, and the Consumer API docs don't mention that the Consumer
API in the kafka-clients dependency is broken, it might be helpful if the
documentation pointed out that the kafka-clients dependency contains a broken
Consumer, and that the kafka_2.10 dependency should be used to access the
legacy API.

Take care,
-stu

On Fri, Oct 30, 2015 at 2:07 PM, Stu Smith <stu26c...@gmail.com> wrote:

> Hello!
>
> I'm running into trouble using the latest Kafka client.
>
> 0.8.2.2 appears to be listed as a stable release on Maven Central:
>
> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
>
> And it only includes the:
>
> org.apache.kafka.clients.consumer.KafkaConsumer client
>
> All the other Consumers listed in the documentation do not appear to be
> available in this release.
>
> However, the example code for the 0.8.2.2 branch covers the
> ConsumerConnector client:
>
> https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java
>
> Which no longer exists in the 0.8.2.2 release.
>
> The KafkaConsumer client I get always returns null on poll(), similar to
> behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=ooqfkvdb7+...@mail.gmail.com%3E
>
> So it appears that 0.8.2.2 shipped with an old, broken KafkaConsumer client,
> but removed the older, working ConsumerConnector / MessageStream interface.
>
> Is KafkaConsumer expected to work in 0.8.2.2?
> Or are we expected to use the old client, and I'm somehow not seeing the
> package?
>
> I confirmed I have messages waiting by using the Java producer API and
> listening with the ConsoleConsumer application, and it happily prints
> whatever the producer sends. However, the ConsoleConsumer appears to be
> using the Scala API, so it can't provide any leads on how to use the Java
> one.
>
> Or am I doing something wrong?
>
> scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092");
> scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>     "true");
> scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
>     "10");
> scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
> scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,
>     "roundrobin");
> scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
> scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
> scannerKafkaProperties.put("zookeeper.connect","localhost:2181");
>
> ...
> private static final String DESERIALIZER =
>     "org.apache.kafka.common.serialization.StringDeserializer";
> private static final String SERIALIZER =
>     "org.apache.kafka.common.serialization.StringSerializer";
> ...
>
> kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>     DESERIALIZER);
> kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>     DESERIALIZER);
> kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>     SERIALIZER);
> kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>     SERIALIZER);
> kafkaProperties.put("offsets.storage","kafka");
> kafkaProperties.put("dual.commit.enabled", "false");
> ...
> this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
> this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
> ...
> TopicPartition topicPartition = new TopicPartition(this.topic,0);
> this.kafkaConsumer.subscribe(topicPartition);
> ...
> while( this.running ) {
>     Map<String, ConsumerRecords<String, String>> messages =
>         this.kafkaConsumer.poll(messageWaitTimeout);
>     if( messages == null ) {
>         //this.log.debug("Finished polling, no messages received.");
>         for( int i = 0; i < 200; ++i ) {
>             this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0,
>                 "test", "test"));
>         }
>         continue;
>     }
>     ....
>
> (And to reiterate, the console consumer does pick up the messages if I
> run it, but the Java API does not.)
>
> Or is the 0.8.2.2 High-Level Java API simply not usable?
>
> Take care,
> -stu
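
For anyone trying the workaround described at the top of this message, here is a minimal sketch of the configuration side only. The legacy high-level consumer is configured through ZooKeeper settings and a group id rather than bootstrap.servers. The zookeeper.* values are the ones from the quoted snippet; the class name LegacyConsumerProps, the build helper, and the group id "demo-group" are hypothetical names chosen for illustration, not anything from the original code. The connector call appears only as a comment, since it assumes the kafka_2.10 0.8.2.2 artifact is on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings the legacy (ZooKeeper-based)
// high-level consumer reads. Class and method names are illustrative only.
class LegacyConsumerProps {

    static Properties build(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        // The legacy consumer locates brokers through ZooKeeper,
        // so bootstrap.servers is not set here.
        props.put("zookeeper.connect", zookeeperConnect);
        // The legacy consumer also needs a consumer group id.
        props.put("group.id", groupId);
        // Values taken from the snippet in the quoted message.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        // Legacy spelling of the auto-commit switch.
        props.put("auto.commit.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        // "demo-group" is an assumed group id, not one from the original post.
        Properties props = build("localhost:2181", "demo-group");
        // With kafka_2.10 on the classpath, these properties would be passed to:
        //   kafka.consumer.Consumer.createJavaConsumerConnector(
        //       new kafka.consumer.ConsumerConfig(props));
        props.list(System.out);
    }
}
```

Keeping the legacy settings in their own Properties object avoids mixing them with the kafka-clients ConsumerConfig keys, which the two consumers do not share.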