Note that I did work around this issue by including the full Kafka dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

and using the legacy Consumer API, instead of the kafka-clients dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
</dependency>

Listed in the documentation:

http://kafka.apache.org/documentation.html#producerapi

Since this dependency is called out right before the documentation outlines
the Consumer API, and the Consumer API docs don't mention that the Consumer
in the kafka-clients dependency is broken, it might be helpful if the
documentation pointed out that the kafka-clients dependency contains a broken
Consumer, and that the kafka_2.10 dependency should be used to access the
legacy API.
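
For anyone else hitting this, here is a minimal sketch of the configuration
the legacy (ZooKeeper-based) consumer reads, assuming the kafka_2.10 0.8.2.2
artifact is on the classpath. The class name, method name, and group id are
hypothetical placeholders, and the auto-commit interval is an arbitrary
example value. The Kafka calls appear only in comments, so the snippet itself
compiles without the Kafka jar:

```java
import java.util.Properties;

// Hypothetical helper class; not part of Kafka.
class LegacyConsumerProps {

    // Collects the settings the legacy high-level consumer expects.
    // It connects through ZooKeeper and needs a group.id.
    static Properties legacyConsumerProperties(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000"); // example value
        return props;
    }

    public static void main(String[] args) {
        Properties props = legacyConsumerProperties("localhost:2181", "example-group");
        System.out.println(props.getProperty("zookeeper.connect"));

        // With kafka_2.10 available, the properties would be handed to the
        // legacy API roughly like this (compare the examples/Consumer.java
        // file in the 0.8.2.2 branch):
        //
        //   ConsumerConnector connector =
        //       kafka.consumer.Consumer.createJavaConsumerConnector(
        //           new kafka.consumer.ConsumerConfig(props));
        //   Map<String, Integer> topicCountMap = new HashMap<>();
        //   topicCountMap.put(topic, 1);
        //   Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        //       connector.createMessageStreams(topicCountMap);
        //   ConsumerIterator<byte[], byte[]> it =
        //       streams.get(topic).get(0).iterator();
        //   while (it.hasNext()) {
        //       System.out.println(new String(it.next().message()));
        //   }
    }
}
```

Note that the legacy consumer talks to ZooKeeper (zookeeper.connect) rather
than to the brokers directly, so bootstrap.servers is not used here.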

Take care,
  -stu

On Fri, Oct 30, 2015 at 2:07 PM, Stu Smith <stu26c...@gmail.com> wrote:

> Hello!
>
>   I'm running into trouble using the latest Kafka client.
>
> 0.8.2.2 appears to be listed as a stable release on Maven Central:
>
> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
>
> And it only includes the:
>
> org.apache.kafka.clients.consumer.KafkaConsumer client
>
> All the other Consumers listed in the documents do not appear to be
> available in this release.
>
> However, in the example code for the 0.8.2.2 branch, it covers the
> ConsumerConnector client:
>
>
> https://github.com/apache/kafka/blob/0.8.2.2/examples/src/main/java/kafka/examples/Consumer.java
>
> Which no longer exists in the 0.8.2.2 release.
>
> The KafkaConsumer client I get always returns null on poll(), similar to
> behavior reported for the 0.8.2 branch (but not the 0.8.2.2 branch):
>
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAOeJiJj-c747Ak99qioytrD4=E24W8SiVqgx=ooqfkvdb7+...@mail.gmail.com%3E
>
> So it appears that 0.8.2.2 shipped with an old, broken KafkaConsumer client,
> but removed the older, working ConsumerConnector / MessageStream interface.
>
> Is KafkaConsumer expected to work in 0.8.2.2 ?
> Or are we expected to use the old client, and I'm somehow not seeing the
> package?
>
> I confirmed I have messages waiting by using the Java producer API and
> listening with the ConsoleConsumer application, and it happily prints
> whatever the producer sends. However, the ConsoleConsumer appears to be
> using the Scala API, so it can't provide any leads on how to use the Java
> one.
>
> Or am I doing something wrong ?
>
> scannerKafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> scannerKafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> "true");
> scannerKafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
> "10");
> scannerKafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "30000");
> scannerKafkaProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY,
> "roundrobin");
> scannerKafkaProperties.put("zookeeper.session.timeout.ms", "400");
> scannerKafkaProperties.put("zookeeper.sync.time.ms", "200");
> scannerKafkaProperties.put("zookeeper.connect","localhost:2181");
>
> ...
> private static final String DESERIALIZER =
> "org.apache.kafka.common.serialization.StringDeserializer";
> private static final String SERIALIZER =
> "org.apache.kafka.common.serialization.StringSerializer";...
>
> kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> DESERIALIZER);
> kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> DESERIALIZER);
> kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> SERIALIZER);
> kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> SERIALIZER);
> kafkaProperties.put("offsets.storage","kafka");
> kafkaProperties.put("dual.commit.enabled", "false");
> ...
> this.kafkaConsumer = new KafkaConsumer<>( kafkaProperties );
> this.kafkaProducer = new KafkaProducer<>( kafkaProperties );
> ...
> TopicPartition topicPartition = new TopicPartition(this.topic,0);
> this.kafkaConsumer.subscribe(topicPartition);
> ...
> while( this.running ) {
> Map<String, ConsumerRecords<String, String>> messages =
> this.kafkaConsumer.poll(messageWaitTimeout);
> if( messages == null ) {
>     //this.log.debug("Finished polling, no messages received.");
>     for( int i = 0; i < 200; ++i ) {
>         this.kafkaProducer.send(new ProducerRecord<>(this.topic, 0,
> "test", "test"));
>     }
>     continue;
> }
> ....
> (And to re-iterate, the console consumer does pick up the messages, if I
> run it, but the Java API does not).
>
> Or is the 0.8.2.2 High-Level Java API simply not usable?
>
> Take care,
>   -stu
>
