Hey Jason,

Just to make sure I understood you correctly - you propose exposing only the
interface itself, rather than any new exception types? So code dealing with
this would look something like:

try {
    // poll and process records
    // ...
} catch (KafkaException e) {
    if (e instanceof UnconsumableRecordException) {
        // e.g. log the failure, seek past the offending record and retry
    } else {
        throw e;
    }
}

If that is the case, I like it better.
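
Purely to check my understanding, here is roughly how I imagine the types
lining up. Everything below is a sketch with placeholder names I made up for
illustration - none of it is what the KIP would necessarily end up with:

// Sketch only - placeholder names, not a concrete proposal.
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;

// The single new public type: a marker interface users can check for.
public interface UnconsumableRecordException {
}

// Internal consumer exception for records that fail parsing/validation
// (in practice this would probably extend the existing
// InvalidRecordException rather than KafkaException directly):
class UnparsableRecordException extends KafkaException
        implements UnconsumableRecordException {
    UnparsableRecordException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Internal consumer exception for records that fail deserialization:
class UndeserializableRecordException extends SerializationException
        implements UnconsumableRecordException {
    UndeserializableRecordException(String message, Throwable cause) {
        super(message, cause);
    }
}

With something like that, the try/catch above stays as it is, and users who
care which case it was can still inspect the concrete type or the cause.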


Regarding automatic handling of unconsumable messages - I like that idea too.
To me, a callback seems like the more straightforward approach. A config such
as `seek.past.unconsumable.record` limits the behavior too much in my opinion;
giving users the option to implement their own callback (or use a default one)
is more flexible. A rough sketch of what I mean is below.
Regardless, I believe this is best left for another KIP, as it would warrant a
bigger discussion.
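
Just to illustrate the kind of callback I have in mind (the handler name and
signature below are made up purely for the example, not something this KIP
proposes):

// Hypothetical sketch only - not part of the KIP.
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public interface UnconsumableRecordHandler {
    // Invoked when the record at the given partition/offset fails
    // deserialization or validation. A default implementation could log
    // the failure and seek past the record; users could instead rethrow
    // the cause to fail fast, route the raw bytes to a dead-letter topic,
    // and so on.
    void onUnconsumableRecord(TopicPartition partition, long offset, KafkaException cause);
}

A config flag can only ever toggle one hard-coded behavior, whereas a callback
leaves the policy up to the user.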

Best,
Stanislav

On Mon, Jul 30, 2018 at 9:34 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Stanislav,
>
> Thanks for the KIP. I think the goal is to allow users to seek past a
> record which cannot be parsed for whatever reason. However, it's a little
> annoying that you need to catch two separate types to handle this. I'm
> wondering if it makes sense to expose an interface like
> `UnconsumableRecordException` or something like that. The consumer could
> then have separate internal exception types which extend from
> InvalidRecordException and SerializationException respectively and
> implement `UnconsumableRecordException`. That would simplify the handling
> and users could check the cause if they cared which case it was.
>
> Another question for consideration. I'd imagine some users would find it
> helpful to seek past failed messages automatically. If there is a corrupt
> record, for example, there's almost nothing you can do except seek past it
> anyway. I'm wondering if there should be a config for this or if users
> should be able to install a callback of some sort to handle failed
> records. Not sure if this is that big of a problem for users, but
> interested to hear others' thoughts.
>
> Thanks,
> Jason
>
> On Fri, Jul 20, 2018 at 6:32 PM, Stanislav Kozlovski <
> stanis...@confluent.io
> > wrote:
>
> > Hi Ted,
> >
> > I do plan to start one. When is the appropriate time? My reasoning was
> > that people would like to view the changes first.
> >
> > On Fri, Jul 20, 2018, 6:21 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > Hi, Stanislav:
> > > Do you plan to start a VOTE thread?
> > >
> > > Cheers
> > >
> > > On Fri, Jul 20, 2018 at 6:11 PM Stanislav Kozlovski <
> > > stanis...@confluent.io>
> > > wrote:
> > >
> > > > Hey group,
> > > >
> > > > I added a Pull Request for this KIP - here it is:
> > > > https://github.com/apache/kafka/pull/5410
> > > > Please take a look.
> > > >
> > > > Best,
> > > > Stanislav
> > > >
> > > > On Thu, Jul 5, 2018 at 11:06 AM Ismael Juma <isma...@gmail.com>
> wrote:
> > > >
> > > > > Yes, the Scala consumers have been removed in 2.0.0, which
> simplifies
> > > > some
> > > > > of this. The following commit was an initial step in unifying the
> > > > exception
> > > > > handling:
> > > > >
> > > > >
> > > > >
> > > >
> > > https://github.com/apache/kafka/commit/96bcfdfc7c9aac075635b2034e65e4
> > 12a725672e
> > > > >
> > > > > But more can be done as you mentioned.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On 5 Jul 2018 9:36 am, "Stanislav Kozlovski" <stanis...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > Hey Ismael,
> > > > >
> > > > > It is only slightly related - my PR would attach two new attributes
> > and
> > > > > also touch upon deserialization exceptions.
> > > > >
> > > > > But this PR did provide me with some insight:
> > > > > Maybe the best approach would be to make `InvalidRecordException` a
> > > > public
> > > > > exception instead of introducing a new one - I did not realize it
> was
> > > not
> > > > > publicly exposed.
> > > > > Does the following:
> > > > >
> > > > >  InvalidMessageException extends CorruptRecordException for
> temporary
> > > > > compatibility with the old Scala clients.
> > > > >  * We want to update the server side code to use and catch the new
> > > > > CorruptRecordException.
> > > > >  * Because ByteBufferMessageSet.scala and Message.scala are used in
> > > > > both server and client code having
> > > > >  * InvalidMessageException extend CorruptRecordException allows us
> to
> > > > > change server code without affecting the client.
> > > > >
> > > > > still apply? I can see that the `ByteBufferMessageSet` and
> `Message`
> > > > scala
> > > > > classes are not present in the codebase anymore. AFAIA the old
> scala
> > > > > clients were removed with 2.0.0 and we can thus update the server
> > side
> > > > code
> > > > > to use the `CorruptRecordException` while changing and exposing
> > > > > `InvalidRecordException` to the public. WDYT?
> > > > >
> > > > > I will also make sure to not expose the cause of the exception when
> > not
> > > > > needed, maybe I'll outright remove the `cause` attribute
> > > > >
> > > > >
> > > > > On Thu, Jul 5, 2018 at 4:55 PM Ismael Juma <ism...@juma.me.uk>
> > wrote:
> > > > >
> > > > > > Thanks for the KIP, Stanislav. The following PR looks related:
> > > > > >
> > > > > > https://github.com/apache/kafka/pull/4093/files
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Thu, Jul 5, 2018 at 8:44 AM Stanislav Kozlovski <
> > > > > stanis...@confluent.io
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey everybody,
> > > > > > >
> > > > > > > I just created a new KIP about exposing more information in
> > > > exceptions
> > > > > > > caused by consumer record deserialization/validation. Please
> > have a
> > > > > look
> > > > > > at
> > > > > > > it, it is a very short page.
> > > > > > >
> > > > > > > I am working under the assumption that all invalid record or
> > > > > > > deserialization exceptions in the consumer pass through the
> > > `Fetcher`
> > > > > > > class. Please confirm if that is true, otherwise I might miss
> > some
> > > > > places
> > > > > > > where the exceptions are raised in my implementation
> > > > > > >
> > > > > > > One concern I have is the name of the second exception -
> > > > > > > `InoperativeRecordException`. I would have named it
> > > > > > > `InvalidRecordException`, but that name is taken. The `Fetcher`
> > > > > > > class catches `InvalidRecordException` (here
> > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1081>
> > > > > > > and here
> > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1092>)
> > > > > > > and re-raises it as `KafkaException`, which exposes it as a
> > > > > > > non-retriable exception to the user (`InvalidRecordException`
> > > > > > > extends `RetriableException`, but `KafkaException` doesn't).
> > > > > > > A suggestion I got for an alternative name was
> > > > > > > `InvalidFetchRecordException`. Please chime in if you have ideas.
> > > > > > >
> > > > > > > Confluence page: KIP-334
> > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793>
> > > > > > > JIRA Issue: KAFKA-5682
> > > > > > > <https://issues.apache.org/jira/browse/KAFKA-5682>
> > > > > > > --
> > > > > > > Best,
> > > > > > > Stanislav
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>


-- 
Best,
Stanislav
