Debugging into ConsoleConsumer.scala, it eventually just calls this:

    val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString.getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)

See https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L514

So the deserialize() method that takes headers will never be called in the case of ConsoleConsumer. I will log an issue on this.

Thanks for helping to debug this!

Jorg

On 2019/11/12 12:57:21, "M. Manna" <manme...@gmail.com> wrote:
> Record fetching (the deserialization call) happens in Fetcher, and Fetcher
> calls Deserializer.deserialize() with headers. The default implementation
> of that method delegates to deserialize() without headers. If you provide
> an overridden version of the deserializer (for both header/non-header), it
> will be called.
>
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
>
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
>
> Console consumer simply puts a consumer wrapper around KafkaConsumer. There
> is no change in behaviour otherwise. I take it that you've debugged and
> confirmed that it's not calling your overridden deserialize() with headers?
> If so, can you link it here for everyone's benefit?
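
For anyone following the thread, here is a minimal, self-contained sketch of the difference between the two call paths discussed above. It deliberately does not use the Kafka classes: SimpleDeserializer is a hypothetical stand-in that only mirrors the shape of Kafka's Deserializer (a two-argument deserialize() plus a default header-aware overload that delegates to it), headers are modelled as a plain Map, and the names fetcherStyle and consoleConsumerStyle are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class HeaderDeserializerSketch {

    // Hypothetical stand-in mirroring the shape of Kafka's Deserializer.
    // Headers are modelled as a Map here; that is an assumption for illustration.
    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);

        // Default header-aware overload: ignores headers and delegates.
        default T deserialize(String topic, Map<String, String> headers, byte[] data) {
            return deserialize(topic, data);
        }
    }

    // A user deserializer that overrides BOTH overloads and records which one ran.
    static class HeaderAwareDeserializer implements SimpleDeserializer<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public String deserialize(String topic, byte[] data) {
            calls.add("without-headers");
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(String topic, Map<String, String> headers, byte[] data) {
            calls.add("with-headers");
            return headers.getOrDefault("prefix", "") + new String(data, StandardCharsets.UTF_8);
        }
    }

    // Call path that passes headers, as the Fetcher is described as doing.
    static <T> T fetcherStyle(SimpleDeserializer<T> d, String topic,
                              Map<String, String> headers, byte[] data) {
        return d.deserialize(topic, headers, data);
    }

    // Call path that uses only (topic, bytes), as in the ConsoleConsumer line quoted above.
    static <T> T consoleConsumerStyle(SimpleDeserializer<T> d, String topic, byte[] data) {
        return d.deserialize(topic, data);
    }

    public static void main(String[] args) {
        HeaderAwareDeserializer d = new HeaderAwareDeserializer();
        byte[] payload = "x".getBytes(StandardCharsets.UTF_8);
        Map<String, String> headers = Collections.singletonMap("prefix", "hdr:");

        System.out.println(fetcherStyle(d, "t", headers, payload));
        System.out.println(consoleConsumerStyle(d, "t", payload));
        System.out.println(d.calls);
    }
}
```

In this sketch the header-aware override only runs on the first path. The second path never sees the headers, which matches what the debugger showed for ConsoleConsumer.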