Luckily, I was able to find that needle in a haystack. :-D Thanks a lot for your guidance, Matthias. It really helped me understand the issue.

On Mon, Nov 25, 2019 at 9:01 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Frankly, I am not entirely sure...
>
> I would _assume_ that you could still change the message format, but I
> would highly recommend trying it out in a non-production environment
> first.
>
> -Matthias
>
> On 11/25/19 4:51 AM, Shalom Sagges wrote:
> > Thanks a lot Matthias!
> >
> > This problematic topic is actually a topic that's been mirrored from an
> > older 0.8 version (using kafka-mirror).
> > I guess it's not possible to upgrade the message format in this case?
> >
> > Thanks again for your help!
> >
> > On Fri, Nov 22, 2019 at 7:32 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> It's going to be hard to find out which client it is. This is a known
> >> issue in general and there is a KIP that addresses it:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
> >>
> >> The root cause of the error you see seems to be that the client tries
> >> to write messages including record headers. Record headers were added
> >> in 0.11.0.0, thus, your brokers basically support them.
> >>
> >> However, it seems that the topic in question is still on message format
> >> 0.10, which does not support record headers. Note that broker version and
> >> message format are independent of each other. You can see from the stack
> >> trace that the broker tries to down-convert the message format (I
> >> assume from 0.11 to 0.10 -- this down-conversion would succeed if record
> >> headers were not used).
> >>
> >>> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
> >>
> >> Thus, the client must either stop using record headers, or you need to
> >> upgrade the message format to 0.11. See the docs for details about
> >> upgrading the message format.
> >>
> >> Hope that helps.
> >>
> >> -Matthias
> >>
> >> On 11/21/19 12:38 AM, Shalom Sagges wrote:
> >>> Hi Experts,
> >>>
> >>> I use Kafka 0.11.2
> >>>
> >>> I have an issue where the Kafka logs are bombarded with the following
> >>> error:
> >>>
> >>> ERROR [KafkaApi-14733] Error when handling request {replica_id=-1,max_wait_time=0,min_bytes=0,max_bytes=2147483647,topics=[{topic=my_topic,partitions=[{partition=22,fetch_offset=1297798,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
> >>> java.lang.IllegalArgumentException: *Magic v1 does not support record headers*
> >>>         at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
> >>>         at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
> >>>         at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
> >>>         at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
> >>>         at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
> >>>         at scala.Option.map(Option.scala:146)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
> >>>         at scala.Option.flatMap(Option.scala:171)
> >>>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
> >>>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >>>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >>>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >>>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
> >>>         at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
> >>>         at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
> >>>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
> >>>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
> >>>         at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
> >>>         at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
> >>>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
> >>>         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
> >>>         at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
> >>>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:640)
> >>>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
> >>>         at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
> >>>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> I understand this is probably related to a client that uses a client
> >>> version that isn't compatible with 0.11, but I don't know how to pinpoint
> >>> the client since the topic is used by multiple consumers.
> >>> Any idea what this error actually means and how I can find the culprit?
> >>> I can't read anything in the logs besides this error :-S
> >>>
> >>> Thanks a lot!
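
For anyone searching the same haystack: the broker log quoted in this thread does not name the client, but each failing fetch request does list the topic and partition it was reading. Below is a minimal Python sketch (not from the thread) that tallies which topic-partitions hit the "does not support record headers" error, which may at least narrow down the consumers to check. The function name `failing_partitions` and both regular expressions are hypothetical, modelled only on the single log excerpt quoted in this thread, and the sketch assumes that the request and the exception each sit on one unwrapped line in the real broker log.

```python
import re
from collections import Counter

# Patterns modelled on the one log excerpt in this thread (assumption):
# other broker versions may print fetch requests differently.
TOPIC_RE = re.compile(r"\{topic=([^,]+),partitions=\[(.*?)\]\}")
PARTITION_RE = re.compile(r"partition=(\d+)")
HEADER_ERROR = "does not support record headers"


def failing_partitions(lines):
    """Count (topic, partition) pairs of requests followed by the header error."""
    counts = Counter()
    pending = []  # pairs taken from the latest "Error when handling request" line
    for line in lines:
        if "Error when handling request" in line:
            pending = [
                (topic, int(partition))
                for topic, body in TOPIC_RE.findall(line)
                for partition in PARTITION_RE.findall(body)
            ]
        elif HEADER_ERROR in line and pending:
            # The exception line confirms the pending request failed on headers.
            counts.update(pending)
            pending = []
    return counts


# The two log lines from the original report, joined back onto single lines.
sample = [
    "ERROR [KafkaApi-14733] Error when handling request "
    "{replica_id=-1,max_wait_time=0,min_bytes=0,max_bytes=2147483647,"
    "topics=[{topic=my_topic,partitions=[{partition=22,"
    "fetch_offset=1297798,max_bytes=1048576}]}]} (kafka.server.KafkaApis)",
    "java.lang.IllegalArgumentException: Magic v1 does not support record headers",
]

for (topic, partition), hits in failing_partitions(sample).items():
    print(f"{topic}-{partition}: {hits} failed fetch(es)")
```

This only counts affected partitions. Mapping a partition back to a specific consumer still has to be done by other means, as Matthias notes above.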