[ https://issues.apache.org/jira/browse/KAFKA-7769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734806#comment-16734806 ]
Asaf Mesika commented on KAFKA-7769:
------------------------------------

Unfortunately I'm not up to speed with the exact protocol details between client and server, but I presume the client tells the server that it is an old client, the server "remembers" that for the session created, and returns record batches using magic number v0 or v1.

The exception stack trace shows something odd. It seems that the magic number sent was v2, so the MemoryRecords class creates an Iterator of DefaultRecordBatch, but a tiny bit later it reaches a point where it tries to convert each batch to MessageAndOffset, and fails, since for some odd reason it is only able to do so for AbstractLegacyRecordBatch. These are the parts I saw (a sketch of the failing conversion follows the snippets):

*PartitionTopicInfo.scala*
{code:java}
/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if(size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset
{code}

*ByteBufferMessageSet*
{code:java}
/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)

/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  if (isShallow)
    asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)
{code}

{code:java}
override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())
{code}

*MemoryRecords*
{code:java}
public static MemoryRecords readableRecords(ByteBuffer buffer) {
    return new MemoryRecords(buffer);
}

private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
    @Override
    public Iterator<MutableRecordBatch> iterator() {
        return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
    }
};
{code}

*ByteBufferLogInputStream*
{code:java}
public MutableRecordBatch nextBatch() throws IOException {
    int remaining = buffer.remaining();
    if (remaining < LOG_OVERHEAD)
        return null;

    int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
    // V0 has the smallest overhead, stricter checking is done later
    if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
        throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)",
                LegacyRecord.RECORD_OVERHEAD_V0));
    if (recordSize > maxMessageSize)
        throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).",
                maxMessageSize));

    int batchSize = recordSize + LOG_OVERHEAD;
    if (remaining < batchSize)
        return null;

    byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

    ByteBuffer batchSlice = buffer.slice();
    batchSlice.limit(batchSize);

    buffer.position(buffer.position() + batchSize);

    if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
        throw new CorruptRecordException("Invalid magic found in record: " + magic);

    if (magic > RecordBatch.MAGIC_VALUE_V1)
        return new DefaultRecordBatch(batchSlice);
    else
        return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}
{code}

So the stream constructs DefaultRecordBatch instances, which later fail: the shallow iterator tries to map each one to MessageAndOffset, but can't do that for DefaultRecordBatch. I can't figure out why; to me it seems like a bug.

Our current work-around is to restart either the server or the client, which solves it.
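To make the failure mode concrete: the conversion step that blows up is consistent with MessageAndOffset.fromRecordBatch pattern-matching on the batch type and rejecting anything that is not a legacy batch. Below is a minimal sketch of that logic, reconstructed from the exception message rather than copied from the Kafka source; the member names outerRecord and lastOffset are assumptions.

{code:java}
object MessageAndOffset {
  def fromRecordBatch(batch: RecordBatch): MessageAndOffset = batch match {
    // Magic v0/v1 batches have a legacy wrapper message, so they can be
    // converted to the old Message/MessageAndOffset representation.
    case legacy: AbstractLegacyRecordBatch =>
      MessageAndOffset(Message.fromRecord(legacy.outerRecord), legacy.lastOffset)

    // A magic v2 batch (DefaultRecordBatch) has no legacy representation,
    // which would produce exactly the IllegalArgumentException in the report.
    case _ =>
      throw new IllegalArgumentException(s"Illegal batch type ${batch.getClass}. The older message format " +
        "classes only support conversion from class AbstractLegacyRecordBatch, which is used for magic v0 and v1")
  }
}
{code}

Since the shallow iterator maps every batch through this conversion, a single v2 batch in a fetched buffer is enough to fail the fetcher thread, which would fit the intermittent nature of the failure.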
> Illegal batch type class
> -------------------------
>
>                 Key: KAFKA-7769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7769
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.1.0
>         Environment: Kafka 1.1.0 on client side, running on Java 8
> Kafka 1.1.0 on broker side, running on Java 8
>            Reporter: Asaf Mesika
>            Priority: Major
>
> I get the following exception from Kafka Consumer version 1.1.0:
> {code:java}
> kafka.common.KafkaException: Error processing data for partition acmetopic-24 offset 1408742703
> 	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
> 	at scala.Option.foreach(Option.scala:257)
> 	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
> 	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
> 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> 	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
> 	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
> 	at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
> 	at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
> 	at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
> 	at scala.collection.Iterator.toStream(Iterator.scala:1403)
> 	at scala.collection.Iterator.toStream$(Iterator.scala:1402)
> 	at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
> 	at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
> 	at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
> 	at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
> 	at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
> 	at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
> 	at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
> 	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
> 	... 15 common frames omitted{code}
>
> It happens once in a while. I initialise the client with zooKeeper connect, and not with bootstrap servers.
> Any idea what can cause this?