OK, I think I've found the problem. Looking at https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration- it says:
> On each poll, consumer will try to use the last consumed offset as the
> starting offset and fetch sequentially. The last consumed offset can be
> manually set through seek(TopicPartition, long) or automatically set as the
> last committed offset for the subscribed list of partitions

So the part where it says "automatically set as the last committed offset"
happens only on consumer start. When the consumer is not restarted, what
happens is "consumer will try to use the last consumed offset", meaning it
uses the last consumed offset whether you've committed it or not.

So adapting my code to:

while (true) {
    val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
    try {
        val records = consumer.poll(Duration.ofHours(1))
        records.forEach {
            partitionOffsetMap[TopicPartition(it.topic(), it.partition())] =
                OffsetAndMetadata(it.offset() + 1)
            println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
            if (Random.nextInt(0, 100) > 80) {
                throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
            }
        }
        consumer.commitSync(partitionOffsetMap)
        println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
    } catch (e: Throwable) {
        println("Error Processing message, resetting offset $e")
        consumer.seek(partitionOffsetMap.keys.first(), partitionOffsetMap.values.first().offset() - 1)
        Thread.sleep(5000)
    }
}

makes everything work fine:

Received message on key 1 with value 1 and offset 0
Committed offset 1 on partition 0
Received message on key 1 with value 2 and offset 1
Committed offset 2 on partition 0
Received message on key 1 with value 3 and offset 2
Committed offset 3 on partition 0
Received message on key 1 with value 4 and offset 3
Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
Received message on key 1 with value 4 and offset 3
Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
Received message on key 1 with value 4 and offset 3
Committed offset 4 on partition 0
.......
Committed offset 19 on partition 0
Received message on key 1 with value 20 and offset 19
Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 20
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 19 for partition topic-0
--------- RESTART ---------
Received message on key 1 with value 20 and offset 19
Committed offset 20 on partition 0
Received message on key 1 with value 21 and offset 20
Committed offset 21 on partition 0
......

As you can see, if an exception is caught it seeks the offset back to the
value it had before the record was consumed; otherwise it commits the next
offset. In fact, you can see that value 4 is reprocessed multiple times
without a restart, and value 20, which throws an exception, is reprocessed
after a restart because nothing had been committed.

Now, Kafka gurus, is this the best way to achieve this?
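One caveat with my own snippet: the .first() calls only work because I run
with max.poll.records set to 1. For bigger batches (possibly spanning
several partitions) I believe you'd need a resume offset per partition.
A rough, untested sketch of that idea, reusing the same consumer as above
(processRecord is a hypothetical stand-in for the real handler):

while (true) {
    // Resume point per partition: the offset to seek back to on failure,
    // and (once a record succeeds) the offset to commit.
    val resumeAt = mutableMapOf<TopicPartition, Long>()
    try {
        val records = consumer.poll(Duration.ofHours(1))
        // Pre-seed with the first fetched offset of every partition, so
        // partitions the loop below never reaches are rewound too.
        records.partitions().forEach { tp ->
            resumeAt[tp] = records.records(tp).first().offset()
        }
        for (record in records) {
            val tp = TopicPartition(record.topic(), record.partition())
            resumeAt[tp] = record.offset()       // retry here if this one throws
            processRecord(record)                // hypothetical handler, may throw
            resumeAt[tp] = record.offset() + 1   // done, next time start after it
        }
        if (resumeAt.isNotEmpty()) {
            consumer.commitSync(resumeAt.mapValues { OffsetAndMetadata(it.value) })
        }
    } catch (e: Throwable) {
        println("Error Processing message, resetting offset $e")
        // Rewind every partition touched by this poll to its resume point.
        resumeAt.forEach { (tp, offset) -> consumer.seek(tp, offset) }
        Thread.sleep(5000)
    }
}

Pre-seeding resumeAt matters because poll() has already advanced the
position for every fetched record, including records on partitions the
processing loop never reaches once something throws.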
Wouldn't it be better to have a config like
"enable.auto.advance.consume.offset" that disables the implicit behavior,
so that the position advances automatically on consume if it's true and
only on commit if it's false? Or is there already some config I've missed?
(See the P.S. after the quoted thread below for a sketch of the workaround
I mean.)

Thanks

--
Alessandro Tagliapietra


On Wed, Sep 25, 2019 at 7:55 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> The only way this works is if I don't catch the exception, let the
> consumer crash and fully restart it.
>
> Maybe the consumer has an internal state that always gets updated when it
> receives a message during poll?
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> If I change my code slightly, trying to manually commit offsets at the
>> partition level:
>>
>> while (true) {
>>     try {
>>         val records = consumer.poll(Duration.ofHours(1))
>>         val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
>>         records.forEach {
>>             partitionOffsetMap[TopicPartition(it.topic(), it.partition())] =
>>                 OffsetAndMetadata(it.offset())
>>             println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
>>             if (Random.nextInt(0, 100) > 80) {
>>                 throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
>>             }
>>         }
>>         consumer.commitSync(partitionOffsetMap)
>>         println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
>>     } catch (e: Throwable) {
>>         println("Error Processing message $e")
>>         Thread.sleep(5000)
>>     }
>> }
>>
>> it logs this:
>>
>> Received message on key 2 with value 93 and offset 92
>> Committed offset 92 on partition 2
>> Received message on key 2 with value 94 and offset 93
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
>> Received message on key 2 with value 95 and offset 94
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
>> Received message on key 2 with value 96 and offset 95
>> Committed offset 95 on partition 2
>> Received message on key 2 with value 97 and offset 96
>> Committed offset 96 on partition 2
>> Received message on key 2 with value 98 and offset 97
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 98
>>
>> As you can see, the offset increases even without logging the "Committed
>> offset ..." part.
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> It's still in the topic because it's weeks away from the deletion
>>> threshold (it's today's message and the topic has a 4-week retention).
>>> So I assume the consumer just moves on to the next one.
>>>
>>> As a test I've created this test script:
>>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
>>>
>>> After running this I get in the logs:
>>>
>>> Received message on key 2 with value 1
>>> Received message on key 2 with value 2
>>> Received message on key 2 with value 3
>>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
>>> Received message on key 2 with value 4
>>> Received message on key 2 with value 5
>>> Received message on key 2 with value 6
>>> Received message on key 2 with value 7
>>> Received message on key 2 with value 8
>>> Received message on key 2 with value 9
>>> Received message on key 2 with value 10
>>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
>>> Received message on key 2 with value 11
>>> Received message on key 2 with value 12
>>> Received message on key 2 with value 13
>>> Received message on key 2 with value 14
>>> Received message on key 2 with value 15
>>> Received message on key 2 with value 16
>>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
>>>
>>> It seems that the offset just automatically increases; with another
>>> client I also see all the values in the topic, so they're not deleted.
>>>
>>> Shouldn't it have retried the message with value 3?
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>>
>>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io>
>>> wrote:
>>>
>>>> "I'm just saying that a message whose processing throws an exception is
>>>> gone within minutes."
>>>>
>>>> Is the message no longer in the topic, or is your consumer group's
>>>> current offset just higher than the offset of the message in question?
>>>>
>>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>
>>>> > You mean the retention time of the topic? That's one month.
>>>> > I'm just saying that a message whose processing throws an exception
>>>> > is gone within minutes.
>>>> >
>>>> > I will handle duplicates better later; if we can't be sure that we
>>>> > don't skip/lose these messages, then it's useless to use Kafka.
>>>> > That's why I'm wondering if there's something wrong with my code.
>>>> >
>>>> > --
>>>> > Alessandro Tagliapietra
>>>> >
>>>> >
>>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
>>>> >
>>>> > > How long is your message retention set for? Perhaps you want to
>>>> > > increase that to a large enough value.
>>>> > >
>>>> > > I have an almost identical use case, but I would strongly recommend
>>>> > > that you handle duplicates, as they are due to your process (not a
>>>> > > Kafka duplicate).
>>>> > >
>>>> > > Regards,
>>>> > >
>>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
>>>> > > tagliapietra.alessan...@gmail.com> wrote:
>>>> > >
>>>> > > > I've disabled auto commit, so what I thought that code would do
>>>> > > > is:
>>>> > > >
>>>> > > > - fetch a message
>>>> > > > - process the message
>>>> > > > - if everything went fine, commit its offset
>>>> > > > - if there's an exception, don't commit, so after the error it
>>>> > > >   would just poll again and get the same message over and over
>>>> > > >
>>>> > > > Instead it seems that the message has been skipped somehow.
>>>> > > >
>>>> > > > In this case what the processing does is:
>>>> > > >
>>>> > > > - open new orders (with a start datetime and a UUID generated by
>>>> > > >   the stream producing to the topic this consumer is consuming
>>>> > > >   from)
>>>> > > > - close previously created orders (by setting an end datetime
>>>> > > >   using the ID stored in a store)
>>>> > > >
>>>> > > > And so far I've seen multiple exceptions, like updating a
>>>> > > > non-existent order (meaning the create message didn't complete
>>>> > > > the request and has been skipped) or creating with the same ID
>>>> > > > (meaning that the backend returned an error but correctly created
>>>> > > > the record in the database).
>>>> > > > I would expect that the above code wouldn't just commit somehow,
>>>> > > > and that the consumer would be stuck there indefinitely.
>>>> > > >
>>>> > > > Regarding ignoring/handling duplicates, that's definitely
>>>> > > > something that will be done, but right now I'm implementing it
>>>> > > > this way so that I can confirm that I don't lose messages when I
>>>> > > > don't manually commit them, which doesn't seem to be the case.
>>>> > > >
>>>> > > > Any help is really appreciated.
>>>> > > >
>>>> > > > --
>>>> > > > Alessandro Tagliapietra
>>>> > > >
>>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com>
>>>> > > > wrote:
>>>> > > >
>>>> > > > > Hi,
>>>> > > > >
>>>> > > > > How are you managing your offset commits?
>>>> > > > >
>>>> > > > > Also, if it's a duplicate record issue (sounds like a database
>>>> > > > > entry to me), have you thought about ignoring/handling
>>>> > > > > duplicates?
>>>> > > > >
>>>> > > > > Thanks,
>>>> > > > >
>>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
>>>> > > > > tagliapietra.alessan...@gmail.com> wrote:
>>>> > > > >
>>>> > > > > > Hello everyone,
>>>> > > > > >
>>>> > > > > > I have a consumer that fetches messages from a topic; for
>>>> > > > > > each message it makes an API call to our backend. To ensure
>>>> > > > > > that a failed message is retried, I've set max.poll.records
>>>> > > > > > to 1 and I have code like this:
>>>> > > > > >
>>>> > > > > > consumer.subscribe(arrayListOf("orders"))
>>>> > > > > > while (!stopProcessing.get()) {
>>>> > > > > >     try {
>>>> > > > > >         val records = consumer.poll(Duration.ofHours(1))
>>>> > > > > >         records.forEach {
>>>> > > > > >             processRecord(it)
>>>> > > > > >         }
>>>> > > > > >         consumer.commitSync()
>>>> > > > > >     } catch (e: Exception) {
>>>> > > > > >         logger.error("Error processing order message", e)
>>>> > > > > >         Sentry.capture(e)
>>>> > > > > >         Thread.sleep(30000)
>>>> > > > > >     }
>>>> > > > > > }
>>>> > > > > >
>>>> > > > > > Now, if a request fails because the backend complains about a
>>>> > > > > > duplicate primary ID, then given the nature of the error,
>>>> > > > > > trying to insert the same thing would generate that same
>>>> > > > > > error over and over again.
>>>> > > > > > Instead it seems that after some retries the message is
>>>> > > > > > skipped and it goes on with the next one.
>>>> > > > > > What could be the reason? If in the next loop iteration it
>>>> > > > > > gets a message from another partition, would it also commit
>>>> > > > > > the offset of the other, failed partition?
>>>> > > > > >
>>>> > > > > > Thank you
>>>> > > > > >
>>>> > > > > > --
>>>> > > > > > Alessandro Tagliapietra
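P.S. Regarding the config I'm asking about above: as far as I can tell
nothing like it exists today, but the effect of "advance only on commit"
can be approximated without any per-record bookkeeping by rewinding to the
last committed offset whenever processing fails. A rough, untested sketch
against the 2.3 consumer API:

// Rewind every assigned partition to its last committed offset, so the
// next poll() redelivers everything since the last successful commitSync().
fun rewindToCommitted(consumer: KafkaConsumer<String, String>) {
    for (tp in consumer.assignment()) {
        val committed: OffsetAndMetadata? = consumer.committed(tp)
        if (committed != null) {
            consumer.seek(tp, committed.offset())
        } else {
            // Nothing committed yet for this partition; pick a policy
            // explicitly (this mirrors auto.offset.reset=earliest).
            consumer.seekToBeginning(listOf(tp))
        }
    }
}

Calling that from the catch block would replace the partitionOffsetMap
bookkeeping entirely; the trade-off is a committed-offset lookup per
partition on every failure, and you reprocess everything since the last
commit rather than just the failing record.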