Hi group, I've updated the KIP and PR with the discussed interface changes. I am also starting a voting thread.
Best,
Stanislav

On Thu, Aug 2, 2018 at 1:27 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Stanislav,
>
> > Just to make sure I understood you right - you propose not exposing any
> > new exception types but rather the interface itself only?
>
> Yes, exactly. Our exception hierarchy is a bit of a mess, to be honest.
> Interfaces are more flexible, and here an interface simplifies the error
> handling.
>
> > Regardless, I believe this is best left out for another KIP as I feel it
> > would warrant a bigger discussion.
>
> Ok, that's fair. I thought I'd suggest it here just to see if there was
> any interest in the community. At least with this KIP, users have a viable
> way to skip past bad data if they wish.
>
> -Jason
>
> On Tue, Jul 31, 2018 at 2:42 AM, Stanislav Kozlovski
> <stanis...@confluent.io> wrote:
>
> > Hey Jason,
> >
> > Just to make sure I understood you right - you propose not exposing any
> > new exception types but rather the interface itself only? So a code
> > sample dealing with this would be something like:
> >
> > try {
> >     // ...
> > } catch (KafkaException e) {
> >     if (e instanceof UnconsumableRecordException) {
> >         // handle retry
> >     }
> > }
> >
> > If that is the case, I like it better.
> >
> > In regards to automatic handling of unconsumable messages - I like that
> > idea too. To me, a callback seems like the more straightforward
> > approach. A config such as `seek.past.unconsumable.record` limits the
> > behavior too much, in my opinion; I believe giving users the option to
> > implement a callback (or use the default one) is better in that regard.
> > Regardless, I believe this is best left out for another KIP as I feel it
> > would warrant a bigger discussion.
> >
> > Best,
> > Stanislav
> >
> > On Mon, Jul 30, 2018 at 9:34 PM Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > Thanks for the KIP. I think the goal is to allow users to seek past a
> > > record which cannot be parsed for whatever reason. However, it's a
> > > little annoying that you need to catch two separate types to handle
> > > this. I'm wondering if it makes sense to expose an interface like
> > > `UnconsumableRecordException` or something like that. The consumer
> > > could then have separate internal exception types which extend from
> > > InvalidRecordException and SerializationException respectively and
> > > implement `UnconsumableRecordException`. That would simplify the
> > > handling, and users could check the cause if they cared which case it
> > > was.
> > >
> > > Another question for consideration. I'd imagine some users would find
> > > it helpful to seek past failed messages automatically. If there is a
> > > corrupt record, for example, there's almost nothing you can do except
> > > seek past it anyway. I'm wondering if there should be a config for
> > > this or if users should be able to install a callback of some sort to
> > > handle failed records. Not sure if this is that big of a problem for
> > > users, but I'm interested to hear others' thoughts.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Fri, Jul 20, 2018 at 6:32 PM, Stanislav Kozlovski
> > > <stanis...@confluent.io> wrote:
> > >
> > > > Hi Ted,
> > > >
> > > > I do plan to start one. When is the appropriate time? My reasoning
> > > > was that people would like to view the changes first.
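As a rough illustration of the marker-interface idea discussed in the exchange above, a minimal sketch is included below. The names and accessors here (the `partition()`/`offset()` methods and the `FailedDeserializationException` class) are hypothetical and only show the general shape, not the exact types KIP-334 ends up exposing.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

// Sketch of a marker-style interface: callers only need to check one type,
// whether the record failed deserialization or validation.
interface UnconsumableRecordException {
    TopicPartition partition();

    long offset();
}

// Hypothetical internal exception the consumer could raise on a
// deserialization failure; it remains a SerializationException for
// compatibility with existing catch blocks, but also implements the
// marker interface so callers can recover the failed position.
class FailedDeserializationException extends SerializationException
        implements UnconsumableRecordException {
    private final TopicPartition partition;
    private final long offset;

    FailedDeserializationException(TopicPartition partition, long offset, Throwable cause) {
        super("Error deserializing record at " + partition + " offset " + offset, cause);
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public TopicPartition partition() {
        return partition;
    }

    @Override
    public long offset() {
        return offset;
    }
}

Because the internal type still extends SerializationException (and therefore KafkaException), existing error handling keeps working; new code can simply check for the interface, as in the try/catch sample quoted above.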
> > > >
> > > > On Fri, Jul 20, 2018, 6:21 PM Ted Yu <yuzhih...@gmail.com> wrote:
> > > >
> > > > > Hi, Stanislav:
> > > > > Do you plan to start a VOTE thread?
> > > > >
> > > > > Cheers
> > > > >
> > > > > On Fri, Jul 20, 2018 at 6:11 PM Stanislav Kozlovski
> > > > > <stanis...@confluent.io> wrote:
> > > > >
> > > > > > Hey group,
> > > > > >
> > > > > > I added a Pull Request for this KIP - here it is:
> > > > > > https://github.com/apache/kafka/pull/5410
> > > > > > Please take a look.
> > > > > >
> > > > > > Best,
> > > > > > Stanislav
> > > > > >
> > > > > > On Thu, Jul 5, 2018 at 11:06 AM Ismael Juma <isma...@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, the Scala consumers have been removed in 2.0.0, which
> > > > > > > simplifies some of this. The following commit was an initial
> > > > > > > step in unifying the exception handling:
> > > > > > >
> > > > > > > https://github.com/apache/kafka/commit/96bcfdfc7c9aac075635b2034e65e412a725672e
> > > > > > >
> > > > > > > But more can be done, as you mentioned.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On 5 Jul 2018 9:36 am, "Stanislav Kozlovski" <stanis...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > Hey Ismael,
> > > > > > >
> > > > > > > It is only slightly related - my PR would attach two new
> > > > > > > attributes and also touch upon deserialization exceptions.
> > > > > > >
> > > > > > > But this PR did provide me with some insight:
> > > > > > > Maybe the best approach would be to make `InvalidRecordException`
> > > > > > > a public exception instead of introducing a new one - I did not
> > > > > > > realize it was not publicly exposed.
> > > > > > > Does the following:
> > > > > > >
> > > > > > > InvalidMessageException extends CorruptRecordException for temporary
> > > > > > > compatibility with the old Scala clients.
> > > > > > > * We want to update the server side code to use and catch the new
> > > > > > > CorruptRecordException.
> > > > > > > * Because ByteBufferMessageSet.scala and Message.scala are used in
> > > > > > > both server and client code, having
> > > > > > > * InvalidMessageException extend CorruptRecordException allows us to
> > > > > > > change server code without affecting the client.
> > > > > > >
> > > > > > > still apply? I can see that the `ByteBufferMessageSet` and
> > > > > > > `Message` Scala classes are not present in the codebase anymore.
> > > > > > > AFAIA the old Scala clients were removed with 2.0.0, and we can
> > > > > > > thus update the server side code to use the
> > > > > > > `CorruptRecordException` while changing and exposing
> > > > > > > `InvalidRecordException` to the public. WDYT?
> > > > > > >
> > > > > > > I will also make sure not to expose the cause of the exception
> > > > > > > when not needed; maybe I'll outright remove the `cause` attribute.
> > > > > > >
> > > > > > > On Thu, Jul 5, 2018 at 4:55 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP, Stanislav.
> > > > > > > > The following PR looks related:
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/pull/4093/files
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Thu, Jul 5, 2018 at 8:44 AM Stanislav Kozlovski
> > > > > > > > <stanis...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hey everybody,
> > > > > > > > >
> > > > > > > > > I just created a new KIP about exposing more information in
> > > > > > > > > exceptions caused by consumer record
> > > > > > > > > deserialization/validation. Please have a look at it, it is
> > > > > > > > > a very short page.
> > > > > > > > >
> > > > > > > > > I am working under the assumption that all invalid record or
> > > > > > > > > deserialization exceptions in the consumer pass through the
> > > > > > > > > `Fetcher` class. Please confirm if that is true, otherwise I
> > > > > > > > > might miss some places where the exceptions are raised in my
> > > > > > > > > implementation.
> > > > > > > > >
> > > > > > > > > One concern I have is the name of the second exception -
> > > > > > > > > `InoperativeRecordException`. I would have named it
> > > > > > > > > `InvalidRecordException`, but that is taken. The `Fetcher`
> > > > > > > > > class catches `InvalidRecordException` (here
> > > > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1081>
> > > > > > > > > and here
> > > > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1092>)
> > > > > > > > > and re-raises it as `KafkaException`, which exposes it as a
> > > > > > > > > non-retriable exception to the user (`InvalidRecordException`
> > > > > > > > > extends `RetriableException`, but `KafkaException` doesn't).
> > > > > > > > > A suggestion I got for an alternative name was
> > > > > > > > > `InvalidFetchRecordException`. Please chime in if you have
> > > > > > > > > ideas.
> > > > > > > > >
> > > > > > > > > Confluence page: KIP-334
> > > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793>
> > > > > > > > > JIRA Issue: KAFKA-5682
> > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-5682>
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Best,
> > > > > > > > > Stanislav
> > > > > > >
> > > > > > > --
> > > > > > > Best,
> > > > > > > Stanislav
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Stanislav
> >
> > --
> > Best,
> > Stanislav

--
Best,
Stanislav
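To close with a concrete picture of the application-side handling this KIP is meant to enable, here is a rough sketch of a poll loop that skips past a record which cannot be deserialized or validated. It assumes the hypothetical `UnconsumableRecordException` interface sketched earlier in this thread, with `partition()`/`offset()` accessors; the actual KIP-334 API may differ.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;

public class SkipUnconsumableRecords {

    // Poll in a loop; when a record cannot be deserialized or validated,
    // seek one past the failed offset so the next poll() moves on.
    // UnconsumableRecordException is the hypothetical interface sketched
    // earlier in this thread, not a released Kafka API.
    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
            } catch (KafkaException e) {
                if (e instanceof UnconsumableRecordException) {
                    UnconsumableRecordException bad = (UnconsumableRecordException) e;
                    consumer.seek(bad.partition(), bad.offset() + 1); // skip the poison pill
                } else {
                    throw e; // anything else is still fatal to this loop
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), record.value());
    }
}

Seeking to the failed offset plus one ensures the next poll() does not return the same bad record again, which is exactly the "skip past bad data" workflow discussed above.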