I've disabled auto commit, so this is what I thought the code would do:

- it fetches a message
- it processes the message
- if everything goes fine, it commits the offset
- if there's an exception, it doesn't commit, so after the error it just polls again and gets the same message over and over
Instead, it seems the message has been skipped somehow.

In this case, the processing does the following:

- opens new orders (with a start datetime and a UUID generated by the stream that produces to the topic this consumer reads from)
- closes previously created orders (by setting an end datetime, using the ID kept in a store)

So far I've seen multiple exceptions, such as updating a non-existent order (meaning the create message didn't complete its request and was skipped) or creating an order with an ID that already exists (meaning the backend returned an error but still created the record in the database).

I would expect that the code above wouldn't commit in that case, and that the consumer would stay stuck on that message indefinitely.

Regarding ignoring/handling duplicates: that's definitely something that will be done, but right now I'm implementing it this way so that I can confirm that I don't lose messages if I don't manually commit them, which currently doesn't seem to be the case.

Any help is really appreciated.

--
Alessandro Tagliapietra

On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:

> Hi,
>
> How are you managing your offset commits?
>
> Also, if it’s a duplicate record issue (sounds like a database entry to
> me), have you thought about ignoring/handling duplicates?
>
> Thanks,
>
> On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > I've a consumer that fetches messages from a topic, for each message it
> > makes an API call to our backend. To ensure that if a message fails it
> > tries again to process the message I've set max.poll.records to 1 and
> > I've a code like this:
> >
> >   consumer.subscribe(arrayListOf("orders"))
> >   while (!stopProcessing.get()) {
> >       try {
> >           val records = consumer.poll(Duration.ofHours(1))
> >           records.forEach {
> >               processRecord(it)
> >           }
> >           consumer.commitSync()
> >       } catch (e: Exception) {
> >           logger.error("Error processing order message", e)
> >           Sentry.capture(e)
> >           Thread.sleep(30000)
> >       }
> >   }
> >
> > now, if a request fails because the backend complains about a duplicate
> > primary ID, due to the nature of the error trying to insert the same
> > thing would generate that same error over and over again.
> > Instead it seems that after some retries the message is skipped and it
> > goes on with the next one.
> > What could be the reason? If in the next loop iteration it gets a
> > message from another partition it would also commit the offset of the
> > other failed partition?
> >
> > Thank you
> >
> > --
> > Alessandro Tagliapietra
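
A possible explanation for the skipped message, offered as a sketch rather than a confirmed diagnosis: `poll()` advances the consumer's in-memory position whether or not anything is committed, so after a failed `processRecord` the next `poll()` returns the following record, and the no-argument `commitSync()` then commits the current position, which is already past the failed record. The loop below rewinds with `seek()` on failure and commits an explicit per-record offset. It reuses `stopProcessing` and `processRecord` from the quoted snippet, passed in as parameters; the function name `consumeWithRetry`, the `String` key/value types, and the one-second poll timeout are assumptions made for illustration.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

// Sketch only. Assumes enable.auto.commit=false and max.poll.records=1,
// as in the original setup; key/value types are hypothetical.
fun consumeWithRetry(
    consumer: KafkaConsumer<String, String>,
    stopProcessing: AtomicBoolean,
    processRecord: (ConsumerRecord<String, String>) -> Unit
) {
    consumer.subscribe(listOf("orders"))
    while (!stopProcessing.get()) {
        val records = consumer.poll(Duration.ofSeconds(1))
        for (record in records) {
            val tp = TopicPartition(record.topic(), record.partition())
            try {
                processRecord(record)
            } catch (e: Exception) {
                // poll() has already moved the position past this record,
                // so rewind to it; the next poll() will return it again.
                consumer.seek(tp, record.offset())
                // Log/report the error here, then back off before retrying.
                Thread.sleep(30_000)
                // With max.poll.records > 1, the remaining records of this
                // batch would need rewinding as well; not handled here.
                break
            }
            // Commit only this record's partition. The committed offset is
            // the offset of the next record to read, hence offset + 1.
            consumer.commitSync(mapOf(tp to OffsetAndMetadata(record.offset() + 1)))
        }
    }
}
```

Committing an explicit offset map also avoids committing the position of a different partition as a side effect, which relates to the last question in the quoted message. The 30-second back-off is kept from the original code; it stays below the default `max.poll.interval.ms`, so it should not by itself trigger a rebalance.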