[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13775988#comment-13775988 ]
Sam Meder commented on KAFKA-946:
---------------------------------

Rebased patch attached.

> Kafka Hadoop Consumer fails when verifying message checksum
> -----------------------------------------------------------
>
>                 Key: KAFKA-946
>                 URL: https://issues.apache.org/jira/browse/KAFKA-946
>             Project: Kafka
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Priority: Critical
>             Fix For: 0.8
>
>         Attachments: hadoop_consumer_1.patch
>
>
> The code tries to verify the checksum, but fails because the data available
> isn't the same. In KafkaETLContext:
>
>     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
>         if (_messageIt != null && _messageIt.hasNext()) {
>             MessageAndOffset messageAndOffset = _messageIt.next();
>             ByteBuffer buf = messageAndOffset.message().payload();
>             int origSize = buf.remaining();
>             byte[] bytes = new byte[origSize];
>             buf.get(bytes, buf.position(), origSize);
>             value.set(bytes, 0, origSize);
>             key.set(_index, _offset, messageAndOffset.message().checksum());
>             _offset = messageAndOffset.nextOffset(); //increase offset
>             _count ++; //increase count
>             return true;
>         }
>         else return false;
>     }
>
> Note that only the message payload is copied into the value, while the
> message checksum is stored in the key. Then, in SimpleKafkaETLMapper:
>
>     @Override
>     public void map(KafkaETLKey key, BytesWritable val,
>                     OutputCollector<LongWritable, Text> collector,
>                     Reporter reporter) throws IOException {
>         byte[] bytes = KafkaETLUtils.getBytes(val);
>         //check the checksum of message
>         Message message = new Message(bytes);
>         long checksum = key.getChecksum();
>         if (checksum != message.checksum())
>             throw new IOException ("Invalid message checksum "
>                                    + message.checksum() + ". Expected " + key + ".");
>
> the Message object is initialized with the payload bytes only, and a new
> checksum is calculated over them. The problem is that the original message
> checksum also covers the key, so checksum verification fails...
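A minimal sketch of why the comparison fails, assuming a simplified, hypothetical layout in which the stored checksum is a CRC32 over a key-length prefix, the key bytes and the payload bytes. This is not the exact Kafka 0.8 wire format, and `ChecksumMismatchDemo`, `serialize` and `crc32` are illustrative names, not part of the contrib code. Recomputing the checksum over the payload alone (which is what building `new Message(bytes)` from the payload amounts to, per the description above) gives a different value from the one carried in the key. Recomputing it over the same bytes the original checksum covered gives a match.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class ChecksumMismatchDemo {

    // Hypothetical, simplified message layout: [int key length][key bytes][payload bytes].
    // Only meant to show that the stored checksum spans more than the payload.
    static byte[] serialize(byte[] key, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + key.length + payload.length);
        buf.putInt(key.length);
        buf.put(key);
        buf.put(payload);
        return buf.array(); // heap buffer: backing array holds exactly the bytes written
    }

    // CRC32 over the whole byte array, returned as an unsigned 32-bit value in a long.
    static long crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Checksum computed when the message was written: covers key + payload.
        long stored = crc32(serialize(key, payload));

        // Mapper-side recomputation that only sees the payload.
        long payloadOnly = crc32(payload);

        // Recomputation over the same byte range the stored checksum covered.
        long fullMessage = crc32(serialize(key, payload));

        System.out.println("payload-only checksum matches: " + (stored == payloadOnly));
        System.out.println("full-message checksum matches: " + (stored == fullMessage));
    }
}
```

The design point is that a checksum can only be verified against the exact bytes it was computed over. Either the full serialized message has to reach the mapper, or the check has to be computed over the same range on both sides.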
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira