Hi,

How are you managing your offset commits?
Also, if it's a duplicate record issue (sounds like a database entry to me), have you thought about ignoring/handling duplicates?

Thanks,

On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> Hello everyone,
>
> I've a consumer that fetches messages from a topic, for each message it
> makes an API call to our backend. To ensure that if a message fails it
> tries again to process the message I've set max.poll.records to 1 and I've
> a code like this:
>
>     consumer.subscribe(arrayListOf("orders"))
>     while (!stopProcessing.get()) {
>         try {
>             val records = consumer.poll(Duration.ofHours(1))
>             records.forEach {
>                 processRecord(it)
>             }
>             consumer.commitSync()
>         } catch (e: Exception) {
>             logger.error("Error processing order message", e)
>             Sentry.capture(e)
>             Thread.sleep(30000)
>         }
>     }
>
> now, if a request fails because the backend complains about a duplicate
> primary ID, due to the nature of the error trying to insert the same thing
> would generate that same error over and over again.
> Instead it seems that after some retries the message is skipped and it goes
> on with the next one.
> What could be the reason? If in the next loop iteration it gets a message
> from another partition it would also commit the offset of the other failed
> partition?
>
> Thank you
>
> --
> Alessandro Tagliapietra
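
On the skipping described in the quoted message: poll() advances the consumer's in-memory position as soon as it returns records, whether or not processing succeeds. If processRecord() throws and nothing seeks back, the next poll() returns the following record rather than the failed one. A commitSync() with no arguments commits the current position of every assigned partition, so a later successful iteration also commits past the failed record on the other partition.

Below is a rough sketch of one way to handle both points, not a drop-in fix. It assumes enable.auto.commit=false and String keys/values. DuplicateRecordException is a hypothetical name standing in for whatever your backend call throws on a duplicate primary ID, and consumeOrders is likewise just an illustrative wrapper.

```kotlin
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata

// Hypothetical: stands in for the error raised on a duplicate primary ID.
class DuplicateRecordException(message: String) : RuntimeException(message)

fun consumeOrders(
    consumer: Consumer<String, String>,
    stopProcessing: AtomicBoolean,
    processRecord: (ConsumerRecord<String, String>) -> Unit
) {
    consumer.subscribe(listOf("orders"))
    while (!stopProcessing.get()) {
        // Short timeout so stopProcessing is re-checked regularly.
        val records = consumer.poll(Duration.ofSeconds(1))
        var failed = false
        for (partition in records.partitions()) {
            for (record in records.records(partition)) {
                try {
                    processRecord(record)
                } catch (e: DuplicateRecordException) {
                    // Already stored by an earlier attempt: treat as done
                    // and fall through to the commit below.
                } catch (e: Exception) {
                    // Rewind this partition so the next poll() returns the
                    // failed record again, and skip the rest of its batch.
                    consumer.seek(partition, record.offset())
                    failed = true
                    break
                }
                // Commit only this partition, up to and including this record.
                consumer.commitSync(
                    mapOf(partition to OffsetAndMetadata(record.offset() + 1))
                )
            }
        }
        if (failed) Thread.sleep(30_000) // back off before retrying
    }
}
```

Committing per partition with an explicit offset means a success on one partition never moves the committed offset of another. Whether a duplicate should really be treated as "already processed" depends on your backend, so take that branch as an assumption.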