Hi,

How are you managing your offset commits ?

 Also, if it’s a duplicate record issue ( sounds like database entry to
me), have you thought about ignoring/handling duplicates?

Thanks,

On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hello everyone,
>
> I've a consumer that fetches messages from a topic, for each message it
> makes an API call to our backend. To ensure that if a message fails it
> tries again to process the message I've set max.poll.records to 1 and I've
> a code like this:
>
>   consumer.subscribe(arrayListOf("orders"))
>   while (!stopProcessing.get()) {
>       try {
>           val records = consumer.poll(Duration.ofHours(1))
>           records.forEach {
>               processRecord(it)
>           }
>           consumer.commitSync()
>       } catch (e: Exception) {
>           logger.error("Error processing order message", e)
>           Sentry.capture(e)
>           Thread.sleep(30000)
>       }
>   }
>
> now, if a request fails because the backend complains about a duplicate
> primary ID, due to the nature of the error trying to insert the same thing
> would generate that same error over and over again.
> Instead it seems that after some retries the message is skipped and it goes
> one with the next one.
> What could be the reason? If in the next loop iteration it gets a message
> from another partition it would also commit the offset of the other failed
> partition?
>
> Thank you
>
> --
> Alessandro Tagliapietra
>

Reply via email to