Yes. My Spark Streaming application works with an uncompacted topic. I will check 
the patch.


wood.super


Original Message
From: Justin Miller justin.mil...@protectwise.com
To: namesuperwood namesuperw...@gmail.com
Cc: user u...@spark.apache.org; Cody Koeninger c...@koeninger.org
Sent: Wednesday, January 24, 2018, 14:23
Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure


We appear to be kindred spirits; I’ve recently run into the same issue. Are you 
running compacted topics? I’ve run into this on non-compacted topics as well; it 
happens rarely, but it’s still a pain. You might check out this patch and the 
related Spark Streaming Kafka ticket:


https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
https://issues.apache.org/jira/browse/SPARK-17147
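From a quick read of the branch, the gap-tolerant behavior appears to be gated 
behind a new configuration flag. A minimal sketch of enabling it, assuming the 
flag name from that branch (check the patch itself before relying on this):

import org.apache.spark.SparkConf

// Sketch only, assuming the SPARK-17147 branch above: with this flag set,
// the Kafka direct stream tolerates non-consecutive offsets (e.g. gaps
// left by log compaction) instead of failing its assertion.
val conf = new SparkConf()
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")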


I’ll be testing out the patch on a somewhat large-scale stream processor tomorrow.


CCing: Cody Koeninger


Best,
Justin



On Jan 23, 2018, at 10:48 PM, namesuperwood namesuperw...@gmail.com wrote:


Hi all


Kafka version: kafka_2.11-0.11.0.2
Spark version: 2.0.1


A topic-partition "adn-tracking,15" in Kafka has earliest offset 1255644602 
and latest offset 1271253441.
While starting a Spark Streaming job to process data from that topic, we got an 
exception: "Got wrong record XXXX even after seeking to offset 1266921577”.
[ (earliest offset) 1255644602   1266921577   1271253441 (latest offset) ]
Finally, I found the following source code in the class CachedKafkaConsumer from 
the Spark Streaming Kafka integration. The failure is evidently because the 
offset returned by the consumer's poll and the offset the consumer seeks to are 
not equal.


Here is the “CachedKafkaConsumer.scala” code:
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
  if (offset != nextOffset) {
    logInfo(s"Initial fetch for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
  }

  if (!buffer.hasNext()) { poll(timeout) }
  assert(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  var record = buffer.next()

  if (record.offset != offset) {
    logInfo(s"Buffer miss for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
    assert(buffer.hasNext(),
      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    record = buffer.next()
    // This is the assert that fails when the record returned after the
    // retry still does not sit at the requested offset.
    assert(record.offset == offset,
      s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
  }

  nextOffset = offset + 1
  record
}
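In this case offset 1266921577 apparently does not exist in the partition, so 
even after the second seek and poll, buffer.next() returns the record at 
1266921578, and it is the final assert above that fires with the “Got wrong 
record” message.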
I reproduced this problem and found that the offsets within a single 
TopicPartition are non-contiguous in Kafka. I think this is a bug that needs to 
be fixed. (Gaps like this are expected on compacted topics; and as far as I 
know, on Kafka 0.11+ transaction control markers can also occupy offsets and 
leave gaps even on non-compacted topics.)
I implemented a simple project that uses a consumer to seek to offset 
1266921577, but it returned offset 1266921578. Seeking to 1266921576 returned 
exactly 1266921576.

Here is the code:
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class consumerDemo {
    public static void main(String[] argv) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.31.29.31:9091");
        props.put("group.id", "consumer-tutorial-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
        Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
        collection.add(tp);
        consumer.assign(collection);

        // Seek to a given offset, poll, and print the offset of the first
        // record actually returned.
        consumer.seek(tp, 1266921576);
        ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
        List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
        Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
        ConsumerRecord<String, String> record = iter.next();
        System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
    }
}
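To see exactly where the gap is, a slightly more general probe can scan forward 
and print any jump between consecutive offsets. This is a sketch only: the 
GapProbe name, the "gap-probe" group id, and the choice to start a few offsets 
before the suspect one are mine.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object GapProbe {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "172.31.29.31:9091") // broker from the thread above
    props.put("group.id", "gap-probe")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("adn-tracking-click", 15)
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, 1266921570L) // start a little before the suspect offset

    // Walk the records in order; any jump between consecutive offsets is a gap.
    var expected = -1L
    for (record <- consumer.poll(10000).records(tp).asScala) {
      if (expected >= 0 && record.offset != expected)
        println(s"gap: expected offset $expected, got ${record.offset}")
      expected = record.offset + 1
    }
    consumer.close()
  }
}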

wood.super
