Hi Team,

Does anyone use the Kangaroo project, which uses Kafka's SimpleConsumer API to integrate Kafka with MapReduce jobs? I am facing an issue with getting the offsets for a specific consumer. Also, after relaunching the job, the committed offsets for that consumer should be read back so that the next InputSplit starts from that point.
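For context, my understanding (and this is an assumption on my part, not something I have confirmed in the Kangaroo code) is that the committed offsets end up in ZooKeeper under the conventional Kafka 0.8 path /consumers/<group>/offsets/<topic>/<partition>, so after a relaunch I was planning to read them back roughly like this. The connect string, group and topic below are placeholder values from my setup:

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class CommittedOffsetCheck {
        public static void main(String[] args) throws Exception {
            // Placeholders: adjust connect string, group, topic and partition.
            String zkConnect = "localhost:2181";
            String group = "my-consumer-group";
            String topic = "amit-topic";
            int partition = 0;

            // Assumption: offsets are committed to the conventional 0.8 ZooKeeper path
            // /consumers/<group>/offsets/<topic>/<partition>
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;

            ZooKeeper zk = new ZooKeeper(zkConnect, 10000, new Watcher() {
                public void process(WatchedEvent event) { /* no-op watcher */ }
            });
            try {
                if (zk.exists(path, false) == null) {
                    System.out.println("No committed offset node at " + path);
                } else {
                    byte[] data = zk.getData(path, false, null);
                    System.out.println("Committed offset: " + new String(data, "UTF-8"));
                }
            } finally {
                zk.close();
            }
        }
    }

Please correct me if Kangaroo stores or names these offset nodes differently.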
My environment is CDH 5.5 with Kafka 0.8.2.

In my case, one broker has a newly created topic with one partition, and that partition contains 10 messages. I launched my MapReduce job with Kangaroo's KafkaInputFormat, but I do not get any InputSplit, maybe because my offsets are not being set or retrieved correctly. Any clue what could be going wrong here? Below are the job details:

    KafkaJobBuilder kafkaJobBuilder = KafkaJobBuilder.newBuilder();
    kafkaJobBuilder.setJobName("amit-topic-kafka-hadoop");
    kafkaJobBuilder.setZkConnect(zkString);
    kafkaJobBuilder.addQueueInput(topicName, consumerGroup, KafkaReaderMap.class);
    kafkaJobBuilder.setNumReduceTasks(0);
    kafkaJobBuilder.setKafkaFetchSizeBytes(1000000);
    kafkaJobBuilder.setMapOutputKeyClass(Text.class);
    kafkaJobBuilder.setMapOutputValueClass(Text.class);
    kafkaJobBuilder.setTextFileOutputFormat(outputPath);

    // Configuration configuration = new Configuration();
    getConf().setLong("kafka.timestamp.offset", System.currentTimeMillis());
    getConf().setInt("kafka.max.splits.per.partition", Integer.MAX_VALUE);

Thanks,
Amit
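P.S. To rule out the offsets themselves, I put together a small standalone check against the broker using the SimpleConsumer getOffsetsBefore API. It is only a rough sketch; the broker host/port, topic, partition and clientId are placeholder values from my setup. The last lookup uses a current timestamp, the same kind of value the job passes as kafka.timestamp.offset, to see what offset the broker resolves it to:

    import java.util.HashMap;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class PartitionOffsetCheck {
        public static void main(String[] args) {
            // Placeholders: broker host/port, topic and partition from my setup.
            String brokerHost = "localhost";
            int brokerPort = 9092;
            String topic = "amit-topic";
            int partition = 0;
            String clientId = "offset-check";

            SimpleConsumer consumer =
                    new SimpleConsumer(brokerHost, brokerPort, 10000, 64 * 1024, clientId);
            try {
                System.out.println("earliest = " + getOffsetBefore(consumer, topic, partition,
                        kafka.api.OffsetRequest.EarliestTime(), clientId));
                System.out.println("latest   = " + getOffsetBefore(consumer, topic, partition,
                        kafka.api.OffsetRequest.LatestTime(), clientId));
                // Same kind of timestamp the job sets as kafka.timestamp.offset.
                System.out.println("now      = " + getOffsetBefore(consumer, topic, partition,
                        System.currentTimeMillis(), clientId));
            } finally {
                consumer.close();
            }
        }

        // Standard offset lookup from the Kafka 0.8 SimpleConsumer example.
        private static long getOffsetBefore(SimpleConsumer consumer, String topic, int partition,
                                            long time, String clientId) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
            requestInfo.put(tp, new PartitionOffsetRequestInfo(time, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                throw new RuntimeException("Offset lookup failed, error code: "
                        + response.errorCode(topic, partition));
            }
            long[] offsets = response.offsets(topic, partition);
            return offsets.length > 0 ? offsets[0] : -1L;
        }
    }

If the "now" lookup already resolves to the latest offset, I suspect that would explain why the job finds no data to split, but I am not sure whether that is how Kangaroo interprets kafka.timestamp.offset.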