I don't agree with you on that:
1) auto.offset.reset is used only when the consumer doesn't have a
committed offset, i.e. the first time it consumes a partition (see the
config fragment below)
2) the fact that the consumer moves on to the next message even if I don't
commit the previous one wasn't clear to me, because consumer offset
commits can be controlled automatically or manually.
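For reference, a minimal fragment showing where auto.offset.reset applies,
assuming a Kotlin Properties object named props (an illustration, not the
poster's actual config):

    // Only consulted when this group has NO committed offset for a
    // partition (first ever run, or the committed offset has expired):
    props["auto.offset.reset"] = "earliest"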
What you are mentioning here is your own application's problem. As I
previously recommended, you should not hold your application in an
infinite loop just to process these failed messages.
Also, I would certainly not recommend stopping …
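A common alternative to blocking on a failed message is to park it on a
separate "dead letter" topic and move on. A minimal Kotlin sketch, assuming
a topic named orders-dlq and an already-configured KafkaProducer (both
assumptions, not from the thread):

    import org.apache.kafka.clients.producer.ProducerRecord

    try {
        process(record)  // placeholder for the real processing step
    } catch (e: Exception) {
        // park the failed message so the partition isn't blocked
        producer.send(ProducerRecord("orders-dlq", record.key(), record.value()))
    }
    consumer.commitSync()  // commit either way and move on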
Ok I think I've found the problem, looking at
https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
it says:

> On each poll, consumer will try to use the last consumed offset as the
> starting offset and fetch sequentially. The last consumed offset can be
> manually set through seek(TopicPartition, long) or automatically set as
> the last committed offset for the subscribed list of partitions
The only way this works is if I don't catch the exception, let the consumer
crash and fully restart it.
Maybe the consumer has an internal state that always gets updated when it
receives a message during poll?
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
If I change my code slightly, trying to manually commit offsets at the
partition level:

    while (true) {
        try {
            val records = consumer.poll(Duration.ofHours(1))
            val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
            records.forEach {
                // assumed completion of the truncated snippet:
                partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset() + 1)
            }
            consumer.commitSync(partitionOffsetMap)
        } catch (e: Exception) { /* … */ }
    }
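Note the offset + 1 convention above: the committed offset is the offset of
the next record the group should consume, not the last one processed.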
It's still in the topic because it's weeks away from the deletion threshold
(it's today's message and retention is 4 weeks).
So I assume the consumer just moves on to the next one.
As a test I've created this test script:
https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
After running this I …
"I'm just saying that a message which processing throws an exception is
gone within minutes."
Is the message no longer in the topic, or is your consumer group's current
offset just higher than the offset of the message in question?
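One way to check, sketched against the plain consumer API (the topic name
and partition number are made up for the example):

    import org.apache.kafka.common.TopicPartition

    val tp = TopicPartition("orders", 0)
    val committed = consumer.committed(tp)?.offset()          // group position
    val earliest = consumer.beginningOffsets(listOf(tp))[tp]  // oldest retained offset
    // if earliest <= messageOffset < committed, the message is still in the
    // topic and the group has simply moved past it
    println("earliest=$earliest committed=$committed")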
On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
You mean the retention time of the topic? That's one month.
I'm just saying that a message whose processing throws an exception is gone
within minutes.
I will handle duplicates better later; if we can't be sure that we don't
skip/lose these messages then it's useless to use Kafka.
That's why I'm th…
How long is your message retention set for? Perhaps you want to increase it
to a large enough value.
I have an almost identical use case, but I would strongly recommend that
you handle the duplicates, as they are due to your process (not Kafka
duplicates).
Regards,
On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
I've disabled the auto commit, so what I thought that code would do is (see
the sketch after this list):
- it fetches a message
- it processes the message
- if everything went fine, it commits its offset
- if there's an exception, it doesn't commit, so after the error it would
just poll again and get the same message over and over
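A minimal sketch of that intended loop, assuming enable.auto.commit=false
and max.poll.records=1 (processMessage is a placeholder for the API call):

    import java.time.Duration

    while (true) {
        val records = consumer.poll(Duration.ofSeconds(1))
        for (record in records) {
            try {
                processMessage(record)  // placeholder
                consumer.commitSync()   // commit only after success
            } catch (e: Exception) {
                // no commit here -- but, as found earlier in the thread,
                // the next poll() still advances past this record unless
                // the position is rewound with seek()
            }
        }
    }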
Hi,
How are you managing your offset commits?
Also, if it's a duplicate-record issue (sounds like a database entry to
me), have you thought about ignoring/handling duplicates?
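One simple pattern for that, assuming each record carries a unique key (the
in-memory set is only illustrative; a database unique constraint gives the
same effect durably):

    val seenKeys = mutableSetOf<String>()  // illustrative, not durable
    for (record in records) {
        if (!seenKeys.add(record.key())) continue  // duplicate: skip it
        process(record)  // placeholder for the real processing step
    }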
Thanks,
On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hello everyone,
Hello everyone,
I have a consumer that fetches messages from a topic; for each message it
makes an API call to our backend. To ensure that a failed message is tried
again, I've set max.poll.records to 1 and I have code like this:
consumer.subscribe(arrayListOf("orders"))
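For context, a minimal sketch of the setup that message describes; every
value besides max.poll.records=1 and the "orders" topic is an assumption:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092"  // assumed
    props["group.id"] = "orders-consumer"          // made up
    props["enable.auto.commit"] = "false"          // commits handled manually
    props["max.poll.records"] = "1"                // one message per poll
    props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"

    val consumer = KafkaConsumer<String, String>(props)
    consumer.subscribe(arrayListOf("orders"))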