Sam Meder created KAFKA-946:
-------------------------------

             Summary: Kafka Hadoop Consumer fails when verifying message checksum
                 Key: KAFKA-946
                 URL: https://issues.apache.org/jira/browse/KAFKA-946
             Project: Kafka
          Issue Type: Bug
          Components: contrib
    Affects Versions: 0.8
            Reporter: Sam Meder
            Priority: Critical


The code tries to verify the message checksum, but verification fails because the
data available to the mapper is not the same data the checksum was computed over.
In KafkaETLContext:

    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
        if (_messageIt != null && _messageIt.hasNext()) {
            MessageAndOffset messageAndOffset = _messageIt.next();

            ByteBuffer buf = messageAndOffset.message().payload();
            int origSize = buf.remaining();
            byte[] bytes = new byte[origSize];
            buf.get(bytes, buf.position(), origSize);
            value.set(bytes, 0, origSize);

            key.set(_index, _offset, messageAndOffset.message().checksum());

            _offset = messageAndOffset.nextOffset();  //increase offset
            _count ++;  //increase count

            return true;
        }
        else return false;
    }

Note that only the message payload is copied into the value, while the checksum
of the full message is stored in the key. Then, in SimpleKafkaETLMapper:

    @Override
    public void map(KafkaETLKey key, BytesWritable val,
            OutputCollector<LongWritable, Text> collector,
            Reporter reporter) throws IOException {


        byte[] bytes = KafkaETLUtils.getBytes(val);

        //check the checksum of message
        Message message = new Message(bytes);
        long checksum = key.getChecksum();
        if (checksum != message.checksum())
            throw new IOException ("Invalid message checksum "
                                            + message.checksum() + ". Expected " + key + ".");

the Message object is constructed from the payload bytes alone, so a new checksum
is calculated over different data. The problem is that the original message
checksum covers the key as well as the payload, so checksum verification fails
whenever the original message had a key.
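The mismatch can be reproduced outside Hadoop with a small model. This is an
illustrative sketch, not Kafka code: it assumes a simplified message layout
(magic byte, attributes byte, length-prefixed key with -1 meaning "no key",
length-prefixed payload) with a CRC32 computed over all of those bytes, and the
names ChecksumMismatchDemo and checksum are hypothetical. Rebuilding the message
from the payload alone, as the mapper does with new Message(bytes), gives a
different checksum when the original had a key; rebuilding it with the original
key gives the stored one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumMismatchDemo {

    // Hypothetical, simplified model of a Kafka 0.8-style message body (assumption):
    // magic (1) | attributes (1) | key length (4) | key | payload length (4) | payload.
    // A key length of -1 stands for "no key". The checksum covers all of these bytes.
    static long checksum(byte[] key, byte[] payload) {
        int keyLen = (key == null) ? 0 : key.length;
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + 4 + keyLen + 4 + payload.length);
        buf.put((byte) 0);                          // magic
        buf.put((byte) 0);                          // attributes (no compression)
        buf.putInt(key == null ? -1 : key.length);  // key length, -1 for a null key
        if (key != null) {
            buf.put(key);
        }
        buf.putInt(payload.length);
        buf.put(payload);

        // CRC32 over every byte written above, i.e. key AND payload.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 0, buf.position());
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Checksum of the original keyed message, the value get() stores in KafkaETLKey.
        long stored = checksum(key, payload);

        // What the mapper effectively recomputes: a message built from the payload only.
        long recomputedFromPayload = checksum(null, payload);

        // Rebuilding with the original key reproduces the stored checksum.
        long recomputedWithKey = checksum(key, payload);

        System.out.println("stored == payload-only? " + (stored == recomputedFromPayload));
        System.out.println("stored == with key?     " + (stored == recomputedWithKey));
    }
}
```

In this model the check can only pass if the mapper sees the original key (or
the full message bytes), or if validation is done in get(), where the complete
Message is still available.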


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
