I don't agree with you on that:

1) auto.offset.reset is used only when the consumer doesn't have a committed offset, i.e. the first time it consumes a partition.

2) The fact that the consumer moves on to the next message even if I don't commit the previous one wasn't clear to me, because the docs about manual commit say:
> This is useful when the consumption of the messages is coupled with some
> processing logic and hence a message should not be considered as consumed
> until it is completed processing

It doesn't say anything about the consumer automatically advancing the offset between poll calls internally, even without committing the previous ones. I wasn't aware that the committed offset is only consulted across restarts.

3) What I'm mentioning here is the above, not my own application problems. My own application problems are how I handle the exception and the guarantees I need for each message. It doesn't make sense in my case to loop forever on a message, but maybe in other cases it does: if you have a financial transaction in each message and the service that handles them is down for hours, you might want to wait instead of losing a transaction. Sure, I'll write code around it to make duplicates idempotent, or to retry a limited number of times and/or with an exponential backoff. What I wanted was to be able to retry processing a message without the cost of fully restarting the consumer/container. Without the manual seek it wasn't obvious why it was skipping messages. Now that I know I have full control over the flow, I can add the exception handling logic, idempotency, or whatever I need, because I know for sure that I won't lose messages.

Thanks

--
Alessandro Tagliapietra

On Thu, Sep 26, 2019 at 1:54 AM M. Manna <manme...@gmail.com> wrote:

> Consumer offset commit can be controlled automatically or manually. What
> you are mentioning here is your own application's problem. As I previously
> recommended, you should not hold your application in an infinite loop just
> to process these failed messages.
>
> Also, I would certainly not recommend stopping at a specific offset forever
> as you are saying. Kafka is a service which delivers you the messages (with
> consistency and speed). The what/how of the message is entirely up to you.
> Using auto.offset.reset you have a choice to start either from the
> beginning, or the latest.
>
> Thanks,
>
> On Thu, 26 Sep 2019 at 04:14, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Ok, I think I've found the problem.
> >
> > Looking at
> > https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
> > it says:
> >
> > > On each poll, consumer will try to use the last consumed offset as the
> > > starting offset and fetch sequentially. The last consumed offset can be
> > > manually set through seek(TopicPartition, long) or automatically set as the
> > > last committed offset for the subscribed list of partitions
> >
> > So the part where it says "automatically set as the last committed offset"
> > happens only on consumer start. When not restarting the consumer, what
> > happens is "consumer will try to use the last consumed offset", meaning
> > that it'll use the last consumed offset whether you've committed it or not.
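The distinction the quoted javadoc draws — an in-memory "last consumed" position that advances on every poll versus a committed offset that only moves on commit and is only consulted on (re)start — can be sketched as a toy model. This is not the Kafka API; `ToyConsumer` and its members are made-up names purely to illustrate the two counters involved:

```kotlin
// Toy model of a consumer's two offset counters -- NOT the Kafka API.
// "position" is the in-memory last-consumed offset: it advances on every poll,
// whether or not a commit happened. "committed" only moves on commit() and is
// only looked at again when the consumer (re)starts.
class ToyConsumer(private var committed: Long = 0L) {
    private var position: Long = committed // on start, resume from committed

    fun poll(): Long = position++          // returns next offset, always advances
    fun commit() { committed = position }  // persist progress
    fun restart() { position = committed } // a restart falls back to committed
}

fun main() {
    val c = ToyConsumer()
    val first = c.poll()   // offset 0
    val second = c.poll()  // offset 1 -- advanced despite no commit
    c.restart()
    val third = c.poll()   // back to offset 0, since nothing was committed
    println("$first $second $third") // prints "0 1 0"
}
```

This is why uncommitted messages only reappear after a restart (or an explicit seek), not on the next poll of a live consumer.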
> > So adapting my code to:
> >
> > while (true) {
> >     val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
> >     try {
> >         val records = consumer.poll(Duration.ofHours(1))
> >         records.forEach {
> >             partitionOffsetMap[TopicPartition(it.topic(), it.partition())] =
> >                 OffsetAndMetadata(it.offset() + 1)
> >             println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
> >             if (Random.nextInt(0, 100) > 80) {
> >                 throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
> >             }
> >         }
> >         consumer.commitSync(partitionOffsetMap)
> >         println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
> >     } catch (e: Throwable) {
> >         println("Error Processing message, resetting offset $e")
> >         consumer.seek(partitionOffsetMap.keys.first(), partitionOffsetMap.values.first().offset() - 1)
> >         Thread.sleep(5000)
> >     }
> > }
> >
> > makes everything work fine:
> >
> > Received message on key 1 with value 1 and offset 0
> > Committed offset 1 on partition 0
> > Received message on key 1 with value 2 and offset 1
> > Committed offset 2 on partition 0
> > Received message on key 1 with value 3 and offset 2
> > Committed offset 3 on partition 0
> > Received message on key 1 with value 4 and offset 3
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
> > Received message on key 1 with value 4 and offset 3
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
> > Received message on key 1 with value 4 and offset 3
> > Committed offset 4 on partition 0
> > .......
> > Committed offset 19 on partition 0
> > Received message on key 1 with value 20 and offset 19
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 20
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 19 for partition topic-0
> > --------- RESTART ---------
> > Received message on key 1 with value 20 and offset 19
> > Committed offset 20 on partition 0
> > Received message on key 1 with value 21 and offset 20
> > Committed offset 21 on partition 0
> > ......
> >
> > As you can see, if an exception is caught it resets the offset to the value it had before the consume; otherwise it commits the next offset.
> > In fact, you can see that value 4 is reprocessed multiple times without a restart, and value 20, which throws an exception, is reprocessed after a restart because nothing had been committed.
> >
> > Now, Kafka gurus, is this the best way to achieve this? Wouldn't it be better
> > to have a config like "enable.auto.advance.consume.offset" that disables the
> > implicit behavior, so the offset is advanced automatically on consume if it's
> > true, or only on commit if it's false?
> > Or is there already some config I've missed?
> >
> > Thanks
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Wed, Sep 25, 2019 at 7:55 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > The only way this works is if I don't catch the exception, let the
> > > consumer crash and fully restart it.
> > >
> > > Maybe the consumer has an internal state that always gets updated when it
> > > receives a message during poll?
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > >> If I change my code slightly, trying to manually commit offsets at
> > >> partition level:
> > >>
> > >> while (true) {
> > >>     try {
> > >>         val records = consumer.poll(Duration.ofHours(1))
> > >>         val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
> > >>         records.forEach {
> > >>             partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset())
> > >>             println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
> > >>             if (Random.nextInt(0, 100) > 80) {
> > >>                 throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
> > >>             }
> > >>         }
> > >>         consumer.commitSync(partitionOffsetMap)
> > >>         println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
> > >>     } catch (e: Throwable) {
> > >>         println("Error Processing message $e")
> > >>         Thread.sleep(5000)
> > >>     }
> > >> }
> > >>
> > >> it logs this:
> > >>
> > >> Received message on key 2 with value 93 and offset 92
> > >> Committed offset 92 on partition 2
> > >> Received message on key 2 with value 94 and offset 93
> > >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
> > >> Received message on key 2 with value 95 and offset 94
> > >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
> > >> Received message on key 2 with value 96 and offset 95
> > >> Committed offset 95 on partition 2
> > >> Received message on key 2 with value 97 and offset 96
> > >> Committed offset 96 on partition 2
> > >> Received message on key 2 with value 98 and offset 97
> > >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 98
> > >>
> > >> As you can see, the offset increases even without logging the "Committed
> > >> offset ..." part.
> > >>
> > >> --
> > >> Alessandro Tagliapietra
> > >>
> > >> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >>> It's still in the topic because it's weeks away from the deletion
> > >>> threshold (today's message with a 4 week retention).
> > >>> So I assume the consumer just moves on to the next one.
> > >>> As a test I've created this test script:
> > >>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
> > >>>
> > >>> After running this I get in the logs:
> > >>>
> > >>> Received message on key 2 with value 1
> > >>> Received message on key 2 with value 2
> > >>> Received message on key 2 with value 3
> > >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
> > >>> Received message on key 2 with value 4
> > >>> Received message on key 2 with value 5
> > >>> Received message on key 2 with value 6
> > >>> Received message on key 2 with value 7
> > >>> Received message on key 2 with value 8
> > >>> Received message on key 2 with value 9
> > >>> Received message on key 2 with value 10
> > >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
> > >>> Received message on key 2 with value 11
> > >>> Received message on key 2 with value 12
> > >>> Received message on key 2 with value 13
> > >>> Received message on key 2 with value 14
> > >>> Received message on key 2 with value 15
> > >>> Received message on key 2 with value 16
> > >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
> > >>>
> > >>> It seems that the offset just automatically increases; with another
> > >>> client I also see all the values in the topic, so they're not deleted.
> > >>>
> > >>> Shouldn't it have retried the message with value 3?
> > >>>
> > >>> --
> > >>> Alessandro Tagliapietra
> > >>>
> > >>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io> wrote:
> > >>>
> > >>>> "I'm just saying that a message whose processing throws an exception
> > >>>> is gone within minutes."
> > >>>>
> > >>>> Is the message no longer in the topic, or is your consumer group's
> > >>>> current offset just higher than the offset of the message in question?
> > >>>>
> > >>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
> > >>>> tagliapietra.alessan...@gmail.com> wrote:
> > >>>>
> > >>>> > You mean the retention time of the topic? That's one month.
> > >>>> > I'm just saying that a message whose processing throws an exception
> > >>>> > is gone within minutes.
> > >>>> >
> > >>>> > I will handle duplicates better later; if we can't be sure that we
> > >>>> > don't skip/lose these messages then it's useless to use Kafka.
> > >>>> > That's why I'm wondering whether there's something wrong with my code.
> > >>>> >
> > >>>> > --
> > >>>> > Alessandro Tagliapietra
> > >>>> >
> > >>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
> > >>>> >
> > >>>> > > How long is your message retention set for? Perhaps you want to
> > >>>> > > increase that to a large enough value.
> > >>>> > >
> > >>>> > > I have an almost identical use case, but I would strongly recommend
> > >>>> > > that you handle duplicates, as they are due to your process (not a
> > >>>> > > Kafka duplicate).
> > >>>> > >
> > >>>> > > Regards,
> > >>>> > >
> > >>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> > >>>> > > tagliapietra.alessan...@gmail.com> wrote:
> > >>>> > >
> > >>>> > > > I've disabled auto commit, so what I thought that code would do is:
> > >>>> > > >
> > >>>> > > > - it fetches a message
> > >>>> > > > - it processes the message
> > >>>> > > > - if everything went fine, it commits its offset
> > >>>> > > > - if there's an exception, it didn't commit, so after the error it
> > >>>> > > >   would just poll again and get the same message over and over
> > >>>> > > >
> > >>>> > > > Instead, it seems that the message has been skipped somehow.
> > >>>> > > >
> > >>>> > > > In this case what the processing does is:
> > >>>> > > > - opening new orders (with a start datetime and a UUID generated by
> > >>>> > > >   the stream producing to the topic this consumer is consuming from)
> > >>>> > > > - closing previously created orders (by setting an end datetime
> > >>>> > > >   using the ID stored in a store)
> > >>>> > > >
> > >>>> > > > And so far I've seen multiple exceptions, like updating a
> > >>>> > > > non-existent order (meaning the create message didn't complete the
> > >>>> > > > request and has been skipped) or creating with the same ID (meaning
> > >>>> > > > the backend returned an error but correctly created the record in
> > >>>> > > > the database).
> > >>>> > > > I would expect that the above code wouldn't just commit somehow,
> > >>>> > > > and the consumer would be stuck there indefinitely.
> > >>>> > > >
> > >>>> > > > Regarding ignoring/handling duplicates, that's definitely something
> > >>>> > > > that will be done, but right now I'm implementing it this way so
> > >>>> > > > that I can confirm that I don't lose messages if I don't manually
> > >>>> > > > commit them, which doesn't seem to be the case.
> > >>>> > > >
> > >>>> > > > Any help is really appreciated
> > >>>> > > >
> > >>>> > > > --
> > >>>> > > > Alessandro Tagliapietra
> > >>>> > > >
> > >>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> > >>>> > > >
> > >>>> > > > > Hi,
> > >>>> > > > >
> > >>>> > > > > How are you managing your offset commits?
> > >>>> > > > >
> > >>>> > > > > Also, if it's a duplicate record issue (sounds like a database
> > >>>> > > > > entry to me), have you thought about ignoring/handling duplicates?
> > >>>> > > > >
> > >>>> > > > > Thanks,
> > >>>> > > > >
> > >>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > >>>> > > > > tagliapietra.alessan...@gmail.com> wrote:
> > >>>> > > > >
> > >>>> > > > > > Hello everyone,
> > >>>> > > > > >
> > >>>> > > > > > I have a consumer that fetches messages from a topic; for each
> > >>>> > > > > > message it makes an API call to our backend.
> > >>>> > > > > > To ensure that a failed message is tried again, I've set
> > >>>> > > > > > max.poll.records to 1 and I have code like this:
> > >>>> > > > > >
> > >>>> > > > > > consumer.subscribe(arrayListOf("orders"))
> > >>>> > > > > > while (!stopProcessing.get()) {
> > >>>> > > > > >     try {
> > >>>> > > > > >         val records = consumer.poll(Duration.ofHours(1))
> > >>>> > > > > >         records.forEach {
> > >>>> > > > > >             processRecord(it)
> > >>>> > > > > >         }
> > >>>> > > > > >         consumer.commitSync()
> > >>>> > > > > >     } catch (e: Exception) {
> > >>>> > > > > >         logger.error("Error processing order message", e)
> > >>>> > > > > >         Sentry.capture(e)
> > >>>> > > > > >         Thread.sleep(30000)
> > >>>> > > > > >     }
> > >>>> > > > > > }
> > >>>> > > > > >
> > >>>> > > > > > Now, if a request fails because the backend complains about a
> > >>>> > > > > > duplicate primary ID, due to the nature of the error, trying to
> > >>>> > > > > > insert the same thing would generate that same error over and
> > >>>> > > > > > over again.
> > >>>> > > > > > Instead, it seems that after some retries the message is skipped
> > >>>> > > > > > and it goes on with the next one.
> > >>>> > > > > > What could be the reason? If in the next loop iteration it gets
> > >>>> > > > > > a message from another partition, would it also commit the
> > >>>> > > > > > offset of the other, failed partition?
> > >>>> > > > > >
> > >>>> > > > > > Thank you
> > >>>> > > > > >
> > >>>> > > > > > --
> > >>>> > > > > > Alessandro Tagliapietra
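The top of the thread mentions retrying a limited number of times and/or with an exponential backoff before giving up on a message. A minimal, Kafka-free sketch of that idea follows; `retryWithBackoff` and all its parameters are illustrative names, not an existing API, and the backoff numbers are arbitrary:

```kotlin
import kotlin.math.min

// Sketch of bounded retries with exponential backoff. Returns true if
// `process` eventually succeeded, false if attempts were exhausted (at which
// point the caller could commit and skip, or park the message elsewhere).
// `sleep` is injectable so tests don't actually wait.
fun retryWithBackoff(
    maxAttempts: Int = 5,
    baseDelayMs: Long = 500,
    maxDelayMs: Long = 30_000,
    sleep: (Long) -> Unit = { ms -> Thread.sleep(ms) },
    process: () -> Unit,
): Boolean {
    repeat(maxAttempts) { attempt ->
        try {
            process()
            return true
        } catch (e: Exception) {
            if (attempt == maxAttempts - 1) return false
            // exponential backoff: 500ms, 1s, 2s, ... capped at maxDelayMs
            sleep(min(baseDelayMs shl attempt, maxDelayMs))
        }
    }
    return false
}
```

In the seek-based loop shown earlier in the thread, this could wrap the per-record processing: on `false`, the consumer decides whether to seek back again or to commit and move on, depending on the guarantee each message needs.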