HI again, On Tue, 12 Nov 2019 at 12:31, Jorg Heymans <jorg.heym...@gmail.com> wrote:
> Hi, > > The issue is not that i cannot get a custom deserializer working, it's > that the custom deserializer i provide implements the default method from > the Deserializer interface > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59 > that gives access to record Headers. > > The kafka console consumer never calls this method, it will only call the > variant without Headers > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50 > > I'm using kafka 2.3.0 btw. > > Jorg > Recrord feching (deserialization call) happens using Fetcher. And Fetcher is calling default implementation of Deserializer.deserialize() with header. The default implementation returns the implementation of deserialize() with header. If you provide overridden version of deserializer (for both header/non-header) it will be called. https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265 https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268 Console consumer simply puts a consumer wrapper around KafkaConsumer. There is no change in behaviour otherwise. I take it that you've debugged and confirmed that it's not calling your overridden deserialize() with headers? If so, can you link it here for everyone's benefit? Thanks, > On 2019/11/12 11:58:26, "M. Manna" <manme...@gmail.com> wrote: > > > > I think you can try the following to get your implementation working > > > > 1) Provide the SerDe classes into classpath > > 2) Provide your consumer config file > > 3) Provide key/value Deserializer props via --consumer-property arg. > > > >