How long is your message retention set for? You may want to increase it to a large enough value.
I have an almost identical use case, but I would strongly recommend that you handle duplicates, as they are caused by your process (they are not Kafka duplicates).

Regards,

On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> I've disabled the auto commit, so what I thought that code would do is:
>
> - it fetches a message
> - it processes a message
> - if everything went fine it commits its offset
> - if there's an exception, it didn't commit, so after the error it would
>   just poll again and get the same message over and over
>
> Instead it seems that message has been skipped somehow.
>
> In this case what the processing is doing is:
> - opening new orders (with a start datetime and a UUID generated by the
>   stream producing to the topic this consumer is consuming from)
> - closing previously created orders (by setting an end datetime using the
>   ID stored in a store)
>
> And so far I've seen multiple exceptions like updating a nonexistent order
> (meaning the create message didn't complete the request and it has been
> skipped) or create with same ID (meaning that the backend returned an error
> but correctly created the record on the database).
> I would expect that the above code wouldn't just commit somehow and the
> consumer would be stuck there indefinitely.
>
> Regarding ignoring/handling duplicates, that's definitely something that
> will be done, but right now I'm implementing it this way so that I can
> confirm that I don't lose messages if I don't manually commit them, which
> doesn't seem to be the case.
>
> Any help is really appreciated
>
> --
> Alessandro Tagliapietra
>
> On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
>
> > Hi,
> >
> > How are you managing your offset commits?
> >
> > Also, if it's a duplicate record issue (sounds like a database entry to
> > me), have you thought about ignoring/handling duplicates?
> >
> > Thanks,
> >
> > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >
> > > Hello everyone,
> > >
> > > I have a consumer that fetches messages from a topic; for each message it
> > > makes an API call to our backend. To ensure that if a message fails it
> > > tries again to process the message, I've set max.poll.records to 1 and I
> > > have code like this:
> > >
> > >     consumer.subscribe(arrayListOf("orders"))
> > >     while (!stopProcessing.get()) {
> > >         try {
> > >             val records = consumer.poll(Duration.ofHours(1))
> > >             records.forEach {
> > >                 processRecord(it)
> > >             }
> > >             consumer.commitSync()
> > >         } catch (e: Exception) {
> > >             logger.error("Error processing order message", e)
> > >             Sentry.capture(e)
> > >             Thread.sleep(30000)
> > >         }
> > >     }
> > >
> > > Now, if a request fails because the backend complains about a duplicate
> > > primary ID, due to the nature of the error, trying to insert the same
> > > thing would generate that same error over and over again.
> > > Instead it seems that after some retries the message is skipped and it
> > > goes on with the next one.
> > > What could be the reason? If in the next loop iteration it gets a message
> > > from another partition, would it also commit the offset of the other
> > > failed partition?
> > >
> > > Thank you
> > >
> > > --
> > > Alessandro Tagliapietra
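
For reference, here is a minimal sketch of one possible explanation and workaround for the loop quoted above. The thread itself does not confirm this is the cause. poll() advances the consumer's in-memory position whether or not the offset was committed, so after an exception the next poll() returns the following record rather than the failed one, and a later commitSync() with no arguments commits the positions reached by the last poll(), which are then past the failed record. The sketch rewinds with seek() on failure and commits each record's offset explicitly, so a success on one partition does not commit anything for another. The function name processBatch and its Boolean return value are hypothetical, chosen only for illustration; seek(), commitSync(Map) and OffsetAndMetadata come from the Kafka Java client.

```kotlin
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata

/**
 * Hypothetical helper: processes one polled batch, partition by partition.
 * Returns true if every record was processed, false if any partition was rewound.
 */
fun <K, V> processBatch(
    consumer: Consumer<K, V>,
    records: ConsumerRecords<K, V>,
    processRecord: (ConsumerRecord<K, V>) -> Unit
): Boolean {
    var allSucceeded = true
    for (partition in records.partitions()) {
        for (record in records.records(partition)) {
            try {
                processRecord(record)
            } catch (e: Exception) {
                // Move the in-memory position back so that the next poll()
                // returns the failed record again instead of the one after it.
                consumer.seek(partition, record.offset())
                allSucceeded = false
                // Skip the remaining records of this partition; they will be
                // fetched again after the rewind.
                break
            }
            // Commit only this partition, up to and including this record.
            // The committed value is the offset of the NEXT record to read.
            consumer.commitSync(mapOf(partition to OffsetAndMetadata(record.offset() + 1)))
        }
    }
    return allSucceeded
}
```

In the quoted loop, the records.forEach block and the argument-less consumer.commitSync() would be replaced by a call to processBatch(consumer, records, ::processRecord), sleeping before the next poll() when it returns false. A record that can never succeed, such as the duplicate primary ID case, will then block its partition indefinitely, which is the behaviour the original poster expected; handling duplicates in the processing step, as suggested earlier in the thread, is what lets the consumer move on.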