I don't agree with you on that,

1) auto.offset.reset is used only when the consumer doesn't have an offset
committed so the first time is consuming a partition
2) the fact that the consumer goes to the next message even if I don't
commit the previous one was something it wasn't clear to me because by
reading docs about manual commit:

> This is useful when the consumption of the messages is coupled with some
> processing logic and hence a message should not be considered as consumed
> until it is completed processing

    It doesn't say anything about the consumer automatically advancing
offset between poll calls internally even without committing the previous
ones. I wasn't aware the above applies only across restarts
3) What I'm mentioning here is the above, not my own application problems,
my own application problems are how I handle the exception and the
guarantees I need to have for each message.

It doesn't make sense in my case to loop forever on a message but maybe in
other cases it does, maybe if you have a financial transaction in each
message and the service that handles them is down for hours you might want
to wait instead of losing a transaction.
Sure I'll write code around it to make duplicates idempotent or to try a
limited amount of times and/or with an exponential backoff.
What I wanted to do was being able to retry processing a message without
the cost of fully restarting the consumer/container.
Without the manual seek it wasn't obvious why it was skipping messages. Now
that I know that I have full control over the flow I can add the exception
handling logic, idempotency or whatever I need, because I know for sure
that I won't lose messages.

Thanks

--
Alessandro Tagliapietra


On Thu, Sep 26, 2019 at 1:54 AM M. Manna <manme...@gmail.com> wrote:

> consumer offset commit can be controlled automatically or manually. What
> you are mentioning here is your own application problems. As I previously
> recommended, you should not hold your application in infinite loop just to
> process these failed messages.
>
> Also, I would certainly not recommend stopping at a specific offset forever
> as you are saying. Kafka is a service which delivers you the messages (with
> consistency and speed). What/how of the message is entirely up to you.
> Using auto.offset.reset you have a choice to start either from the
> beginning, or the latest.
>
> Thanks,
>
> On Thu, 26 Sep 2019 at 04:14, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Ok I think I've found the problem
> >
> > looking at
> >
> >
> https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
> > it
> > says:
> >
> > On each poll, consumer will try to use the last consumed offset as the
> > > starting offset and fetch sequentially. The last consumed offset can be
> > > manually set through seek(TopicPartition, long) or automatically set as
> > the
> > > last committed offset for the subscribed list of partitions
> >
> >
> > so the part where it says "automatically set as the last committed
> offset"
> > happens only on consumer start, when not restarting the consumer, what is
> > happening is "consumer will try to use the last consumed offset" meaning
> > that it'll use the last consumed offset no matter if you've committed it
> or
> > not.
> > So adapting my code to:
> >
> >     while (true) {
> > >         val partitionOffsetMap = mutableMapOf<TopicPartition,
> > > OffsetAndMetadata>()
> > >         try {
> > >             val records = consumer.poll(Duration.ofHours(1))
> > >             records.forEach {
> > >                 partitionOffsetMap[TopicPartition(it.topic(),
> > > it.partition())] = OffsetAndMetadata(it.offset() + 1)
> > >                 println("Received message on key ${it.key()} with value
> > > ${it.value()} and offset ${it.offset()}")
> > >                 if (Random.nextInt(0, 100) > 80) {
> > >                     throw Exception("Thrown error on key ${it.key()}
> with
> > > value ${it.value()}")
> > >                 }
> > >             }
> > >             consumer.commitSync(partitionOffsetMap)
> > >             println("Committed offset
> > > ${partitionOffsetMap.values.first().offset()} on partition
> > > ${partitionOffsetMap.keys.first().partition()}")
> > >         } catch (e: Throwable) {
> > >             println("Error Processing message, resetting offset $e")
> > >             consumer.seek(partitionOffsetMap.keys.first(),
> > > partitionOffsetMap.values.first().offset() - 1)
> > >             Thread.sleep(5000)
> > >         }
> > >     }
> >
> >
> > makes everything work fine:
> >
> > Received message on key 1 with value 1 and offset 0
> > > Committed offset 1 on partition 0
> > > Received message on key 1 with value 2 and offset 1
> > > Committed offset 2 on partition 0
> > > Received message on key 1 with value 3 and offset 2
> > > Committed offset 3 on partition 0
> > > Received message on key 1 with value 4 and offset 3
> > > Error Processing message, resetting offset java.lang.Exception: Thrown
> > > error on key 1 with value 4
> > > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
> > > clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for
> > > partition topic-0
> > > Received message on key 1 with value 4 and offset 3
> > > Error Processing message, resetting offset java.lang.Exception: Thrown
> > > error on key 1 with value 4
> > > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
> > > clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for
> > > partition topic-0
> > > Received message on key 1 with value 4 and offset 3
> > > Committed offset 4 on partition 0
> > > .......
> > > Committed offset 19 on partition 0
> > > Received message on key 1 with value 20 and offset 19
> > > Error Processing message, resetting offset java.lang.Exception: Thrown
> > > error on key 1 with value 20
> > > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
> > > clientId=consumer-1, groupId=test-group-id] Seeking to offset 19 for
> > > partition topic-0
> > > --------- RESTART ---------
> > > Received message on key 1 with value 20 and offset 19
> > > Committed offset 20 on partition 0
> > > Received message on key 1 with value 21 and offset 20
> > > Committed offset 21 on partition 0
> > > ......
> >
> >
> > As you can see if an exception is caught it resets the offset to the same
> > value before the consume, otherwise it commits the next offset.
> > In fact, you can see that value 4 is reprocessed multiple times without a
> > restart and value 20 which throws and exception is reprocessed after a
> > restart because nothing has been committed.
> >
> > Now, kafka gurus, is this the best way to achieve this? Wouldn't be
> better
> > to have a config like "enable.auto.advance.consume.offset" that disables
> > the implicit behavior and gets advanced automatically on consume if it's
> > true or on commit if it's false?
> > Or is there already some config I've missed?
> >
> > Thanks
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Wed, Sep 25, 2019 at 7:55 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > The only way this works is if I don't catch the exception, let the
> > > consumer crash and fully restart it.
> > >
> > > Maybe the consumer has an internal state that always gets updated when
> it
> > > receives a message during poll?
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > >
> > > On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > >> If I change my code slightly trying to manually commit offsets at
> > >> partition level
> > >>
> > >>     while (true) {
> > >>         try {
> > >>             val records = consumer.poll(Duration.ofHours(1))
> > >>             val partitionOffsetMap = mutableMapOf<TopicPartition,
> > >> OffsetAndMetadata>()
> > >>             records.forEach {
> > >>                 partitionOffsetMap[TopicPartition(it.topic(),
> > >> it.partition())] = OffsetAndMetadata(it.offset())
> > >>                 println("Received message on key ${it.key()} with
> value
> > >> ${it.value()} and offset ${it.offset()}")
> > >>                 if (Random.nextInt(0, 100) > 80) {
> > >>                     throw Exception("Thrown error on key ${it.key()}
> > with
> > >> value ${it.value()}")
> > >>                 }
> > >>             }
> > >>             consumer.commitSync(partitionOffsetMap)
> > >>             println("Committed offset
> > >> ${partitionOffsetMap.values.first().offset()} on partition
> > >> ${partitionOffsetMap.keys.first().partition()}")
> > >>         } catch (e: Throwable) {
> > >>             println("Error Processing message $e")
> > >>             Thread.sleep(5000)
> > >>         }
> > >>     }
> > >>
> > >> it logs this
> > >>
> > >> Received message on key 2 with value 93 and offset 92
> > >> Committed offset 92 on partition 2
> > >> Received message on key 2 with value 94 and offset 93
> > >> Error Processing message java.lang.Exception: Thrown error on key 2
> with
> > >> value 94
> > >> Received message on key 2 with value 95 and offset 94
> > >> Error Processing message java.lang.Exception: Thrown error on key 2
> with
> > >> value 95
> > >> Received message on key 2 with value 96 and offset 95
> > >> Committed offset 95 on partition 2
> > >> Received message on key 2 with value 97 and offset 96
> > >> Committed offset 96 on partition 2
> > >> Received message on key 2 with value 98 and offset 97
> > >> Error Processing message java.lang.Exception: Thrown error on key 2
> with
> > >> value 98
> > >>
> > >> as you can see the offset increases even without logging the
> "Committed
> > >> offset ...." part
> > >>
> > >> --
> > >> Alessandro Tagliapietra
> > >>
> > >>
> > >> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >>> It's still is in the topic because is weeks after the deletion
> > threshold
> > >>> (today's message with a 4 weeks retention).
> > >>> So I assume the consumer just moves on to the next one.
> > >>> As a test I've created this test script
> > >>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
> > >>>
> > >>> After running this I get in the logs:
> > >>>
> > >>> Received message on key 2 with value 1
> > >>> Received message on key 2 with value 2
> > >>> Received message on key 2 with value 3
> > >>> Error Processing message java.lang.Exception: Thrown error on key 2
> > with
> > >>> value 3
> > >>> Received message on key 2 with value 4
> > >>> Received message on key 2 with value 5
> > >>> Received message on key 2 with value 6
> > >>> Received message on key 2 with value 7
> > >>> Received message on key 2 with value 8
> > >>> Received message on key 2 with value 9
> > >>> Received message on key 2 with value 10
> > >>> Error Processing message java.lang.Exception: Thrown error on key 2
> > with
> > >>> value 10
> > >>> Received message on key 2 with value 11
> > >>> Received message on key 2 with value 12
> > >>> Received message on key 2 with value 13
> > >>> Received message on key 2 with value 14
> > >>> Received message on key 2 with value 15
> > >>> Received message on key 2 with value 16
> > >>> Error Processing message java.lang.Exception: Thrown error on key 2
> > with
> > >>> value 16
> > >>>
> > >>> it seems that the offset just automatically increases, with another
> > >>> client I also see all the values in the topic so they're not deleted.
> > >>>
> > >>> Shouldn't it have retried the message with value 3?
> > >>>
> > >>> --
> > >>> Alessandro Tagliapietra
> > >>>
> > >>>
> > >>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <
> > steve.how...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> "I'm just saying that a message which processing throws an exception
> > is
> > >>>> gone within minutes."
> > >>>>
> > >>>> Is the message no longer in the topic or is your consumer group
> > current
> > >>>> offset just higher than the offset of the message in question?
> > >>>>
> > >>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
> > >>>> tagliapietra.alessan...@gmail.com> wrote:
> > >>>>
> > >>>> > You mean the retention time of the topic? That's one month
> > >>>> > I'm just saying that a message which processing throws an
> exception
> > >>>> is gone
> > >>>> > within minutes.
> > >>>> >
> > >>>> > I will handle duplicates better later, if we can't be sure that we
> > >>>> don't
> > >>>> > skip/lose these messages then it's useless to use kafka.
> > >>>> > That's why I'm thinking if there's something wrong with my code.
> > >>>> >
> > >>>> > --
> > >>>> > Alessandro Tagliapietra
> > >>>> >
> > >>>> >
> > >>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com>
> > wrote:
> > >>>> >
> > >>>> > > How long is your message retention set for ? Perhaps you want to
> > >>>> increase
> > >>>> > > that to a large enough value.
> > >>>> > >
> > >>>> > > I have almost identical use case, but I would strongly recommend
> > >>>> that you
> > >>>> > > handle duplicates as they are due to your process ( not Kafka
> > >>>> duplicate).
> > >>>> > >
> > >>>> > > Regards,
> > >>>> > >
> > >>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> > >>>> > > tagliapietra.alessan...@gmail.com> wrote:
> > >>>> > >
> > >>>> > > > I've disabled the auto commit, so what I thought that code
> would
> > >>>> do is:
> > >>>> > > >
> > >>>> > > >  - it fetches a message
> > >>>> > > >  - it processes a message
> > >>>> > > >  - if everything went fine it commits its offset
> > >>>> > > >  - if there's an exception, it didn't commit, so after the
> error
> > >>>> it
> > >>>> > would
> > >>>> > > > just poll again and get the same message over and over
> > >>>> > > >
> > >>>> > > > instead it seems that message has been skipped somehow.
> > >>>> > > >
> > >>>> > > > In this case what the processing is doing is:
> > >>>> > > >  - opening new orders (with a start datetime and an uuid
> > >>>> generated by
> > >>>> > the
> > >>>> > > > stream producing to the topic this consumer is consuming from)
> > >>>> > > >  - closing previously created orders (by setting an end
> datetime
> > >>>> using
> > >>>> > > the
> > >>>> > > > ID stored in a store)
> > >>>> > > >
> > >>>> > > > And so far I've seen multiple exceptions like updating a non
> > >>>> existent
> > >>>> > > order
> > >>>> > > > (meaning the create message didn't complete the request and it
> > >>>> has been
> > >>>> > > > skipped) or create with same ID (meaning that the backend
> > >>>> returned an
> > >>>> > > error
> > >>>> > > > but correctly created the record on the database).
> > >>>> > > > I would expect that the above code wouldn't just commit
> somehow
> > >>>> and the
> > >>>> > > > consumer would be stuck there indefinitely.
> > >>>> > > >
> > >>>> > > > Regarding ignoring/handling duplicates that's definitely
> > >>>> something that
> > >>>> > > > will be done, but right now i'm implementing it this way so
> that
> > >>>> I can
> > >>>> > > > confirm that I don't lose messages if I don't manually commit
> > them
> > >>>> > which
> > >>>> > > > doesn't seem to be the case
> > >>>> > > >
> > >>>> > > > Any help is really appreciated
> > >>>> > > >
> > >>>> > > > --
> > >>>> > > > Alessandro Tagliapietra
> > >>>> > > >
> > >>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com>
> > >>>> wrote:
> > >>>> > > >
> > >>>> > > > > Hi,
> > >>>> > > > >
> > >>>> > > > > How are you managing your offset commits ?
> > >>>> > > > >
> > >>>> > > > >  Also, if it’s a duplicate record issue ( sounds like
> database
> > >>>> entry
> > >>>> > to
> > >>>> > > > > me), have you thought about ignoring/handling duplicates?
> > >>>> > > > >
> > >>>> > > > > Thanks,
> > >>>> > > > >
> > >>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > >>>> > > > > tagliapietra.alessan...@gmail.com> wrote:
> > >>>> > > > >
> > >>>> > > > > > Hello everyone,
> > >>>> > > > > >
> > >>>> > > > > > I've a consumer that fetches messages from a topic, for
> each
> > >>>> > message
> > >>>> > > it
> > >>>> > > > > > makes an API call to our backend. To ensure that if a
> > message
> > >>>> fails
> > >>>> > > it
> > >>>> > > > > > tries again to process the message I've set
> max.poll.records
> > >>>> to 1
> > >>>> > and
> > >>>> > > > > I've
> > >>>> > > > > > a code like this:
> > >>>> > > > > >
> > >>>> > > > > >   consumer.subscribe(arrayListOf("orders"))
> > >>>> > > > > >   while (!stopProcessing.get()) {
> > >>>> > > > > >       try {
> > >>>> > > > > >           val records = consumer.poll(Duration.ofHours(1))
> > >>>> > > > > >           records.forEach {
> > >>>> > > > > >               processRecord(it)
> > >>>> > > > > >           }
> > >>>> > > > > >           consumer.commitSync()
> > >>>> > > > > >       } catch (e: Exception) {
> > >>>> > > > > >           logger.error("Error processing order message",
> e)
> > >>>> > > > > >           Sentry.capture(e)
> > >>>> > > > > >           Thread.sleep(30000)
> > >>>> > > > > >       }
> > >>>> > > > > >   }
> > >>>> > > > > >
> > >>>> > > > > > now, if a request fails because the backend complains
> about
> > a
> > >>>> > > duplicate
> > >>>> > > > > > primary ID, due to the nature of the error trying to
> insert
> > >>>> the
> > >>>> > same
> > >>>> > > > > thing
> > >>>> > > > > > would generate that same error over and over again.
> > >>>> > > > > > Instead it seems that after some retries the message is
> > >>>> skipped and
> > >>>> > > it
> > >>>> > > > > goes
> > >>>> > > > > > one with the next one.
> > >>>> > > > > > What could be the reason? If in the next loop iteration it
> > >>>> gets a
> > >>>> > > > message
> > >>>> > > > > > from another partition it would also commit the offset of
> > the
> > >>>> other
> > >>>> > > > > failed
> > >>>> > > > > > partition?
> > >>>> > > > > >
> > >>>> > > > > > Thank you
> > >>>> > > > > >
> > >>>> > > > > > --
> > >>>> > > > > > Alessandro Tagliapietra
> > >>>> > > > > >
> > >>>> > > > >
> > >>>> > > >
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>
> >
>

Reply via email to