Hi All, 


We are using kafka 2.10_0.9 new version, but consumer we are using old high 
level and low level api. 

I am trying to fetch earliest valid offset for topic, but it is returning 
latest offset if the data(log) is deleted after certain interval(which is 
configured in server properties) 

in previous version even if data gets deleted , we were able to get original 
earliest valid offset, I am using this code to get the offset 

suppose I have added 2 messages starting from offset 40 and 41, now file is 
deleted and now inserted new message at 42, this will create new log file, but 
when i fetch earlist valid offset I am getting as 42,I was expecting 40 


I am not committing offsets anywhere, I do not want to do that. 

PartitionOffsetRequestInfo partitionOffsetRequestInfo = new 
PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 10 ); 
Map<TopicAndPartition, PartitionOffsetRequestInfo> map = new 
HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
map.put(topicAndPartition, partitionOffsetRequestInfo); 
OffsetRequest request = new OffsetRequest(map, ( short ) 0 , CLIENT_ID ); 
OffsetResponse startOffsets = simpleConsumer.getOffsetsBefore(request); 

if (startOffsets.offsets(topicName, partition).length > 0 ) { 
long validoffset = startOffsets.offsets(topicName, partition)[ 0 ]; 
simpleConsumer.close(); 
return validoffset; 
} 
simpleConsumer.close(); 


Can you help me on this? 


Thanks, 
snehalata 

Reply via email to