Re: Consumer commit logic

2019-09-26 Thread Alessandro Tagliapietra
I don't agree with you on that: 1) auto.offset.reset is used only when the consumer doesn't have a committed offset, i.e. the first time it consumes a partition; 2) the fact that the consumer moves on to the next message even if I don't commit the previous one was something that wasn't clear to me because
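For reference, point 1 in consumer-config terms; a minimal sketch, with the Properties object purely illustrative:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    val props = Properties()
    // Consulted only when the group has no committed offset for a partition
    // (e.g. a brand-new group.id, or the committed offset fell out of retention):
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")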

Re: Consumer commit logic

2019-09-26 Thread M. Manna
Consumer offset commits can be controlled automatically or manually. What you are describing here is a problem in your own application. As I previously recommended, you should not hold your application in an infinite loop just to process these failed messages. Also, I would certainly not recommend stoppin
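A minimal sketch of the two commit modes mentioned here; the dead-letter topic name "orders-dlq" in the comment is an illustrative assumption, not something from the thread:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    val props = Properties()
    // Automatic: commits happen in the background on a fixed interval...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
    // ...or manual: set ENABLE_AUTO_COMMIT_CONFIG to "false" and call
    // consumer.commitSync()/commitAsync() yourself once processing succeeds.
    // For records that keep failing, publishing them to a dead-letter topic
    // (e.g. producer.send(ProducerRecord("orders-dlq", key, value)))
    // avoids holding the partition in an infinite retry loop.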

Re: Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
Ok, I think I've found the problem. Looking at https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration- it says: "On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially." The last consume
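Given that behaviour, one way to re-read an unacknowledged record is to rewind the position explicitly; a sketch assuming a manually-committing consumer, with the topic/partition and the seek-to-0 fallback purely illustrative:

    import java.time.Duration
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    fun retryFromCommitted(consumer: KafkaConsumer<String, String>) {
        val tp = TopicPartition("orders", 0)
        // poll() resumes from the last *consumed* offset, not the last *committed*
        // one, so rewind explicitly before polling again after a failure:
        val committed = consumer.committed(tp)  // null if nothing committed yet
        consumer.seek(tp, committed?.offset() ?: 0L)
        consumer.poll(Duration.ofSeconds(1))    // redelivers from the committed offset
    }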

Re: Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
The only way this works is if I don't catch the exception, let the consumer crash, and fully restart it. Maybe the consumer has an internal state that always gets updated when it receives a message during poll?
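That hypothesis matches the consumer's per-partition position. A quick way to observe it (topic, partition, and timeout values are arbitrary):

    import java.time.Duration
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    fun showState(consumer: KafkaConsumer<String, String>) {
        val tp = TopicPartition("orders", 0)
        consumer.poll(Duration.ofSeconds(1))
        println("position:  " + consumer.position(tp))   // advances on every poll
        println("committed: " + consumer.committed(tp))  // moves only when you commit
    }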

Re: Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
If I change my code slightly, trying to manually commit offsets at the partition level: while (true) { try { val records = consumer.poll(Duration.ofHours(1)) val partitionOffsetMap = mutableMapOf() records.forEach { partitionOffsetMap[Topic
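The preview cuts the snippet off mid-expression; a hedged completion of what it appears to be doing, where process() is a stand-in for the real per-record work:

    import java.time.Duration
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    fun run(consumer: KafkaConsumer<String, String>, process: (String) -> Unit) {
        while (true) {
            try {
                val records = consumer.poll(Duration.ofHours(1))
                val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
                records.forEach { record ->
                    process(record.value())
                    // the committed offset points at the *next* record to read
                    partitionOffsetMap[TopicPartition(record.topic(), record.partition())] =
                        OffsetAndMetadata(record.offset() + 1)
                }
                consumer.commitSync(partitionOffsetMap)
            } catch (e: Exception) {
                // without a seek(), the in-memory position has already advanced,
                // so the next poll() will NOT redeliver the failed record
            }
        }
    }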

Re: Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
It's still in the topic because the deletion threshold is weeks away (it's today's message with a 4-week retention). So I assume the consumer just moves on to the next one. As a test I've created this test script https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088 After running this I

Re: Consumer commit logic

2019-09-25 Thread Steve Howard
"I'm just saying that a message which processing throws an exception is gone within minutes." Is the message no longer in the topic or is your consumer group current offset just higher than the offset of the message in question? On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra < tagliapiet

Re: Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
You mean the retention time of the topic? That's one month. I'm just saying that a message whose processing throws an exception is gone within minutes. I will handle duplicates better later; if we can't be sure that we don't skip/lose these messages, then it's useless to use Kafka. That's why I'm th

Re: Consumer commit logic

2019-09-25 Thread M. Manna
How long is your message retention set for? Perhaps you want to increase it to a large enough value. I have an almost identical use case, but I would strongly recommend that you handle duplicates, as they are due to your process (not a Kafka duplicate). Regards,
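For reference, a sketch of raising a topic's retention with the AdminClient; the topic name is assumed, and the 4-week value mirrors the retention mentioned later in the thread:

    import org.apache.kafka.clients.admin.AdminClient
    import org.apache.kafka.clients.admin.AlterConfigOp
    import org.apache.kafka.clients.admin.ConfigEntry
    import org.apache.kafka.common.config.ConfigResource

    fun setRetention(admin: AdminClient) {
        val topic = ConfigResource(ConfigResource.Type.TOPIC, "orders")
        val fourWeeksMs = 28L * 24 * 60 * 60 * 1000  // 2419200000 ms
        val op = AlterConfigOp(ConfigEntry("retention.ms", fourWeeksMs.toString()),
                               AlterConfigOp.OpType.SET)
        admin.incrementalAlterConfigs(mapOf(topic to listOf(op))).all().get()
    }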

Re: Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
I've disabled auto commit, so what I thought that code would do is:
- it fetches a message
- it processes the message
- if everything went fine, it commits its offset
- if there's an exception, it didn't commit, so after the error it would just poll again and get the same message over and ove
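That expectation breaks because poll() advances from the last consumed offset even when nothing was committed. A sketch of a loop that actually redelivers by seeking back on failure; the callBackend handler and the single-threaded shape are assumptions:

    import java.time.Duration
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    fun consume(consumer: KafkaConsumer<String, String>, callBackend: (String) -> Unit) {
        while (true) {
            val records = consumer.poll(Duration.ofSeconds(1))
            for (record in records) {
                val tp = TopicPartition(record.topic(), record.partition())
                try {
                    callBackend(record.value())
                    consumer.commitSync(mapOf(tp to OffsetAndMetadata(record.offset() + 1)))
                } catch (e: Exception) {
                    // rewind the in-memory position so the next poll() retries this record
                    consumer.seek(tp, record.offset())
                    break
                }
            }
        }
    }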

Re: Consumer commit logic

2019-09-25 Thread M. Manna
Hi, how are you managing your offset commits? Also, if it's a duplicate-record issue (sounds like a database entry to me), have you thought about ignoring/handling duplicates? Thanks,

Consumer commit logic

2019-09-25 Thread Alessandro Tagliapietra
Hello everyone, I have a consumer that fetches messages from a topic; for each message it makes an API call to our backend. To ensure that if a message fails it tries to process it again, I've set max.poll.records to 1 and I have code like this: consumer.subscribe(arrayListOf("orders"))
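A sketch of the setup described above (max.poll.records=1, manual commit); the config values and deserializers are inferred from the thread, not the poster's exact code:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "orders-consumer")
        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")       // one record per poll, as described
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // commit only after the API call succeeds
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
    }
    val consumer = KafkaConsumer<String, String>(props)
    consumer.subscribe(arrayListOf("orders"))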