Hi Deepak,

Do you return DeserializationHandlerResponse.CONTINUE or DeserializationHandlerResponse.FAIL in your CustomExceptionHandler?

With DeserializationHandlerResponse.CONTINUE, processing does not stop at a bad record. Once the next offset commit has happened, the bad records before that commit are not read from the input again when your application restarts (unless you reset your application). That does not guarantee that no duplicate bad records get appended to your file, though, because records processed after the last commit are read again on restart. But it should reduce the duplicates.
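
If the remaining duplicates are a problem, one option is to make the file write itself idempotent by keying every entry on topic, partition, and offset. The following is only a minimal sketch under my own assumptions: BadRecordLog is a hypothetical helper (not part of Kafka Streams), the tab-separated line format is made up for illustration, and the set of seen keys is kept in memory, so it only suits a modest number of bad records.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: appends each bad record to a file at most once,
// identified by its topic, partition, and offset.
final class BadRecordLog {
    private final Path file;
    private final Set<String> seen = new HashSet<>();

    BadRecordLog(Path file) throws IOException {
        this.file = file;
        // Reload the keys written by earlier runs so a restart does not duplicate them.
        if (Files.exists(file)) {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                int tab = line.indexOf('\t');
                if (tab > 0) {
                    seen.add(line.substring(0, tab));
                }
            }
        }
    }

    // Returns true if the record was written, false if it was already in the file.
    synchronized boolean append(String topic, int partition, long offset, byte[] value)
            throws IOException {
        String key = topic + "-" + partition + "@" + offset;
        if (!seen.add(key)) {
            return false;
        }
        // Base64 keeps arbitrary bytes on a single line of the file.
        String payload = value == null ? "" : Base64.getEncoder().encodeToString(value);
        String line = key + "\t" + payload + System.lineSeparator();
        Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return true;
    }
}
```

Inside your handler's handle(context, record, exception) method you could then call append(record.topic(), record.partition(), record.offset(), record.value()) before returning DeserializationHandlerResponse.CONTINUE.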

See also the following link for an example of returning DeserializationHandlerResponse.CONTINUE:
https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams.html#default-deserialization-exception-handler

Best,
Bruno

On 01.09.20 10:14, Deepak Raghav wrote:
Hi Team

I have created a CustomExceptionHandler class by
implementing the DeserializationExceptionHandler interface to handle
exceptions during deserialization.

But the problem with this approach is that if an exception is
raised for some record and the stream is then stopped and
restarted, it reads those bad messages again.

I am storing those bad messages in a file in the filesystem, and with
this approach duplicate messages get appended to the file when the
stream is restarted, since the offsets of those bad messages are not
being advanced.

Please let me know if I missed anything.

Regards and Thanks
Deepak Raghav
