If I change my code slightly to manually commit offsets at the partition level:
while (true) {
    try {
        val records = consumer.poll(Duration.ofHours(1))
        val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
        records.forEach {
            partitionOffsetMap[TopicPartition(it.topic(), it.partition())] =
                OffsetAndMetadata(it.offset())
            println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
            if (Random.nextInt(0, 100) > 80) {
                throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
            }
        }
        consumer.commitSync(partitionOffsetMap)
        println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
    } catch (e: Throwable) {
        println("Error Processing message $e")
        Thread.sleep(5000)
    }
}

it logs this:

Received message on key 2 with value 93 and offset 92
Committed offset 92 on partition 2
Received message on key 2 with value 94 and offset 93
Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
Received message on key 2 with value 95 and offset 94
Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
Received message on key 2 with value 96 and offset 95
Committed offset 95 on partition 2
Received message on key 2 with value 97 and offset 96
Committed offset 96 on partition 2
Received message on key 2 with value 98 and offset 97
Error Processing message java.lang.Exception: Thrown error on key 2 with value 98

As you can see, the offset increases even without logging the "Committed offset ..." part.

--
Alessandro Tagliapietra

On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> It's still in the topic because it is weeks away from the deletion
> threshold (today's message, with a 4-week retention).
> So I assume the consumer just moves on to the next one.
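Two things explain the log above. First, poll() advances the consumer's in-memory position as soon as records are returned, so catching the exception without calling seek() back means the failed record is never fetched again; the position simply moves past it, commit or no commit. Second, Kafka interprets a committed offset as the next offset to consume, so the map should store OffsetAndMetadata(it.offset() + 1). A minimal sketch of the position-vs-commit behavior, using a made-up ToyConsumer in plain Java (no broker, not the real client):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (not the Kafka client) of consumer position vs. committed
// offset: poll() advances the in-memory position whether or not you
// commit, so a caught exception skips the record unless you seek() back.
class ToyConsumer {
    final List<String> partition;  // stands in for one topic partition
    long position = 0;             // next offset poll() will read
    long committed = 0;            // next offset to read after a seek/restart

    ToyConsumer(List<String> partition) { this.partition = partition; }

    String poll()          { return partition.get((int) position++); }
    void commitSync()      { committed = position; }
    void seekToCommitted() { position = committed; }
    boolean hasMore()      { return position < partition.size(); }
}

public class SkipDemo {
    // Consume loop; the record "b" fails on its first attempt only.
    static List<String> run(ToyConsumer c, boolean seekOnError) {
        List<String> processed = new ArrayList<>();
        Set<Long> failedOnce = new HashSet<>();
        while (c.hasMore()) {
            long offset = c.position;
            try {
                String rec = c.poll();
                if (rec.equals("b") && failedOnce.add(offset)) {
                    throw new RuntimeException("transient failure on " + rec);
                }
                processed.add(rec);
                c.commitSync();                       // commit only after success
            } catch (RuntimeException e) {
                if (seekOnError) c.seekToCommitted(); // rewind so "b" is retried
                // without the seek, position has already moved past "b"
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        System.out.println(run(new ToyConsumer(data), false)); // [a, c] - "b" is lost
        System.out.println(run(new ToyConsumer(data), true));  // [a, b, c]
    }
}
```

With the real consumer, the equivalent of seekToCommitted() is calling consumer.seek(partition, offset) for each partition you want to retry before the next poll().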
> As a test I've created this test script:
> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
>
> After running this I get in the logs:
>
> Received message on key 2 with value 1
> Received message on key 2 with value 2
> Received message on key 2 with value 3
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
> Received message on key 2 with value 4
> Received message on key 2 with value 5
> Received message on key 2 with value 6
> Received message on key 2 with value 7
> Received message on key 2 with value 8
> Received message on key 2 with value 9
> Received message on key 2 with value 10
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
> Received message on key 2 with value 11
> Received message on key 2 with value 12
> Received message on key 2 with value 13
> Received message on key 2 with value 14
> Received message on key 2 with value 15
> Received message on key 2 with value 16
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
>
> It seems that the offset just automatically increases; with another client
> I also see all the values in the topic, so they're not deleted.
>
> Shouldn't it have retried the message with value 3?
>
> --
> Alessandro Tagliapietra
>
> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io>
> wrote:
>
>> "I'm just saying that a message whose processing throws an exception is
>> gone within minutes."
>>
>> Is the message no longer in the topic, or is your consumer group's
>> current offset just higher than the offset of the message in question?
>>
>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>> > You mean the retention time of the topic? That's one month.
>> > I'm just saying that a message whose processing throws an exception is
>> > gone within minutes.
>> >
>> > I will handle duplicates better later, but if we can't be sure that we
>> > don't skip/lose these messages, then it's useless to use Kafka.
>> > That's why I'm wondering if there's something wrong with my code.
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
>> >
>> > > How long is your message retention set for? Perhaps you want to
>> > > increase that to a large enough value.
>> > >
>> > > I have an almost identical use case, but I would strongly recommend
>> > > that you handle duplicates, as they are due to your process (not
>> > > Kafka duplicates).
>> > >
>> > > Regards,
>> > >
>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
>> > > tagliapietra.alessan...@gmail.com> wrote:
>> > >
>> > > > I've disabled auto commit, so what I thought that code would do is:
>> > > >
>> > > > - it fetches a message
>> > > > - it processes the message
>> > > > - if everything went fine, it commits its offset
>> > > > - if there's an exception, it didn't commit, so after the error it
>> > > > would just poll again and get the same message over and over
>> > > >
>> > > > Instead, it seems that the message has been skipped somehow.
>> > > >
>> > > > In this case, what the processing is doing is:
>> > > > - opening new orders (with a start datetime and a UUID generated by
>> > > > the stream producing to the topic this consumer is consuming from)
>> > > > - closing previously created orders (by setting an end datetime
>> > > > using the ID stored in a store)
>> > > >
>> > > > And so far I've seen multiple exceptions, like updating a
>> > > > non-existent order (meaning the create message didn't complete the
>> > > > request and has been skipped) or creating with the same ID
>> > > > (meaning that the backend returned an error but correctly created
>> > > > the record in the database).
>> > > > I would expect that the above code wouldn't just commit somehow,
>> > > > and that the consumer would be stuck there indefinitely.
>> > > >
>> > > > Regarding ignoring/handling duplicates, that's definitely something
>> > > > that will be done, but right now I'm implementing it this way so
>> > > > that I can confirm that I don't lose messages if I don't manually
>> > > > commit them, which doesn't seem to be the case.
>> > > >
>> > > > Any help is really appreciated.
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > How are you managing your offset commits?
>> > > > >
>> > > > > Also, if it's a duplicate record issue (sounds like a database
>> > > > > entry to me), have you thought about ignoring/handling
>> > > > > duplicates?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
>> > > > > tagliapietra.alessan...@gmail.com> wrote:
>> > > > >
>> > > > > > Hello everyone,
>> > > > > >
>> > > > > > I have a consumer that fetches messages from a topic; for each
>> > > > > > message it makes an API call to our backend.
>> > > > > > To ensure that if a message fails it tries to process the
>> > > > > > message again, I've set max.poll.records to 1 and I have code
>> > > > > > like this:
>> > > > > >
>> > > > > > consumer.subscribe(arrayListOf("orders"))
>> > > > > > while (!stopProcessing.get()) {
>> > > > > >     try {
>> > > > > >         val records = consumer.poll(Duration.ofHours(1))
>> > > > > >         records.forEach {
>> > > > > >             processRecord(it)
>> > > > > >         }
>> > > > > >         consumer.commitSync()
>> > > > > >     } catch (e: Exception) {
>> > > > > >         logger.error("Error processing order message", e)
>> > > > > >         Sentry.capture(e)
>> > > > > >         Thread.sleep(30000)
>> > > > > >     }
>> > > > > > }
>> > > > > >
>> > > > > > Now, if a request fails because the backend complains about a
>> > > > > > duplicate primary ID, then due to the nature of the error,
>> > > > > > trying to insert the same thing would generate that same error
>> > > > > > over and over again.
>> > > > > > Instead, it seems that after some retries the message is
>> > > > > > skipped and it goes on with the next one.
>> > > > > > What could be the reason? If in the next loop iteration it gets
>> > > > > > a message from another partition, would it also commit the
>> > > > > > offset of the other, failed partition?
>> > > > > >
>> > > > > > Thank you
>> > > > > >
>> > > > > > --
>> > > > > > Alessandro Tagliapietra
>> > > > >
>> > > >
>> > >
>> >
>>
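On that last question: yes. A parameterless commitSync() commits the latest offsets returned by poll() for all assigned partitions, not only the partition whose record was processed successfully, so a later successful iteration can commit past a failed record on another partition. A small sketch of that behavior with a made-up model (plain Java, not the Kafka client; no broker needed):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not the Kafka client): a parameterless commitSync() commits
// the consumer's current position for ALL assigned partitions, so a
// successful poll on partition 1 can commit past a record that failed
// on partition 0.
public class CommitAllPartitions {
    final Map<Integer, Long> position = new HashMap<>();   // partition -> next fetch offset
    final Map<Integer, Long> committed = new HashMap<>();  // partition -> committed offset

    void poll(int partition) { position.merge(partition, 1L, Long::sum); }
    void commitSync()        { committed.putAll(position); } // commits every partition

    public static void main(String[] args) {
        CommitAllPartitions c = new CommitAllPartitions();
        c.poll(0);      // record from partition 0 fails -> no commit here
        c.poll(1);      // record from partition 1 succeeds
        c.commitSync(); // but this also commits partition 0's advanced position
        System.out.println(c.committed); // {0=1, 1=1}: the failed record on p0 is committed
    }
}
```

To avoid this, commit per-partition offsets explicitly (the map-based commitSync(Map) overload, with offset + 1) and seek() failed partitions back before polling again.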