If I change my code slightly trying to manually commit offsets at partition
level

    while (true) {
        try {
            val records = consumer.poll(Duration.ofHours(1))
            val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
            records.forEach {
                partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset())
                println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
                if (Random.nextInt(0, 100) > 80) {
                    throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
                }
            }
            consumer.commitSync(partitionOffsetMap)
            println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
        } catch (e: Throwable) {
            println("Error Processing message $e")
            Thread.sleep(5000)
        }
    }
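A side note on the snippet above: by Kafka's convention, the offset you commit is the offset of the *next* record to consume, so the map should hold `it.offset() + 1`; committing `it.offset()` itself makes the broker redeliver the last processed record after a restart. A minimal, dependency-free Java sketch of that convention (the nested `Partition` record here is a hypothetical stand-in for Kafka's real `TopicPartition`, just so the idea is runnable without a broker):

```java
import java.util.HashMap;
import java.util.Map;

public class CommitMapSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition.
    record Partition(String topic, int partition) {}

    // Builds a commit map following the convention: the committed value is
    // the offset of the NEXT record to read, i.e. lastProcessed + 1.
    static Map<Partition, Long> commitMapFor(String topic, int partition, long lastProcessedOffset) {
        Map<Partition, Long> m = new HashMap<>();
        m.put(new Partition(topic, partition), lastProcessedOffset + 1);
        return m;
    }

    public static void main(String[] args) {
        // After processing offset 92 on partition 2, commit 93.
        Map<Partition, Long> m = commitMapFor("orders", 2, 92);
        System.out.println(m.get(new Partition("orders", 2))); // prints 93
    }
}
```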

it logs this:

Received message on key 2 with value 93 and offset 92
Committed offset 92 on partition 2
Received message on key 2 with value 94 and offset 93
Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
Received message on key 2 with value 95 and offset 94
Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
Received message on key 2 with value 96 and offset 95
Committed offset 95 on partition 2
Received message on key 2 with value 97 and offset 96
Committed offset 96 on partition 2
Received message on key 2 with value 98 and offset 97
Error Processing message java.lang.Exception: Thrown error on key 2 with value 98

As you can see, the offset increases even when the "Committed offset ..." line is never logged.
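What the log shows is the difference between the consumer's in-memory fetch position, which `poll()` advances as soon as it hands records out, and the committed offset, which only `commitSync()` moves. An exception that merely skips the commit does not rewind the position, so the next `poll()` simply returns the following records; to actually retry, the `catch` block would have to call `consumer.seek(partition, offset)` back to the last committed offset. A toy Java model of the two cursors for one partition (an illustration of the semantics only, not the real `KafkaConsumer` API):

```java
public class PositionVsCommitted {
    long position = 0;   // next offset poll() will return (in-memory cursor)
    long committed = 0;  // last committed offset (where a restart would resume)

    long poll()            { return position++; }      // hands out one record, advances position
    void commitSync()      { committed = position; }   // only an explicit commit moves this
    void seek(long offset) { position = offset; }      // rewinds the in-memory cursor

    public static void main(String[] args) {
        PositionVsCommitted c = new PositionVsCommitted();
        long a = c.poll();          // offset 0, processed fine
        c.commitSync();             // committed = 1
        long b = c.poll();          // offset 1 -- suppose processing throws here
        // Without a seek, the next poll keeps going and offset 1 is skipped:
        System.out.println(c.poll());   // prints 2
        c.seek(c.committed);            // rewind to the last committed offset
        System.out.println(c.poll());   // prints 1 -- the failed record is retried
    }
}
```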

--
Alessandro Tagliapietra


On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> It's still in the topic because it's weeks before the deletion threshold
> (it's today's message and the retention is 4 weeks).
> So I assume the consumer just moves on to the next one.
> As a test I've created this test script
> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
>
> After running this I get in the logs:
>
> Received message on key 2 with value 1
> Received message on key 2 with value 2
> Received message on key 2 with value 3
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
> Received message on key 2 with value 4
> Received message on key 2 with value 5
> Received message on key 2 with value 6
> Received message on key 2 with value 7
> Received message on key 2 with value 8
> Received message on key 2 with value 9
> Received message on key 2 with value 10
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
> Received message on key 2 with value 11
> Received message on key 2 with value 12
> Received message on key 2 with value 13
> Received message on key 2 with value 14
> Received message on key 2 with value 15
> Received message on key 2 with value 16
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
>
> It seems that the offset just automatically increases. With another client
> I also see all the values in the topic, so they're not deleted.
>
> Shouldn't it have retried the message with value 3?
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io>
> wrote:
>
>> "I'm just saying that a message which processing throws an exception is
>> gone within minutes."
>>
>> Is the message no longer in the topic or is your consumer group current
>> offset just higher than the offset of the message in question?
>>
>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>> > You mean the retention time of the topic? That's one month.
>> > I'm just saying that a message whose processing throws an exception is gone within minutes.
>> >
>> > I will handle duplicates better later, but if we can't be sure that we don't skip/lose these messages, then it's useless to use Kafka.
>> > That's why I'm wondering if there's something wrong with my code.
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> >
>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
>> >
>> > > How long is your message retention set for? Perhaps you want to increase that to a large enough value.
>> > >
>> > > I have an almost identical use case, but I would strongly recommend that you handle duplicates, as they are due to your process (not Kafka duplicates).
>> > >
>> > > Regards,
>> > >
>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
>> > > tagliapietra.alessan...@gmail.com> wrote:
>> > >
>> > > > I've disabled auto commit, so what I thought that code would do is:
>> > > >
>> > > >  - it fetches a message
>> > > >  - it processes the message
>> > > >  - if everything went fine, it commits its offset
>> > > >  - if there's an exception, it doesn't commit, so after the error it would just poll again and get the same message over and over
>> > > >
>> > > > Instead, it seems the message has been skipped somehow.
>> > > >
>> > > > In this case what the processing is doing is:
>> > > >  - opening new orders (with a start datetime and a UUID generated by the stream producing to the topic this consumer is consuming from)
>> > > >  - closing previously created orders (by setting an end datetime using the ID stored in a store)
>> > > >
>> > > > So far I've seen multiple exceptions, like updating a non-existent order (meaning the create message didn't complete the request and has been skipped) or creating with an existing ID (meaning that the backend returned an error but had correctly created the record in the database).
>> > > > I would expect that the above code wouldn't just commit somehow, and that the consumer would be stuck there indefinitely.
>> > > >
>> > > > Regarding ignoring/handling duplicates, that's definitely something that will be done, but right now I'm implementing it this way so that I can confirm that I don't lose messages when I don't manually commit them, which doesn't seem to be the case.
>> > > >
>> > > > Any help is really appreciated
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > How are you managing your offset commits ?
>> > > > >
>> > > > > Also, if it's a duplicate record issue (it sounds like a database entry to me), have you thought about ignoring/handling duplicates?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
>> > > > > tagliapietra.alessan...@gmail.com> wrote:
>> > > > >
>> > > > > > Hello everyone,
>> > > > > >
>> > > > > > I have a consumer that fetches messages from a topic; for each message it makes an API call to our backend. To ensure that a failed message is tried again, I've set max.poll.records to 1 and I have code like this:
>> > > > > >
>> > > > > >   consumer.subscribe(arrayListOf("orders"))
>> > > > > >   while (!stopProcessing.get()) {
>> > > > > >       try {
>> > > > > >           val records = consumer.poll(Duration.ofHours(1))
>> > > > > >           records.forEach {
>> > > > > >               processRecord(it)
>> > > > > >           }
>> > > > > >           consumer.commitSync()
>> > > > > >       } catch (e: Exception) {
>> > > > > >           logger.error("Error processing order message", e)
>> > > > > >           Sentry.capture(e)
>> > > > > >           Thread.sleep(30000)
>> > > > > >       }
>> > > > > >   }
>> > > > > >
>> > > > > > Now, if a request fails because the backend complains about a duplicate primary ID, then due to the nature of the error, trying to insert the same thing would generate that same error over and over again.
>> > > > > > Instead, it seems that after some retries the message is skipped and it moves on to the next one.
>> > > > > > What could be the reason? If in the next loop iteration it gets a message from another partition, would it also commit the offset of the other, failed partition?
>> > > > > >
>> > > > > > Thank you
>> > > > > >
>> > > > > > --
>> > > > > > Alessandro Tagliapietra
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>