Debugging into ConsoleConsumer.scala shows that it eventually just calls this:
    val convertedBytes = deserializer
      .map(_.deserialize(topic, nonNullBytes).toString.getBytes(StandardCharsets.UTF_8))
      .getOrElse(nonNullBytes)
See
https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L514
So the deserialize() overload that takes headers is never called by
ConsoleConsumer, which only uses the two-argument deserialize(topic, data).
I will log an issue on this.
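
For anyone following along, here is a minimal self-contained sketch of the difference between the two call paths. It makes some assumptions: SimpleDeserializer is a hypothetical stand-in for Kafka's Deserializer interface (headers are reduced to a plain Map), and the class and method names are made up for illustration. It is not the actual Kafka code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical, simplified stand-in for Kafka's Deserializer interface.
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Like the default in Kafka's interface: ignore headers and delegate.
    default T deserialize(String topic, Map<String, String> headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// A deserializer that overrides both overloads so we can see which one runs.
class HeaderAwareDeserializer implements SimpleDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return "no-headers:" + new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Map<String, String> headers, byte[] data) {
        return "with-headers:" + new String(data, StandardCharsets.UTF_8);
    }
}

class DeserializerDemo {
    // Mirrors the ConsoleConsumer line quoted above: two-argument call only.
    static String consoleConsumerStyle(SimpleDeserializer<String> d, String topic, byte[] bytes) {
        return d.deserialize(topic, bytes);
    }

    // Mirrors what Fetcher is described as doing: the call that passes headers.
    static String fetcherStyle(SimpleDeserializer<String> d, String topic,
                               Map<String, String> headers, byte[] bytes) {
        return d.deserialize(topic, headers, bytes);
    }

    public static void main(String[] args) {
        SimpleDeserializer<String> d = new HeaderAwareDeserializer();
        byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8);
        System.out.println(consoleConsumerStyle(d, "my-topic", bytes));
        System.out.println(fetcherStyle(d, "my-topic", Map.of("k", "v"), bytes));
    }
}
```

With both overloads overridden, the two-argument path never reaches the header-aware method, which matches what I see in the debugger.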
Thanks for helping to debug this!
Jorg
On 2019/11/12 12:57:21, "M. Manna" <[email protected]> wrote:
>
> Record fetching (the deserialization call) happens in Fetcher, and Fetcher
> calls Deserializer.deserialize() with headers. The default implementation
> of that overload delegates to deserialize() without headers. If you provide
> overridden versions of deserialize() (both with and without headers), they
> will be called.
>
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
>
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
>
> Console consumer simply puts a consumer wrapper around KafkaConsumer. There
> is no change in behaviour otherwise. I take it that you've debugged and
> confirmed that it's not calling your overridden deserialize() with headers?
> If so, can you link it here for everyone's benefit?
>