Hi Deepak,
Do you return DeserializationHandlerResponse.CONTINUE or
DeserializationHandlerResponse.FAIL in your CustomExceptionHandler?
With DeserializationHandlerResponse.CONTINUE, the processing of records
should not stop and after the next offset commit the bad records should
not be read anymore from the input if your application restarts (except
if you reset your application). That does not guarantee that duplicate
bad records get appended to your file, though. But it might reduce the
duplicates.
See also the following link for an example of returning
DeserializationHandlerResponse.CONTINUE:
https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams.html#default-deserialization-exception-handler
Best,
Bruno
On 01.09.20 10:14, Deepak Raghav wrote:
Hi Team
I have created a CustomExceptionHandler class by
implementing DeserializationExceptionHandler interface to handle the
exception during deserialization time.
But the problem with this approach is that if there is some exception
raised for some record and after that stream is stopped and
restarted again, it reads those bad messages again.
I am storing those bad messages in some file in the filesystem and with
this approach, duplicate messages are getting appended in the file when the
stream is started since those bad message's offset are not getting
increased.
Please let me know if I missed anything.
Regards and Thanks
Deepak Raghav