[ https://issues.apache.org/jira/browse/KAFKA-7769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734806#comment-16734806 ]

Asaf Mesika commented on KAFKA-7769:
------------------------------------

 
Unfortunately I'm not up to speed with the exact protocol details between 
client and server, but I presume the client tells the server that it is an old 
client, the server "remembers" that for the session it created, and returns 
record batches using magic number v0 or v1.
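
For what it's worth, one way to check which magic number the broker actually returned is to inspect the fetched buffer directly. A minimal sketch (the {{fetchedBuffer}} parameter is hypothetical, standing in for the raw fetch-response buffer):
{code:java}
import java.nio.ByteBuffer
import org.apache.kafka.common.record.MemoryRecords
import scala.collection.JavaConverters._

def printBatchMagics(fetchedBuffer: ByteBuffer): Unit = {
  val records = MemoryRecords.readableRecords(fetchedBuffer.duplicate())
  records.batches.asScala.foreach { batch =>
    // For an old (pre-0.11) consumer the broker should have down-converted,
    // so magic should be 0 or 1 here; the exception below implies it was 2.
    println(s"batch magic = ${batch.magic}, last offset = ${batch.lastOffset}")
  }
}
{code}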
The exception stack trace shows something odd. It seems that the magic number 
returned was v2, so MemoryRecords creates an iterator of DefaultRecordBatch, 
but a little later it reaches a point where it tries to convert each batch to 
MessageAndOffset, and fails, since for some odd reason it is only able to do 
so for AbstractLegacyRecordBatch.
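
Judging from the exception message in the stack trace, the failing conversion in *MessageAndOffset.scala* presumably looks something like this (a sketch reconstructed from the error text, not a verified copy of the source):
{code:java}
object MessageAndOffset {
  def fromRecordBatch(batch: RecordBatch): MessageAndOffset = {
    batch match {
      case legacyBatch: AbstractLegacyRecordBatch =>
        // only the magic v0/v1 wrapper can be converted to the old Message class
        MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset)
      case _ =>
        throw new IllegalArgumentException(s"Illegal batch type ${batch.getClass}. The older message format " +
          "classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, " +
          "which is used for magic v0 and v1")
    }
  }
}
{code}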
 
These are the parts I saw:
 
*PartitionTopicInfo.scala*
{code:java}
/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if (size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset
{code}
*ByteBufferMessageSet*
{code:java}
/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)

/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  if (isShallow)
    asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)
{code}
{code:java}
override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())
{code}
*MemoryRecords*
{code:java}
public static MemoryRecords readableRecords(ByteBuffer buffer) {
    return new MemoryRecords(buffer);
}

private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
    @Override
    public Iterator<MutableRecordBatch> iterator() {
        return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
    }
};
{code}
 
*ByteBufferLogInputStream*
{code:java}
public MutableRecordBatch nextBatch() throws IOException {
    int remaining = buffer.remaining();
    if (remaining < LOG_OVERHEAD)
        return null;

    int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
    // V0 has the smallest overhead, stricter checking is done later
    if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
        throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
    if (recordSize > maxMessageSize)
        throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));

    int batchSize = recordSize + LOG_OVERHEAD;
    if (remaining < batchSize)
        return null;

    byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

    ByteBuffer batchSlice = buffer.slice();
    batchSlice.limit(batchSize);
    buffer.position(buffer.position() + batchSize);

    if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
        throw new CorruptRecordException("Invalid magic found in record: " + magic);

    if (magic > RecordBatch.MAGIC_VALUE_V1)
        return new DefaultRecordBatch(batchSlice);
    else
        return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}
{code}
 
So the stream constructs DefaultRecordBatch instances, which later fail when 
the code tries to map them to MessageAndOffset - that mapping only works for 
AbstractLegacyRecordBatch, and I can't figure out why.
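
If anyone wants to reproduce the mapping failure in isolation, a hypothetical minimal sketch (assuming Kafka 1.1.0 core and clients are on the classpath) would be to build a magic v2 buffer and feed it through the legacy shallow iterator:
{code:java}
import kafka.message.ByteBufferMessageSet
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object ShallowIteratorRepro extends App {
  // withRecords writes batches using the current magic value (v2 in 1.1.0)
  val records = MemoryRecords.withRecords(CompressionType.NONE,
    new SimpleRecord("value".getBytes))

  // Wrapping the same buffer in the legacy message-set class and shallow-iterating
  // should hit MessageAndOffset.fromRecordBatch and throw the same
  // "Illegal batch type" IllegalArgumentException as in this report.
  val messageSet = new ByteBufferMessageSet(records.buffer())
  messageSet.shallowIterator.foreach(println)
}
{code}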
 
To me it seems like a bug.
 
Our current work-around is to restart either the server or the client, which 
solves it.
 

>  Illegal batch type class
> -------------------------
>
>                 Key: KAFKA-7769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7769
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.1.0
>         Environment: Kafka 1.1.0 on client side, running on Java 8
> Kafka 1.1.0 on broker side, running on Java 8
>            Reporter: Asaf Mesika
>            Priority: Major
>
> I get the following exception from Kafka Consumer version 1.1.0:
> {code:java}
> kafka.common.KafkaException: Error processing data for partition acmetopic-24 offset 1408742703
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
>     at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
>     at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
>     at scala.collection.Iterator.toStream(Iterator.scala:1403)
>     at scala.collection.Iterator.toStream$(Iterator.scala:1402)
>     at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
>     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
>     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
>     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
>     ... 15 common frames omitted
> {code}
>  
> It happens once in a while. I initialise the client with a ZooKeeper connect 
> string, not with bootstrap servers.
> Any idea what can cause this?


