Hi Kafka team,

I have a Trident topology in Storm which consumes data from Kafka, and I am seeing an issue with KafkaSpout: it does not consume the very first uncommitted offset from Kafka.
My Storm version is 1.1.1 and my Kafka version is 0.11.0.0. I have a topic, say X, with 3 partitions. I use the following configuration to consume data with KafkaSpout:

    KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
            .builder(PropertyUtil.getStringValue(PropertyUtil.KAFKA_BROKERS),
                     PropertyUtil.getStringValue(PropertyUtil.TOPIC_NAME))
            .setProp(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "4194304")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, PropertyUtil.getStringValue(PropertyUtil.CONSUMER_ID))
            .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "4194304")
            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
            .build();

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout<String, String>(kafkaConfig), 3);

These are my test cases:

1. Started the processor with a new consumer id. The very first time, it starts reading data from the latest offset. Fine.
2. Sent some messages to Kafka; all of them were consumed by my Trident topology.
3. Stopped my Trident topology.
4. Sent some messages to Kafka (partition_0), for example:
   > msg_1
   > msg_2
   > msg_3
   > msg_4
   > msg_5
5. Started the topology. KafkaSpout consumes the data starting from msg_2; it does not consume msg_1.
6. Stopped the topology.
7. Sent some messages to all the partitions (_0, _1, _2), for example:
   Partition_0 > msg_6 > msg_7 > msg_8
   Partition_1 > msg_9 > msg_10 > msg_11
   Partition_2 > msg_12 > msg_13 > msg_14
8. Started the topology. KafkaSpout consumes the following messages:
   > msg_7 > msg_8 > msg_10 > msg_11 > msg_13 > msg_14

It skipped the earliest uncommitted message in each partition.

Below is the definition of UNCOMMITTED_LATEST from the Javadoc:

    "UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as LATEST."

According to this definition, the spout should resume reading from the last committed offset.
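To make the gap between the documented and the observed behaviour concrete, here is a minimal, Kafka-free Java sketch of the restart in steps 7 and 8. It assumes Kafka's convention that a committed offset is the offset of the next record to read, not the last record processed. The class `ResumeOffsetDemo`, the method `resume` and the `old_*` placeholder records are hypothetical and only stand in for the real partitions. Seeking to the committed offset re-reads every pending message, while seeking to committed offset + 1 yields exactly the messages seen in step 8.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free simulation of the restart in steps 7 and 8.
// Assumption: a committed offset is the offset of the NEXT record to read
// (Kafka's convention), so a restarted consumer should seek to it exactly.
final class ResumeOffsetDemo {

    // Returns the records read from `log` (list index == offset) when the
    // consumer seeks to `committed + extraSkip` and reads to the end.
    static List<String> resume(List<String> log, long committed, long extraSkip) {
        List<String> read = new ArrayList<>();
        for (long offset = committed + extraSkip; offset < log.size(); offset++) {
            read.add(log.get((int) offset));
        }
        return read;
    }

    public static void main(String[] args) {
        // Hypothetical logs: two records already consumed and committed
        // ("old_*" placeholders), followed by the messages sent in step 7.
        Map<String, List<String>> logs = new LinkedHashMap<>();
        logs.put("X-0", Arrays.asList("old_a", "old_b", "msg_6", "msg_7", "msg_8"));
        logs.put("X-1", Arrays.asList("old_a", "old_b", "msg_9", "msg_10", "msg_11"));
        logs.put("X-2", Arrays.asList("old_a", "old_b", "msg_12", "msg_13", "msg_14"));
        long committed = 2; // next offset to read in every partition

        for (Map.Entry<String, List<String>> e : logs.entrySet()) {
            System.out.println(e.getKey()
                    + " expected=" + resume(e.getValue(), committed, 0)
                    + " observed=" + resume(e.getValue(), committed, 1));
        }
    }
}
```

For partition X-0 this prints `expected=[msg_6, msg_7, msg_8]` and `observed=[msg_7, msg_8]`. If the spout were treating the committed offset as the last processed record and adding 1 before seeking, while the committed value actually points at the next record (as Kafka's own auto-commit does), it would produce this off-by-one; that is a hypothesis to check, not a confirmed diagnosis.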
However, it looks like it is reading from the earliest uncommitted offset + 1; in other words, the starting pointer seems to be off by one. Please have a look and let me know if anything is wrong in my tests. I would appreciate a response even if this turns out not to be an issue.

Thanks,
Senthil