Yes. My Spark Streaming application works with an uncompacted topic. I will check the patch.
wood.super

---------- Original Message ----------
From: Justin Miller <justin.mil...@protectwise.com>
To: namesuperwood <namesuperw...@gmail.com>
Cc: user <u...@spark.apache.org>; Cody Koeninger <c...@koeninger.org>
Date: Wednesday, January 24, 2018, 14:23
Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure

We appear to be kindred spirits; I've recently run into the same issue. Are you running compacted topics? I've run into this issue on non-compacted topics as well. It happens rarely, but it is still a pain.

You might check out this patch and the related Spark Streaming Kafka ticket:

https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
https://issues.apache.org/jira/browse/SPARK-17147

I'll be testing the patch on a somewhat large-scale stream processor tomorrow.

CCing: Cody Koeninger

Best,
Justin

On Jan 23, 2018, at 10:48 PM, namesuperwood <namesuperw...@gmail.com> wrote:

Hi all,

kafka version: kafka_2.11-0.11.0.2
spark version: 2.0.1

The topic-partition "adn-tracking,15" in Kafka has earliest offset 1255644602 and latest offset 1271253441. While starting a Spark Streaming job to process data from this topic, we got an exception: "Got wrong record XXXX even after seeking to offset 1266921577".

[ (earliest offset) 1255644602 ... 1266921577 ... 1271253441 (latest offset) ]

Eventually I found the following source code in the class CachedKafkaConsumer from spark-streaming. The failure is evidently caused by the offset returned by the consumer's poll() not being equal to the offset the consumer seeked to. Here is the CachedKafkaConsumer.scala code:

    def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
      logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
      if (offset != nextOffset) {
        logInfo(s"Initial fetch for $groupId $topic $partition $offset")
        seek(offset)
        poll(timeout)
      }
      if (!buffer.hasNext()) { poll(timeout) }
      assert(buffer.hasNext(),
        s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
      var record = buffer.next()
      if (record.offset != offset) {
        logInfo(s"Buffer miss for $groupId $topic $partition $offset")
        seek(offset)
        poll(timeout)
        assert(buffer.hasNext(),
          s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
        record = buffer.next()
        assert(record.offset == offset,
          s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
      }
      nextOffset = offset + 1
      record
    }

I reproduced the problem and found that the offsets within this TopicPartition are non-contiguous in Kafka. I think this is a bug that needs to be repaired.
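The final assert is what throws "Got wrong record ... even after seeking to offset ..." whenever the log has a hole at the requested offset. I suppose a fix has to accept record.offset > offset instead of asserting equality. A minimal sketch of that idea against the plain consumer API (fetchAtOrAfter is my own illustrative helper, not actual Spark code):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class GapTolerantFetch {
        // Return the first record at or after the requested offset instead of
        // failing when offsets are non-contiguous; the caller must then treat
        // any skipped offsets as absent from the log.
        public static ConsumerRecord<String, String> fetchAtOrAfter(
                KafkaConsumer<String, String> consumer, TopicPartition tp,
                long offset, long timeoutMs) {
            consumer.seek(tp, offset);
            ConsumerRecords<String, String> polled = consumer.poll(timeoutMs);
            for (ConsumerRecord<String, String> record : polled.records(tp)) {
                if (record.offset() >= offset) {
                    return record; // may be > offset when the log has a gap
                }
            }
            throw new IllegalStateException(
                    "No record at or after offset " + offset + " for " + tp);
        }
    }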
To confirm this, I implemented a simple project that uses a consumer to seek to offset 1266921577: it returns the record at offset 1266921578. Seeking to 1266921576 instead returns the record at exactly 1266921576. Here is the code:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class consumerDemo {
        public static void main(String[] argv) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "172.31.29.31:9091");
            props.put("group.id", "consumer-tutorial-demo");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            // Assign the suspect partition and seek into the gap.
            TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
            Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
            collection.add(tp);
            consumer.assign(collection);
            consumer.seek(tp, 1266921576);

            // Poll once and print the offset of the first record returned.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
            List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
            Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
            ConsumerRecord<String, String> record = iter.next();
            System.out.println(" the next record " + record.offset()
                    + " record topic " + record.topic());
        }
    }

wood.super
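P.S. The same demo generalizes into a scan that reports every hole in an offset range. A minimal sketch, assuming the same client setup as in the demo above (findGaps is an illustrative helper, not part of the original project):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class GapScanner {
        // Scan [start, end) on an already-assigned partition and print every
        // place where consecutive record offsets differ by more than one.
        public static void findGaps(KafkaConsumer<String, String> consumer,
                                    TopicPartition tp, long start, long end) {
            consumer.seek(tp, start);
            long expected = start;
            while (expected < end) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                if (records.isEmpty()) {
                    break; // nothing more to read within the timeout
                }
                for (ConsumerRecord<String, String> r : records.records(tp)) {
                    if (r.offset() > expected) {
                        System.out.println("gap: expected offset " + expected
                                + ", next record at " + r.offset());
                    }
                    expected = r.offset() + 1;
                    if (expected >= end) {
                        return;
                    }
                }
            }
        }
    }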