Debugging into ConsoleConsumer.scala it eventually just calls this:

val convertedBytes = deserializer.map(_.deserialize(topic, 
nonNullBytes).toString.
        getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)

See
https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L514

So the deserializer method containing headers will never be called in the case 
of ConsoleConsumer. I will log an issue on this.

Thanks for helping to debug this !

Jorg

On 2019/11/12 12:57:21, "M. Manna" <manme...@gmail.com> wrote: 
> 
> Recrord feching (deserialization call) happens using Fetcher. And Fetcher
> is calling default implementation of Deserializer.deserialize() with
> header. The default implementation returns the implementation of
> deserialize() with header. If you provide overridden version of
> deserializer (for both header/non-header) it will be called.
> 
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
> 
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
> 
> Console consumer simply puts a consumer wrapper around KafkaConsumer. There
> is no change in behaviour otherwise. I take it that you've debugged and
> confirmed that it's not calling your overridden deserialize() with headers?
> If so, can you link it here for everyone's benefit?
> 

Reply via email to