Hey Jason,

Just to make sure I understood you right: you propose exposing only the interface itself, rather than any new exception types? So sample code dealing with this would be something like:

    try {
        // ...
    } catch (KafkaException e) {
        if (e instanceof UnconsumableRecordException) {
            // handle retry
        }
    }

If that is the case, I like it better.

Regarding automatic handling of unconsumable messages, I like that idea too. To me, a callback seems like the more straightforward approach. A config such as `seek.past.unconsumable.record` limits the behavior too much in my opinion; I believe giving users the option to implement a callback (or use a default one) is better in that respect. Regardless, I believe this is best left for another KIP, as I feel it would warrant a bigger discussion.

Best,
Stanislav

On Mon, Jul 30, 2018 at 9:34 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Stanislav,
>
> Thanks for the KIP. I think the goal is to allow users to seek past
> records which cannot be parsed for whatever reason. However, it's a little
> annoying that you need to catch two separate types to handle this. I'm
> wondering if it makes sense to expose an interface like
> `UnconsumableRecordException` or something like that. The consumer could
> then have separate internal exception types which extend from
> InvalidRecordException and SerializationException respectively and
> implement `UnconsumableRecordException`. That would simplify the handling
> and users could check the cause if they cared which case it was.
>
> Another question for consideration. I'd imagine some users would find it
> helpful to seek past failed messages automatically. If there is a corrupt
> record, for example, there's almost nothing you can do except seek past it
> anyway. I'm wondering if there should be a config for this or if users
> should be able to install a callback of some sort to handle failed
> records. Not sure if this is that big of a problem for users, but
> interested to hear others' thoughts.
>
> Thanks,
> Jason
>
> On Fri, Jul 20, 2018 at 6:32 PM, Stanislav Kozlovski <stanis...@confluent.io> wrote:
>
> > Hi Ted,
> >
> > I do plan to start one.
> > When is the appropriate time? My reasoning was that
> > people would like to view the changes first.
> >
> > On Fri, Jul 20, 2018, 6:21 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > Hi, Stanislav:
> > > Do you plan to start a VOTE thread?
> > >
> > > Cheers
> > >
> > > On Fri, Jul 20, 2018 at 6:11 PM Stanislav Kozlovski <stanis...@confluent.io> wrote:
> > >
> > > > Hey group,
> > > >
> > > > I added a Pull Request for this KIP - here it is
> > > > https://github.com/apache/kafka/pull/5410
> > > > Please take a look.
> > > >
> > > > Best,
> > > > Stanislav
> > > >
> > > > On Thu, Jul 5, 2018 at 11:06 AM Ismael Juma <isma...@gmail.com> wrote:
> > > >
> > > > > Yes, the Scala consumers have been removed in 2.0.0, which simplifies
> > > > > some of this. The following commit was an initial step in unifying the
> > > > > exception handling:
> > > > >
> > > > > https://github.com/apache/kafka/commit/96bcfdfc7c9aac075635b2034e65e412a725672e
> > > > >
> > > > > But more can be done as you mentioned.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On 5 Jul 2018 9:36 am, "Stanislav Kozlovski" <stanis...@confluent.io> wrote:
> > > > >
> > > > > Hey Ismael,
> > > > >
> > > > > It is only slightly related - my PR would attach two new attributes and
> > > > > also touch upon deserialization exceptions.
> > > > >
> > > > > But this PR did provide me with some insight:
> > > > > Maybe the best approach would be to make `InvalidRecordException` a
> > > > > public exception instead of introducing a new one - I did not realize it
> > > > > was not publicly exposed.
> > > > > Does the following:
> > > > >
> > > > > InvalidMessageException extends CorruptRecordException for temporary
> > > > > compatibility with the old Scala clients.
> > > > > * We want to update the server side code to use and catch the new
> > > > > CorruptRecordException.
> > > > > * Because ByteBufferMessageSet.scala and Message.scala are used in
> > > > > both server and client code having
> > > > > * InvalidMessageException extend CorruptRecordException allows us to
> > > > > change server code without affecting the client.
> > > > >
> > > > > still apply? I can see that the `ByteBufferMessageSet` and `Message`
> > > > > scala classes are not present in the codebase anymore. AFAIA the old
> > > > > scala clients were removed with 2.0.0 and we can thus update the server
> > > > > side code to use the `CorruptRecordException` while changing and exposing
> > > > > `InvalidRecordException` to the public. WDYT?
> > > > >
> > > > > I will also make sure to not expose the cause of the exception when not
> > > > > needed, maybe I'll outright remove the `cause` attribute
> > > > >
> > > > >
> > > > > On Thu, Jul 5, 2018 at 4:55 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > >
> > > > > > Thanks for the KIP, Stanislav. The following PR looks related:
> > > > > >
> > > > > > https://github.com/apache/kafka/pull/4093/files
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Thu, Jul 5, 2018 at 8:44 AM Stanislav Kozlovski <stanis...@confluent.io> wrote:
> > > > > >
> > > > > > > Hey everybody,
> > > > > > >
> > > > > > > I just created a new KIP about exposing more information in
> > > > > > > exceptions caused by consumer record deserialization/validation.
> > > > > > > Please have a look at it, it is a very short page.
> > > > > > >
> > > > > > > I am working under the assumption that all invalid record or
> > > > > > > deserialization exceptions in the consumer pass through the `Fetcher`
> > > > > > > class.
> > > > > > > Please confirm if that is true, otherwise I might miss some
> > > > > > > places where the exceptions are raised in my implementation.
> > > > > > >
> > > > > > > One concern I have is the name of the second exception -
> > > > > > > `InoperativeRecordException`. I would have named it
> > > > > > > `InvalidRecordException` but that is taken. The `Fetcher` class
> > > > > > > catches `InvalidRecordException` (here
> > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1081>
> > > > > > > and here
> > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1092>)
> > > > > > > and re-raises it as `KafkaException`, which exposes it as a
> > > > > > > non-retriable exception to the user (`InvalidRecordException` extends
> > > > > > > `RetriableException`, but `KafkaException` doesn't).
> > > > > > > A suggestion I got for an alternative name was
> > > > > > > `InvalidFetchRecordException`. Please chime in if you have ideas.
> > > > > > >
> > > > > > > Confluence page: KIP-334
> > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793>
> > > > > > > JIRA Issue: KAFKA-5682 <https://issues.apache.org/jira/browse/KAFKA-5682>
> > > > > > >
> > > > > > > --
> > > > > > > Best,
> > > > > > > Stanislav
> > > > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >

--
Best,
Stanislav
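
For reference, the interface-based idea discussed in this thread (one public `UnconsumableRecordException` interface, implemented by internal exception types that extend the existing ones) could look roughly like the sketch below. This is only an illustration under assumptions, not the KIP's actual code: all classes are local stand-ins so it compiles without the Kafka client, the real class hierarchy is simplified, and the `partition()`/`offset()` accessors, the two `Unconsumable...` class names and the `resumeOffset` helper are hypothetical.

```java
// Sketch only: every type here is a local stand-in, nested in one class so the
// example compiles without the Kafka client on the classpath.
class UnconsumableRecordSketch {

    // Stand-in for org.apache.kafka.common.KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    // Simplified stand-ins for the two existing exception types named in the thread.
    static class InvalidRecordException extends KafkaException {
        InvalidRecordException(String message) { super(message); }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }

    // The only new public type under the proposal. The accessors are assumed.
    interface UnconsumableRecordException {
        int partition();
        long offset();
    }

    // Hypothetical internal type for records that fail validation.
    static class UnconsumableInvalidRecordException extends InvalidRecordException
            implements UnconsumableRecordException {
        private final int partition;
        private final long offset;

        UnconsumableInvalidRecordException(String message, int partition, long offset) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }

        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
    }

    // Hypothetical internal type for records that fail deserialization.
    static class UnconsumableSerializationException extends SerializationException
            implements UnconsumableRecordException {
        private final int partition;
        private final long offset;

        UnconsumableSerializationException(String message, int partition, long offset) {
            super(message);
            this.partition = partition;
            this.offset = offset;
        }

        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
    }

    // Returns the offset to resume from (one past the bad record), or -1 when
    // the failure is not tied to a single unconsumable record.
    static long resumeOffset(KafkaException e) {
        if (e instanceof UnconsumableRecordException) {
            return ((UnconsumableRecordException) e).offset() + 1;
        }
        return -1L;
    }

    public static void main(String[] args) {
        KafkaException corrupt = new UnconsumableInvalidRecordException("CRC mismatch", 0, 41L);
        KafkaException unrelated = new KafkaException("unrelated failure");
        System.out.println(resumeOffset(corrupt));   // prints 42
        System.out.println(resumeOffset(unrelated)); // prints -1
    }
}
```

With this shape, user code needs a single `catch (KafkaException e)` plus one `instanceof` check, and can still inspect the concrete type if it cares whether validation or deserialization failed. In a real consumer, the returned offset would presumably be passed to `KafkaConsumer.seek` for the affected partition.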