Hi On Tue, 12 Nov 2019 at 09:53, Jorg Heymans <jorg.heym...@gmail.com> wrote:
> Indeed, i corrected the typo but now my deserializer class is not taken > into account at all and it goes back to the default deserializer. You can > verify this by putting a non-existent class and it still runs fine. > > value.deserializer=does.not.exist > > In ConsoleConsumer, the bootstrap.server, key/value deserializer are being enforced via --consumer-property arg. It's aggregating all properties between --consumer-property and --consumer.config. It'll prioritise kv pair supplied via --consumer-property over the prop file. I think you can try the following to get your implementation working 1) Provide the SerDe classes into classpath 2) Provide your consumer config file 3) Provide key/value Deserializer props via --consumer-property arg. See how that works for you. Thanks, > Jorg > > On 2019/11/11 14:31:49, "M. Manna" <manme...@gmail.com> wrote: > > You have a typo - you mean deserializer > > > > Please try again. > > > > Regards, > > > > On Mon, 11 Nov 2019 at 14:28, Jorg Heymans <jorg.heym...@gmail.com> > wrote: > > > > > Don't think that option is available there, specifying > > > 'value.deserializer' in my consumer-config.properties file gives > > > > > > [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was > > > supplied but isn't a known config. > > > (org.apache.kafka.clients.consumer.ConsumerConfig) > > > > > > Does there exist a description of what properties the consumer-config > > > properties file accepts ? I could find only a few references to it in > the > > > documentation. > > > > > > Jorg > > > > > > On 2019/11/11 13:00:03, "M. Manna" <manme...@gmail.com> wrote: > > > > Hi, > > > > > > > > > > > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans <jorg.heym...@gmail.com> > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > I have created a class implementing Deserializer, providing an > > > > > implementation for > > > > > > > > > > public String deserialize(String topic, Headers headers, byte[] > data) > > > > > > > > > > that does some conditional processing based on headers, and then > calls > > > the > > > > > other serde method > > > > > > > > > > public String deserialize(String topic, byte[] data) > > > > > > > > > > What i'm seeing is that kafka-console-consumer only uses the second > > > method > > > > > when a value deserializer is specified. Is there a way to force it > to > > > > > invoke the first method, so i can do processing with headers ? I > tried > > > > > implementing the deprecated 'ExtendedSerializer' but it does not > make a > > > > > difference. > > > > > > > > > > Thanks, > > > > > Jorg > > > > > > > > > > > > > Have you tried providing a separate prop file using consumer.config > > > > argument? Please see the reference here: > > > > > > > > --consumer.config <String: config file> Consumer config properties > file. > > > > Note > > > > that [consumer-property] > takes > > > > precedence over this > config. > > > > > > > > Try that and see how it goes. > > > > > > > > Thanks, > > > > > > > > > >