Hi all

Kafka version: kafka_2.11-0.11.0.2
Spark version: 2.0.1


We have a topic-partition "adn-tracking,15" in Kafka whose earliest offset is
1255644602 and whose latest offset is 1271253441.
When starting a Spark Streaming job to process the data from this topic, we got
an exception with the message "Got wrong record XXXX even after seeking to
offset 1266921577". The failing offset lies inside the valid range:
[ (earliest offset) 1255644602   1266921577   1271253441 (latest offset) ]
Eventually I found the following source code in the class CachedKafkaConsumer
from spark-streaming. The exception is evidently raised because the offset of
the record returned by the consumer's poll is not equal to the offset that was
passed to seek.
Here is the code from "CachedKafkaConsumer.scala":
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
  if (offset != nextOffset) {
    logInfo(s"Initial fetch for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
  }

  if (!buffer.hasNext()) { poll(timeout) }
  assert(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  var record = buffer.next()

  if (record.offset != offset) {
    logInfo(s"Buffer miss for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
    assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    record = buffer.next()
    assert(record.offset == offset,
      s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
  }

  nextOffset = offset + 1
  record
}
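
To make the failure mode concrete, here is a minimal sketch in Java that needs no broker. It assumes that a seek followed by a poll hands back the first record at or after the requested position, which matches what I observed. The class GapSeekSketch and its methods are hypothetical names used only for illustration; they are not part of Kafka or Spark.

```java
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one partition's log (offset -> value).
final class GapSeekSketch {
    private final TreeMap<Long, String> log = new TreeMap<>();

    public void append(long offset, String value) {
        log.put(offset, value);
    }

    // Assumed behaviour of seek(requested) + poll(): the first stored
    // offset that is >= requested, or -1 if there is none.
    public long firstOffsetAtOrAfter(long requested) {
        Long found = log.ceilingKey(requested);
        return found == null ? -1L : found;
    }

    // Same condition as the final assert in get(): the record that comes
    // back must carry exactly the requested offset.
    public boolean strictGetSucceeds(long requested) {
        return firstOffsetAtOrAfter(requested) == requested;
    }

    public static void main(String[] args) {
        GapSeekSketch partition = new GapSeekSketch();
        partition.append(1266921576L, "a");
        partition.append(1266921578L, "b"); // 1266921577 is absent

        System.out.println(partition.firstOffsetAtOrAfter(1266921577L)); // 1266921578
        System.out.println(partition.strictGetSucceeds(1266921577L));    // false
        System.out.println(partition.strictGetSucceeds(1266921576L));    // true
    }
}
```

Under that assumption, any code that expects offsets to increase by exactly one will trip on the first missing offset, no matter how often it seeks again.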
I reproduced this problem and found that the offsets within a single
topic-partition are not contiguous in Kafka. I think this is a bug that needs
to be fixed.
I implemented a simple project that uses a consumer to seek to offset
1266921577, but the first record it returned had offset 1266921578. When I then
seeked to 1266921576, it returned exactly 1266921576.

Here is the code:
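import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class consumerDemo {
    public static void main(String[] argv) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.31.29.31:9091");
        props.put("group.id", "consumer-tutorial-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
        Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
        collection.add(tp);
        consumer.assign(collection);
        // Seek to the offset under test, then print the first record returned.
        consumer.seek(tp, 1266921576L);
        ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
        List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
        Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
        ConsumerRecord<String, String> record = iter.next();
        System.out.println(" the next record " + record.offset() + " record topic " +
            record.topic());
        consumer.close();
    }
}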
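
To find every missing offset rather than probing one at a time, the offsets printed by a consumer loop can be collected and scanned. The sketch below assumes the offsets have already been gathered in ascending order from record.offset(). OffsetGapFinder and findGaps are hypothetical names for illustration, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reports ranges of offsets that never appeared.
final class OffsetGapFinder {

    // Takes offsets in ascending order and returns one
    // {firstMissing, lastMissing} pair for each gap between neighbours.
    public static List<long[]> findGaps(List<Long> offsets) {
        List<long[]> gaps = new ArrayList<>();
        for (int i = 1; i < offsets.size(); i++) {
            long prev = offsets.get(i - 1);
            long cur = offsets.get(i);
            if (cur > prev + 1) {
                gaps.add(new long[] {prev + 1, cur - 1});
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Offsets as they might be observed around the failing position.
        List<Long> seen = Arrays.asList(1266921575L, 1266921576L, 1266921578L);
        for (long[] gap : findGaps(seen)) {
            System.out.println("missing " + gap[0] + " to " + gap[1]);
        }
    }
}
```

With the three offsets above, the only gap reported is the single offset 1266921577.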

wood.super
