Sam Meder created KAFKA-946:
-------------------------------

Summary: Kafka Hadoop Consumer fails when verifying message checksum
Key: KAFKA-946
URL: https://issues.apache.org/jira/browse/KAFKA-946
Project: Kafka
Issue Type: Bug
Components: contrib
Affects Versions: 0.8
Reporter: Sam Meder
Priority: Critical
The Hadoop consumer tries to verify each message's checksum. The check fails because two different sets of bytes are hashed: one for the checksum stored with the record, and another for the checksum recomputed in the mapper.

In KafkaETLContext:

    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
        if (_messageIt != null && _messageIt.hasNext()) {
            MessageAndOffset messageAndOffset = _messageIt.next();

            ByteBuffer buf = messageAndOffset.message().payload();
            int origSize = buf.remaining();
            byte[] bytes = new byte[origSize];
            buf.get(bytes, buf.position(), origSize);
            value.set(bytes, 0, origSize);

            key.set(_index, _offset, messageAndOffset.message().checksum());

            _offset = messageAndOffset.nextOffset();  //increase offset
            _count ++;  //increase count

            return true;
        }
        else return false;
    }

Note that only the message payload is copied into the value, while the checksum of the whole message is stored in the key.

Then, in SimpleKafkaETLMapper:

    @Override
    public void map(KafkaETLKey key, BytesWritable val,
                    OutputCollector<LongWritable, Text> collector,
                    Reporter reporter) throws IOException {

        byte[] bytes = KafkaETLUtils.getBytes(val);

        //check the checksum of message
        Message message = new Message(bytes);
        long checksum = key.getChecksum();
        if (checksum != message.checksum())
            throw new IOException ("Invalid message checksum "
                                   + message.checksum()
                                   + ". Expected " + key + ".");
        ...

Here a new Message object is built from the payload bytes alone, and a fresh checksum is computed over it. The problem is that the original message's checksum also covers the message key. As a result, the recomputed checksum does not match the stored one and verification fails.
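The mismatch can be reproduced outside Kafka with a small sketch. It uses a simplified model, which is an assumption and not Kafka 0.8's real wire format: the "stored" checksum is a java.util.zip.CRC32 over key bytes followed by payload bytes. The mapper side hashes only the payload. The class and method names are hypothetical. So is `verifyPayload`, which illustrates one possible fix: have get() record a checksum of exactly the bytes it hands to the mapper. This is not necessarily how the issue was resolved.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumMismatchDemo {

    // Simplified model (assumption, not Kafka's real message layout):
    // the stored checksum covers the key bytes followed by the payload bytes.
    public static long checksumOfKeyAndPayload(byte[] key, byte[] payload) {
        CRC32 crc = new CRC32();
        if (key != null) {
            crc.update(key, 0, key.length);
        }
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Mirrors what the mapper effectively does: rebuild a message from the
    // payload alone (no key) and recompute the checksum.
    public static long checksumOfPayloadOnly(byte[] payload) {
        return checksumOfKeyAndPayload(null, payload);
    }

    // Hypothetical fix: compare against a checksum that was computed over the
    // same payload-only bytes, so both sides hash identical data.
    public static boolean verifyPayload(long storedPayloadChecksum, byte[] payload) {
        return storedPayloadChecksum == checksumOfPayloadOnly(payload);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Analogous to message().checksum() stored in KafkaETLKey by get()
        long storedInKey = checksumOfKeyAndPayload(key, payload);
        // Analogous to new Message(bytes).checksum() in the mapper
        long recomputed = checksumOfPayloadOnly(payload);
        System.out.println("keyed message matches: " + (storedInKey == recomputed));

        // Hypothetical fix: store a payload-only checksum in get() instead
        long payloadChecksum = checksumOfPayloadOnly(payload);
        System.out.println("payload-only check passes: " + verifyPayload(payloadChecksum, payload));
    }
}
```

In this model, a message with a null key produces matching checksums. Only a message with a key triggers the failure, which is consistent with the explanation above.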