How long is your message retention set for ? Perhaps you want to increase
that to a large enough value.

I have almost identical use case, but I would strongly recommend that you
handle duplicates as they are due to your process ( not Kafka duplicate).

Regards,

On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> I've disabled the auto commit, so what I thought that code would do is:
>
>  - it fetches a message
>  - it processes a message
>  - if everything went fine it commits its offset
>  - if there's an exception, it didn't commit, so after the error it would
> just poll again and get the same message over and over
>
> instead it seems that message has been skipped somehow.
>
> In this case what the processing is doing is:
>  - opening new orders (with a start datetime and an uuid generated by the
> stream producing to the topic this consumer is consuming from)
>  - closing previously created orders (by setting an end datetime using the
> ID stored in a store)
>
> And so far I've seen multiple exceptions like updating a non existent order
> (meaning the create message didn't complete the request and it has been
> skipped) or create with same ID (meaning that the backend returned an error
> but correctly created the record on the database).
> I would expect that the above code wouldn't just commit somehow and the
> consumer would be stuck there indefinitely.
>
> Regarding ignoring/handling duplicates that's definitely something that
> will be done, but right now i'm implementing it this way so that I can
> confirm that I don't lose messages if I don't manually commit them which
> doesn't seem to be the case
>
> Any help is really appreciated
>
> --
> Alessandro Tagliapietra
>
> On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
>
> > Hi,
> >
> > How are you managing your offset commits ?
> >
> >  Also, if it’s a duplicate record issue ( sounds like database entry to
> > me), have you thought about ignoring/handling duplicates?
> >
> > Thanks,
> >
> > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > Hello everyone,
> > >
> > > I've a consumer that fetches messages from a topic, for each message it
> > > makes an API call to our backend. To ensure that if a message fails it
> > > tries again to process the message I've set max.poll.records to 1 and
> > I've
> > > a code like this:
> > >
> > >   consumer.subscribe(arrayListOf("orders"))
> > >   while (!stopProcessing.get()) {
> > >       try {
> > >           val records = consumer.poll(Duration.ofHours(1))
> > >           records.forEach {
> > >               processRecord(it)
> > >           }
> > >           consumer.commitSync()
> > >       } catch (e: Exception) {
> > >           logger.error("Error processing order message", e)
> > >           Sentry.capture(e)
> > >           Thread.sleep(30000)
> > >       }
> > >   }
> > >
> > > now, if a request fails because the backend complains about a duplicate
> > > primary ID, due to the nature of the error trying to insert the same
> > thing
> > > would generate that same error over and over again.
> > > Instead it seems that after some retries the message is skipped and it
> > goes
> > > one with the next one.
> > > What could be the reason? If in the next loop iteration it gets a
> message
> > > from another partition it would also commit the offset of the other
> > failed
> > > partition?
> > >
> > > Thank you
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> >
>

Reply via email to