Thanks Jason, I agree with the proposed solution here, will update the KIP.
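For reference, here is a minimal sketch of how the proposal quoted below could look in Java. It is only an illustration under a few assumptions: `Metadata` is a hypothetical stand-in for `RecordMetadata` so the snippet compiles on its own, a null `SendFailure` is assumed to mean the send succeeded, and the `describe` helper with its category-to-reaction mapping is my own guess rather than anything settled in the KIP. None of the names are final.

```java
public class SendFailureSketch {

    // The three categories from the proposal; the names are placeholders.
    enum FailureType { MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED }

    // Pairs the failure category with the underlying exception.
    static final class SendFailure {
        private final FailureType failureType;
        private final Exception cause;

        SendFailure(FailureType failureType, Exception cause) {
            this.failureType = failureType;
            this.cause = cause;
        }

        FailureType failureType() { return failureType; }
        Exception cause() { return cause; }
    }

    // Hypothetical stand-in for RecordMetadata so the sketch compiles on its own.
    static final class Metadata {
        final long offset;
        Metadata(long offset) { this.offset = offset; }
    }

    interface Callback {
        // Existing method, kept so current implementations keep working.
        void onCompletion(Metadata metadata, Exception exception);

        // New overload; by default it unwraps the cause and delegates.
        // Assumption: a null failure means the send succeeded.
        default void onCompletion(Metadata metadata, SendFailure failure) {
            Exception cause = (failure == null) ? null : failure.cause();
            onCompletion(metadata, cause);
        }
    }

    // Assumed mapping from category to caller reaction, for illustration only.
    static String describe(SendFailure failure) {
        if (failure == null) {
            return "success";
        }
        switch (failure.failureType()) {
            case MESSAGE_REJECTED:
                return "record rejected: do not retry this record";
            case DELIVERY_FAILED:
                return "delivery failed: abort the transaction and retry";
            case TRANSACTION_FAILED:
                return "transaction failed: close the producer";
            default:
                throw new IllegalStateException("unknown failure type");
        }
    }

    public static void main(String[] args) {
        // A legacy-style callback only implements the (Metadata, Exception) method.
        Callback legacy = (metadata, exception) ->
                System.out.println("legacy callback saw: " + exception);

        SendFailure failure = new SendFailure(
                FailureType.DELIVERY_FAILED, new RuntimeException("request timed out"));

        // Routed through the default overload, which forwards the cause.
        legacy.onCompletion(new Metadata(-1L), failure);
        System.out.println(describe(failure));
    }
}
```

Existing callbacks keep compiling because only the old method is abstract; callers that want the richer information override the new overload instead.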
On Thu, Jan 28, 2021 at 10:52 AM Jason Gustafson <ja...@confluent.io> wrote:
> Hi Boyang,
>
> It seems like a reasonable suggestion. I wonder if a flag is sufficient
> though. The current `Callback` documentation treats "fatal" errors from the
> perspective of the individual message that was sent.
>
> ```
> *   Non-Retriable exceptions (fatal, the message will never be sent):
> ```
>
> However, we also have fatal errors from the perspective of the transaction
> (e.g. when the producer gets fenced). Perhaps that suggests we need
> something richer than a boolean flag.
>
> At a high level, I think the following cases are possible:
>
> - message rejected (e.g. message too large, invalid topic)
> - delivery failed after retries/delivery timeout (e.g. timeout, crc error,
>   not enough replicas)
> - transaction failed (e.g. producer fenced, invalid transaction state)
>
> Perhaps instead we can have a type like the following:
>
> class SendFailure {
>   FailureType failureType;
>   Exception cause;
> }
>
> enum FailureType {
>   MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED
> }
>
> (Not married to any of these names, just a starting point.)
>
> Then we add a new `onCompletion` as you've suggested:
>
> default void onCompletion(RecordMetadata metadata, SendFailure failure) {
>   this.onCompletion(metadata, failure.cause);
> }
>
> This would give streams and other applications enough information to know
> whether the message can be retried and whether the transaction can be
> aborted.
>
> What do you think?
>
> -Jason
>
>
> On Wed, Jan 27, 2021 at 9:51 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Thanks Jason for the thoughts.
> >
> > On Wed, Jan 27, 2021 at 11:52 AM Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the iterations here. I think this is something we should have
> > > done a long time ago. It sounds like there are two API changes here:
> > >
> > > 1. We are introducing the `CommitFailedException` to wrap abortable
> > > errors that are raised from `commitTransaction`. This sounds fine to me.
> > > As far as I know, the only case we might need this is when we add support
> > > to let producers recover from coordinator timeouts. Are there any others?
> > >
> >
> > I think the purpose here is to ensure non-fatal exceptions are unified
> > under the same exception umbrella, which makes it much easier to go on
> > and abort any ongoing transaction. I don't think `coordinator timeouts`
> > is the only case to recover here, since we have other non-fatal
> > exceptions such as UnknownPid.
> >
> > > 2. We are wrapping non-fatal errors raised from `send` as `KafkaException`.
> > > The motivation for this is less clear to me and it doesn't look like the
> > > example from the KIP depends on it. My concern here is compatibility.
> > > Currently we have the following documentation for the `Callback` API:
> > >
> > > ```
> > > *   Non-Retriable exceptions (fatal, the message will never be sent):
> > > *
> > > *   InvalidTopicException
> > > *   OffsetMetadataTooLargeException
> > > *   RecordBatchTooLargeException
> > > *   RecordTooLargeException
> > > *   UnknownServerException
> > > *   UnknownProducerIdException
> > > *   InvalidProducerEpochException
> > > *
> > > *   Retriable exceptions (transient, may be covered by increasing #.retries):
> > > *
> > > *   CorruptRecordException
> > > *   InvalidMetadataException
> > > *   NotEnoughReplicasAfterAppendException
> > > *   NotEnoughReplicasException
> > > *   OffsetOutOfRangeException
> > > *   TimeoutException
> > > *   UnknownTopicOrPartitionException
> > > ```
> > >
> > > If we wrap all the retriable exceptions documented here as
> > > `KafkaException`, wouldn't that break any error handling that users might
> > > already have? It would introduce a compatibility issue.
> > >
> >
> > The original intention was to simplify `send` callback error handling by
> > doing exception wrapping, as on the Streams level we have to prepare an
> > exhaustive list of exceptions to catch as fatal, and the same lengthy list
> > to catch as non-fatal. It would be much easier if we got `hints` from the
> > callback. However, I agree there is a compatibility concern. What about
> > deprecating the existing:
> >
> > void onCompletion(RecordMetadata metadata, Exception exception)
> >
> > and replacing it with:
> >
> > default void onCompletion(RecordMetadata metadata, Exception exception,
> >                           boolean isFatal) {
> >   this.onCompletion(metadata, exception);
> > }
> >
> > to make sure new users get the benefit of understanding the fatality based
> > on the info presented by the producer?
> >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Sat, Jan 23, 2021 at 3:31 AM Hiringuru <i...@hiringuru.com> wrote:
> > >
> > > > Why we are receiving all emails kindly remove us from
> > > > dev@kafka.apache.org we don't want to receive emails anymore.
> > > >
> > > > Thanks
> > > > > On 01/23/2021 4:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > >
> > > > > Thanks Boyang, yes I think I was confused about the different handling
> > > > > of two abortTxn calls, and now I get it was not intentional. I think I
> > > > > do not have more concerns.
> > > > >
> > > > > On Fri, Jan 22, 2021 at 1:12 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the clarification Guozhang, I got your point that we want
> > > > > > to have a consistent handling of fatal exceptions being thrown from
> > > > > > the abortTxn. I modified the current template to move the fatal
> > > > > > exception try-catch outside of the processing loop to make sure we
> > > > > > could get a chance to close consumer/producer modules. Let me know
> > > > > > what you think.
> > > > > >
> > > > > > Best,
> > > > > > Boyang
> > > > > >
> > > > > > On Fri, Jan 22, 2021 at 11:05 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > My understanding is that abortTransaction would only throw when the
> > > > > > > producer is in fatal state. For CommitFailed, the producer should
> > > > > > > still be in the abortable error state, so that abortTransaction call
> > > > > > > would not throw.
> > > > > > >
> > > > > > > On Fri, Jan 22, 2021 at 11:02 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Or are you going to maintain some internal state such that the
> > > > > > >> `abortTransaction` in the catch block would never throw again?
> > > > > > >>
> > > > > > >> On Fri, Jan 22, 2021 at 11:01 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Hi Boyang/Jason,
> > > > > > >> >
> > > > > > >> > I've also thought about this (i.e. using CommitFailed for all
> > > > > > >> > non-fatal), but what I'm pondering is that, in the catch
> > > > > > >> > (CommitFailed) block, what would happen if the
> > > > > > >> > `producer.abortTransaction();` throws again? Should that be
> > > > > > >> > captured as a fatal and cause the client to close again?
> > > > > > >> >
> > > > > > >> > If yes, then naively the pattern would be:
> > > > > > >> >
> > > > > > >> > ...
> > > > > > >> > catch (CommitFailedException e) {
> > > > > > >> >     // Transaction commit failed with abortable error, user could reset
> > > > > > >> >     // the application state and resume with a new transaction. The root
> > > > > > >> >     // cause was wrapped in the thrown exception.
> > > > > > >> >     resetToLastCommittedPositions(consumer);
> > > > > > >> >     try {
> > > > > > >> >         producer.abortTransaction();
> > > > > > >> >     } catch (KafkaException fatal) {
> > > > > > >> >         producer.close();
> > > > > > >> >         consumer.close();
> > > > > > >> >         throw fatal;
> > > > > > >> >     }
> > > > > > >> > } catch (KafkaException e) {
> > > > > > >> >     producer.close();
> > > > > > >> >     consumer.close();
> > > > > > >> >     throw e;
> > > > > > >> > }
> > > > > > >> > ...
> > > > > > >> >
> > > > > > >> > Guozhang
> > > > > > >> >
> > > > > > >> > On Fri, Jan 22, 2021 at 10:47 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> >> Hey Guozhang,
> > > > > > >> >>
> > > > > > >> >> Jason and I were discussing the new API offline and decided to take
> > > > > > >> >> another approach. Firstly, the reason not to invent a new API with
> > > > > > >> >> a returned boolean flag is compatibility, since old EOS code would
> > > > > > >> >> not know that a given transaction commit failed internally as it
> > > > > > >> >> doesn't listen to the output flag.
> > > > > > >> >> Our proposed alternative solution is to let *commitTransaction
> > > > > > >> >> throw CommitFailedException whenever the commit failed with
> > > > > > >> >> non-fatal exception*, so that on the caller side the handling logic
> > > > > > >> >> becomes:
> > > > > > >> >>
> > > > > > >> >> try {
> > > > > > >> >>     if (shouldCommit) {
> > > > > > >> >>         producer.commitTransaction();
> > > > > > >> >>     } else {
> > > > > > >> >>         resetToLastCommittedPositions(consumer);
> > > > > > >> >>         producer.abortTransaction();
> > > > > > >> >>     }
> > > > > > >> >> } catch (CommitFailedException e) {
> > > > > > >> >>     // Transaction commit failed with abortable error, user could reset
> > > > > > >> >>     // the application state and resume with a new transaction. The root
> > > > > > >> >>     // cause was wrapped in the thrown exception.
> > > > > > >> >>     resetToLastCommittedPositions(consumer);
> > > > > > >> >>     producer.abortTransaction();
> > > > > > >> >> } catch (KafkaException e) {
> > > > > > >> >>     producer.close();
> > > > > > >> >>     consumer.close();
> > > > > > >> >>     throw e;
> > > > > > >> >> }
> > > > > > >> >>
> > > > > > >> >> This approach looks cleaner as all exception types other than
> > > > > > >> >> CommitFailed are bound to be fatal, which is very easy for users to
> > > > > > >> >> adopt. In the meantime, we still maintain the commitTxn behavior to
> > > > > > >> >> throw instead of silently failing.
> > > > > > >> >>
> > > > > > >> >> In addition, we decided to drop the recommendation to handle
> > > > > > >> >> TimeoutException and leave it to the users to make the call.
> > > > > > >> >> The downside of blindly calling abortTxn upon timeout is that we
> > > > > > >> >> could end up in an illegal state when the commit was already
> > > > > > >> >> successful on the broker side. Without a good guarantee on the
> > > > > > >> >> outcome, overcomplicating the template should not be encouraged
> > > > > > >> >> IMHO.
> > > > > > >> >>
> > > > > > >> >> Let me know your thoughts on the new approach here, thank you!
> > > > > > >> >>
> > > > > > >> >> Best,
> > > > > > >> >> Boyang
> > > > > > >> >>
> > > > > > >> >> On Tue, Jan 19, 2021 at 11:11 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > >> >> wrote:
> > > > > > >> >>
> > > > > > >> >> > Thanks for your clarification on 2)/3), that makes sense.
> > > > > > >> >> >
> > > > > > >> >> > On Tue, Jan 19, 2021 at 10:16 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > >> >> > wrote:
> > > > > > >> >> >
> > > > > > >> >> > > Thanks for the input Guozhang, replied inline.
> > > > > > >> >> > >
> > > > > > >> >> > > On Mon, Jan 18, 2021 at 8:57 PM Guozhang Wang <wangg...@gmail.com>
> > > > > > >> >> > > wrote:
> > > > > > >> >> > >
> > > > > > >> >> > > > Hello Boyang,
> > > > > > >> >> > > >
> > > > > > >> >> > > > Thanks for the updated KIP. I read it again and have the following
> > > > > > >> >> > > > thoughts:
> > > > > > >> >> > > >
> > > > > > >> >> > > > 0. I'm a bit concerned that if commitTxn does not throw any
> > > > > > >> >> > > > non-fatal exception, and instead we rely on the subsequent
> > > > > > >> >> > > > beginTxn call to throw, it may break callers that rely on
> > > > > > >> >> > > > commitTxn succeeding before they perform some non-rollback
> > > > > > >> >> > > > operations. For example:
> > > > > > >> >> > > >
> > > > > > >> >> > > > beginTxn()
> > > > > > >> >> > > > // do some read-write on my local DB
> > > > > > >> >> > > > commitTxn()
> > > > > > >> >> > > > // if commitTxn succeeds, then commit the DB
> > > > > > >> >> > > >
> > > > > > >> >> > > > -------------
> > > > > > >> >> > > >
> > > > > > >> >> > > > The issue is that committing the DB is a non-rollback operation,
> > > > > > >> >> > > > and users may just rely on commitTxn to return without error to
> > > > > > >> >> > > > make this non-rollback call. Of course we can just claim this
> > > > > > >> >> > > > pattern is not legitimate and is not the right way of doing
> > > > > > >> >> > > > things, but many users may naturally adopt this pattern.
> > > > > > >> >> > > >
> > > > > > >> >> > > > So maybe we should still let commitTxn also throw non-fatal
> > > > > > >> >> > > > exceptions, in which case we would then call abortTxn again.
> > > > > > >> >> > > >
> > > > > > >> >> > > > But if we do this, the pattern becomes:
> > > > > > >> >> > > >
> > > > > > >> >> > > > try {
> > > > > > >> >> > > >    beginTxn()
> > > > > > >> >> > > >    // do something
> > > > > > >> >> > > > } catch (Exception) {
> > > > > > >> >> > > >    shouldCommit = false;
> > > > > > >> >> > > > }
> > > > > > >> >> > > >
> > > > > > >> >> > > > if (shouldCommit) {
> > > > > > >> >> > > >    try {
> > > > > > >> >> > > >       commitTxn()
> > > > > > >> >> > > >    } catch (...) {   // enumerate all fatal exceptions
> > > > > > >> >> > > >       shutdown()
> > > > > > >> >> > > >    } catch (KafkaException) {
> > > > > > >> >> > > >       // non-fatal
> > > > > > >> >> > > >       shouldCommit = false;
> > > > > > >> >> > > >    }
> > > > > > >> >> > > > }
> > > > > > >> >> > > >
> > > > > > >> >> > > > if (!shouldCommit && running()) {
> > > > > > >> >> > > >    try {
> > > > > > >> >> > > >       abortTxn()
> > > > > > >> >> > > >    } catch (KafkaException) {
> > > > > > >> >> > > >       // only throw fatal
> > > > > > >> >> > > >       shutdown()
> > > > > > >> >> > > >    }
> > > > > > >> >> > > > }
> > > > > > >> >> > > >
> > > > > > >> >> > > > ---------------------
> > > > > > >> >> > > >
> > > > > > >> >> > > > Which is much more complicated.
> > > > > > >> >> > > >
> > > > > > >> >> > > > That makes me think the alternative we have discussed offline may
> > > > > > >> >> > > > be better: let commitTxn() return a boolean flag.
> > > > > > >> >> > > >
> > > > > > >> >> > > > * If it returns true, it means the commit succeeded. Users can
> > > > > > >> >> > > > comfortably continue and do any external non-rollback operations
> > > > > > >> >> > > > if they like.
> > > > > > >> >> > > > * If it returns false, it means the commit failed with non-fatal
> > > > > > >> >> > > > error *AND the txn has been aborted*. Users do not need to call
> > > > > > >> >> > > > abortTxn again.
> > > > > > >> >> > > > * If it throws, then it means fatal errors. Users should shut down
> > > > > > >> >> > > > the client.
> > > > > > >> >> > > >
> > > > > > >> >> > > > In this case, the pattern becomes:
> > > > > > >> >> > > >
> > > > > > >> >> > > > try {
> > > > > > >> >> > > >    beginTxn()
> > > > > > >> >> > > >    // do something
> > > > > > >> >> > > > } catch (Exception) {
> > > > > > >> >> > > >    shouldCommit = false;
> > > > > > >> >> > > > }
> > > > > > >> >> > > >
> > > > > > >> >> > > > try {
> > > > > > >> >> > > >    if (shouldCommit) {
> > > > > > >> >> > > >       commitSucceeded = commitTxn()
> > > > > > >> >> > > >    } else {
> > > > > > >> >> > > >       // reset offsets, rollback operations, etc
> > > > > > >> >> > > >       abortTxn()
> > > > > > >> >> > > >    }
> > > > > > >> >> > > > } catch (KafkaException) {
> > > > > > >> >> > > >    // only throw fatal
> > > > > > >> >> > > >    shutdown()
> > > > > > >> >> > > > }
> > > > > > >> >> > > >
> > > > > > >> >> > > > if (commitSucceeded)
> > > > > > >> >> > > >    // do other non-rollbackable things
> > > > > > >> >> > > > else
> > > > > > >> >> > > >    // reset offsets, rollback operations, etc
> > > > > > >> >> > > >
> > > > > > >> >> > > > -------------------------
> > > > > > >> >> > > >
> > > > > > >> >> > > > Of course, if we want to have better visibility into what caused
> > > > > > >> >> > > > the commit to fail and the txn to abort, we can let the return
> > > > > > >> >> > > > type be an enum instead of a flag. But my main idea is to still
> > > > > > >> >> > > > let commitTxn be the final point where users can tell whether
> > > > > > >> >> > > > this txn succeeded or not, instead of relying on the next
> > > > > > >> >> > > > beginTxn() call.
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > > I agree that adding a boolean flag is indeed useful in this case.
> > > > > > >> >> > > Will update the KIP.
> > > > > > >> >> > >
> > > > > > >> >> > > > 1. Re: "while maintaining the behavior to throw fatal exception in
> > > > > > >> >> > > > raw": not sure what you mean by "throw" here. Are you proposing
> > > > > > >> >> > > > the callback would *pass* (not throw) in any fatal exceptions
> > > > > > >> >> > > > when triggered without wrapping?
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > > Yes, I want to say *pass*, the benefit is to make the end user's
> > > > > > >> >> > > expectation consistent regarding exception handling.
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > > > 2. I'm not sure I understand the claim regarding the callback that
> > > > > > >> >> > > > "In EOS setup, it is not required to handle the exception". Are
> > > > > > >> >> > > > you proposing that, e.g. in Streams, we do not try to handle any
> > > > > > >> >> > > > exceptions in the callback anymore if EOS is enabled, but just
> > > > > > >> >> > > > let commitTxn() itself fail to be notified about the problem?
> > > > > > >> >> > > > Personally I think in Streams we should just make the handling
> > > > > > >> >> > > > logic of the callback consistent regardless of the EOS settings
> > > > > > >> >> > > > (today we have different logic depending on this setting, which
> > > > > > >> >> > > > I think could be unified as well).
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > > My idea originates from the claim on send API:
> > > > > > >> >> > > "When used as part of a transaction, it is not necessary to define a
> > > > > > >> >> > > callback or check the result of the future in order to detect errors
> > > > > > >> >> > > from `send`."
> > > > > > >> >> > > My understanding is that for EOS, the exception will be detected by
> > > > > > >> >> > > calling transactional APIs either way, so it is a duplicate handling
> > > > > > >> >> > > to track all the sendExceptions in RecordCollector. However, I looked
> > > > > > >> >> > > up how sendException is being used today:
> > > > > > >> >> > >
> > > > > > >> >> > > 1. Pass to "ProductionExceptionHandler" for any default or customized
> > > > > > >> >> > > exception handler to handle
> > > > > > >> >> > > 2. Stop collecting offset info or new exceptions
> > > > > > >> >> > > 3. Check and rethrow exceptions in close(), flush() or new send()
> > > > > > >> >> > > calls
> > > > > > >> >> > >
> > > > > > >> >> > > To my understanding, we should still honor the commitment to #1 for
> > > > > > >> >> > > any user customized implementation. The #2 does not really affect our
> > > > > > >> >> > > decision upon EOS. The #3 here is still valuable as it could help us
> > > > > > >> >> > > fail fast in new send() instead of waiting until a later stage of
> > > > > > >> >> > > processing.
> > > > > > >> >> > > In that sense, I agree we should continue to record send exceptions
> > > > > > >> >> > > even under the EOS case to ensure the strength of stream side
> > > > > > >> >> > > Producer logic. On the safer side, we no longer need to wrap certain
> > > > > > >> >> > > fatal exceptions like ProducerFenced as TaskMigrated, since they
> > > > > > >> >> > > should not crash the stream thread if thrown in raw format, once we
> > > > > > >> >> > > adopt the new processing model in the send phase.
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > > >
> > > > > > >> >> > > > Guozhang
> > > > > > >> >> > > >
> > > > > > >> >> > > >
> > > > > > >> >> > > >
> > > > > > >> >> > > > On Thu, Dec 17, 2020 at 8:42 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > >> >> > > > wrote:
> > > > > > >> >> > > >
> > > > > > >> >> > > > > Thanks for everyone's feedback so far. I have polished the KIP
> > > > > > >> >> > > > > after offline discussion with some folks working on EOS to make
> > > > > > >> >> > > > > the exception handling more lightweight. The essential change is
> > > > > > >> >> > > > > that we are not inventing a new intermediate exception type, but
> > > > > > >> >> > > > > instead separating a full transaction into two phases:
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > 1. The data transmission phase
> > > > > > >> >> > > > > 2. The commit phase
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > This way any exception thrown from phase 1 will be an indicator
> > > > > > >> >> > > > > in phase 2 of whether we should commit or abort, and from now on
> > > > > > >> >> > > > > `commitTransaction` should only throw fatal exceptions, similar
> > > > > > >> >> > > > > to `abortTransaction`, so that any KafkaException caught in the
> > > > > > >> >> > > > > commit phase will definitely be fatal and crash the app. For more
> > > > > > >> >> > > > > advanced users such as Streams, we have the ability to further
> > > > > > >> >> > > > > wrap selected types of fatal exceptions to trigger task
> > > > > > >> >> > > > > migration if necessary.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > More details in the KIP, feel free to take another look, thanks!
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > On Thu, Dec 17, 2020 at 8:36 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > >> >> > > > > wrote:
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > > Thanks Bruno for the feedback.
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > On Mon, Dec 7, 2020 at 5:26 AM Bruno Cadonna <br...@confluent.io>
> > > > > > >> >> > > > > > wrote:
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > >> Thanks Boyang for the KIP!
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> Like Matthias, I also do not know the producer internals well
> > > > > > >> >> > > > > >> enough to comment on the categorization. However, I think
> > > > > > >> >> > > > > >> having a super exception (e.g. RetriableException) that
> > > > > > >> >> > > > > >> encodes whether an exception is fatal or not is cleaner,
> > > > > > >> >> > > > > >> easier to understand and less error-prone, because ideally
> > > > > > >> >> > > > > >> when you add a new non-fatal exception in future you just
> > > > > > >> >> > > > > >> need to think about letting it inherit from the super
> > > > > > >> >> > > > > >> exception and all the rest of the code will just behave
> > > > > > >> >> > > > > >> correctly without the need to wrap the new exception into
> > > > > > >> >> > > > > >> another exception each time it is thrown (maybe it is thrown
> > > > > > >> >> > > > > >> at different locations in the code).
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> As far as I understand, the following statement from your
> > > > > > >> >> > > > > >> previous e-mail is the reason that currently such a super
> > > > > > >> >> > > > > >> exception is not possible:
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> "Right now we have RetriableException type, if we are going to
> > > > > > >> >> > > > > >> add a `ProducerRetriableException` type, we have to put this
> > > > > > >> >> > > > > >> new interface as the parent of the RetriableException, because
> > > > > > >> >> > > > > >> not all thrown non-fatal exceptions are `retriable` in general
> > > > > > >> >> > > > > >> for producer"
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> In the list of exceptions in your KIP, I found non-fatal
> > > > > > >> >> > > > > >> exceptions that do not inherit from RetriableException. I
> > > > > > >> >> > > > > >> guess those are the ones you are referring to in your
> > > > > > >> >> > > > > >> statement:
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> InvalidProducerEpochException
> > > > > > >> >> > > > > >> InvalidPidMappingException
> > > > > > >> >> > > > > >> TransactionAbortedException
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> All of those exceptions are non-fatal and do not inherit from
> > > > > > >> >> > > > > >> RetriableException. Is there a reason for that? If they
> > > > > > >> >> > > > > >> inherited from RetriableException we would be a bit closer to
> > > > > > >> >> > > > > >> the super exception I mention above.
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > The reason is that the sender may catch those exceptions in the
> > > > > > >> >> > > > > > ProduceResponse, and it currently does infinite retries on
> > > > > > >> >> > > > > > RetriableException. To make sure we could trigger the
> > > > > > >> >> > > > > > abortTransaction() by catching non-fatal thrown exceptions and
> > > > > > >> >> > > > > > reinitialize the txn state, we chose not to let those
> > > > > > >> >> > > > > > exceptions inherit RetriableException so that they won't cause
> > > > > > >> >> > > > > > infinite retry on sender.
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > >> With OutOfOrderSequenceException and
> > > > > > >> >> > > > > >> UnknownProducerIdException, I think I understand that their
> > > > > > >> >> > > > > >> fatality depends on the type (i.e. configuration) of the
> > > > > > >> >> > > > > >> producer. That makes it difficult to have a super exception
> > > > > > >> >> > > > > >> that encodes the retriability as mentioned above. Would it be
> > > > > > >> >> > > > > >> possible to introduce exceptions that inherit from
> > > > > > >> >> > > > > >> RetriableException and that are thrown when those exceptions
> > > > > > >> >> > > > > >> are caught from the brokers and the type of the producer is
> > > > > > >> >> > > > > >> such that the exceptions are retriable?
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > Yea, I think in general it is difficult to get the exception
> > > > > > >> >> > > > > > type mixing all right. I have already proposed another solution
> > > > > > >> >> > > > > > based on my offline discussion with some folks working on EOS
> > > > > > >> >> > > > > > to make the handling more straightforward for end users without
> > > > > > >> >> > > > > > the need to distinguish exception fatality.
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > >> Best,
> > > > > > >> >> > > > > >> Bruno
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >>
> > > > > > >> >> > > > > >> On 04.12.20 19:34, Guozhang Wang wrote:
> > > > > > >> >> > > > > >> > Thanks Boyang for the proposal! I made a pass over the list
> > > > > > >> >> > > > > >> > and here are some thoughts:
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > 0) Although this is not part of the public API, I think we
> > > > > > >> >> > > > > >> > should make sure that the suggested pattern (i.e. user can
> > > > > > >> >> > > > > >> > always call abortTxn() when handling non-fatal errors) is
> > > > > > >> >> > > > > >> > indeed supported. E.g. if the txn is already aborted by the
> > > > > > >> >> > > > > >> > broker side, then users can still call abortTxn which would
> > > > > > >> >> > > > > >> > not throw another exception but just be treated as a no-op.
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > 1) *ConcurrentTransactionsException*: I think this error can also be
> > > > > > >> >> > > > > >> > returned but is not documented yet. This should be a non-fatal error.
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > 2) *InvalidTxnStateException*: this error is returned from the broker when a txn
> > > > > > >> >> > > > > >> > state transition failed (e.g. it is trying to transition to complete-commit
> > > > > > >> >> > > > > >> > while the current state is not prepare-commit). This error could indicate
> > > > > > >> >> > > > > >> > a bug in the client internal code or the broker code, OR a user error --- a
> > > > > > >> >> > > > > >> > similar error is ConcurrentTransactionsException, i.e. if Kafka is bug-free
> > > > > > >> >> > > > > >> > these exceptions would only be returned if users try to do something wrong,
> > > > > > >> >> > > > > >> > e.g. calling abortTxn right after a commitTxn, etc. So I'm thinking it
> > > > > > >> >> > > > > >> > should be a non-fatal error instead of a fatal error, wdyt?
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > 3) *KafkaException*: case i "indicates fatal transactional sequence
> > > > > > >> >> > > > > >> > (Fatal)", this is a bit conflicting with the *OutOfOrderSequenceException* that
> > > > > > >> >> > > > > >> > is treated as non-fatal. I guess your proposal is that
> > > > > > >> >> > > > > >> > OutOfOrderSequenceException would be treated either as fatal with a
> > > > > > >> >> > > > > >> > transactional producer, or non-fatal with an idempotent producer, is that
> > > > > > >> >> > > > > >> > right? If the producer is only configured with idempotency but not
> > > > > > >> >> > > > > >> > transactions, then throwing a TransactionStateCorruptedException for
> > > > > > >> >> > > > > >> > non-fatal errors would be confusing since users are not using transactions
> > > > > > >> >> > > > > >> > at all. So I suggest we always throw OutOfOrderSequenceException as-is (i.e.
> > > > > > >> >> > > > > >> > not wrapped) no matter how the producer is configured, and let the caller
> > > > > > >> >> > > > > >> > decide how to handle it based on whether it is only idempotent or
> > > > > > >> >> > > > > >> > transactional itself.
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > 4) Besides all the txn APIs, the `send()` callback / future can also throw
> > > > > > >> >> > > > > >> > txn-related exceptions, I think this KIP should cover this API as
> > > > > > >> >> > > > > >> > well?
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > 5) This is related to 1/2) above: sometimes those non-fatal errors like
> > > > > > >> >> > > > > >> > ConcurrentTxn or InvalidTxnState are not due to the state being corrupted
> > > > > > >> >> > > > > >> > at the broker side, but maybe users are doing something wrong. So I'm
> > > > > > >> >> > > > > >> > wondering if we should further distinguish those non-fatal errors between
> > > > > > >> >> > > > > >> > a) those that are caused by Kafka itself, e.g. a broker timed out and
> > > > > > >> >> > > > > >> > aborted a txn and later an endTxn request is received, and b) the user's
> > > > > > >> >> > > > > >> > API call pattern is incorrect, causing the request to be rejected with an
> > > > > > >> >> > > > > >> > error code from the broker. *TransactionStateCorruptedException* feels to
> > > > > > >> >> > > > > >> > me more like for case a), but not case b).
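
The calling pattern discussed in point 0 (abort on any non-fatal error, then retry; close on a fatal one) can be sketched as follows. This is only an illustration under stated assumptions: `TxnProducer`, `AbortableTxnException`, `FatalTxnException`, `TxnLoop` and `FlakyProducer` are hypothetical stand-ins written for this sketch, not Kafka client classes, and the no-op behaviour of `abortTransaction()` on an already-aborted transaction is the behaviour being requested in the thread, not something confirmed here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the two error categories; not Kafka classes.
class FatalTxnException extends RuntimeException {
    FatalTxnException(String msg) { super(msg); }
}
class AbortableTxnException extends RuntimeException {
    AbortableTxnException(String msg) { super(msg); }
}

// Hypothetical, minimal view of a transactional producer.
interface TxnProducer {
    void beginTransaction();
    void send(String record);
    void commitTransaction();
    void abortTransaction(); // assumed to be a no-op if the txn is already aborted
    void close();
}

class TxnLoop {
    // Returns true once a transaction commits, false after maxAttempts aborts.
    static boolean sendInTransaction(TxnProducer p, List<String> batch, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                p.beginTransaction();
                for (String record : batch) {
                    p.send(record);
                }
                p.commitTransaction();
                return true;
            } catch (AbortableTxnException e) {
                // Non-fatal: abort and let the loop start a fresh transaction.
                p.abortTransaction();
            } catch (FatalTxnException e) {
                // Fatal: the producer cannot continue, so close it and rethrow.
                p.close();
                throw e;
            }
        }
        return false;
    }
}

// Test double: fails the first `failures` commits with a non-fatal error.
class FlakyProducer implements TxnProducer {
    final List<String> calls = new ArrayList<>();
    private int failuresLeft;
    FlakyProducer(int failures) { this.failuresLeft = failures; }
    public void beginTransaction() { calls.add("begin"); }
    public void send(String record) { calls.add("send:" + record); }
    public void commitTransaction() {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new AbortableTxnException("simulated non-fatal error");
        }
        calls.add("commit");
    }
    public void abortTransaction() { calls.add("abort"); }
    public void close() { calls.add("close"); }
}
```

With this shape the caller never has to inspect individual exception types: one catch block aborts and retries, the other closes the producer.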
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > Guozhang
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> > On Wed, Dec 2, 2020 at 4:50 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > >> >> > > > > >> > wrote:
> > > > > > >> >> > > > > >> >
> > > > > > >> >> > > > > >> >> Thanks Matthias, I think your proposal makes sense as well. On the pro side
> > > > > > >> >> > > > > >> >> we could have a universally agreed exception type to be caught by the user,
> > > > > > >> >> > > > > >> >> without having an extra layer on top of the actual exceptions. I could see
> > > > > > >> >> > > > > >> >> some issues on the downside:
> > > > > > >> >> > > > > >> >>
> > > > > > >> >> > > > > >> >> 1. The exception hierarchy will be more complex.
> > > > > > >> >> > > > > >> >> Right now we have
> > > > > > >> >> > > > > >> >> RetriableException type, if we are going to add a
> > > > > > >> >> > > > > >> >> `ProducerRetriableException` type, we have to put this new interface as the
> > > > > > >> >> > > > > >> >> parent of the RetriableException, because not all thrown non-fatal
> > > > > > >> >> > > > > >> >> exceptions are `retriable` in general for producer, for example
> > > > > > >> >> > > > > >> >> <
> > > > > > >> >> > > > > >> >> https://github.com/apache/kafka/blob/e275742f850af4a1b79b0d1bd1ac9a1d2e89c64e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L745
> > > > > > >> >> > > > > >> >> >.
> > > > > > >> >> > > > > >> >> We could have an empty interface `ProducerRetriableException` to let all
> > > > > > >> >> > > > > >> >> the thrown exceptions implement for sure, but it's a bit unorthodox in the
> > > > > > >> >> > > > > >> >> way we deal with exceptions in general.
> > > > > > >> >> > > > > >> >>
> > > > > > >> >> > > > > >> >> 2. There are cases where we throw a KafkaException wrapping another
> > > > > > >> >> > > > > >> >> KafkaException as either fatal or non-fatal.
> > > > > > >> >> > > > > >> >> If we use an interface to
> > > > > > >> >> > > > > >> >> solve #1, it is also required to implement another bloated exception class
> > > > > > >> >> > > > > >> >> which could replace KafkaException type, as we couldn't mark KafkaException
> > > > > > >> >> > > > > >> >> as retriable for sure.
> > > > > > >> >> > > > > >> >>
> > > > > > >> >> > > > > >> >> 3. In terms of encapsulation, wrapping means we could limit the scope
> > > > > > >> >> > > > > >> >> of impact to the producer only, which is important since we don't want
> > > > > > >> >> > > > > >> >> shared exception types, such as UnknownTopicOrPartitionException, to
> > > > > > >> >> > > > > >> >> implement a producer-related interface.
> > > > > > >> >> > > > > >> >>
> > > > > > >> >> > > > > >> >> Best,
> > > > > > >> >> > > > > >> >> Boyang
> > > > > > >> >> > > > > >> >>
> > > > > > >> >> > > > > >> >> On Wed, Dec 2, 2020 at 3:38 PM Matthias J. Sax <mj...@apache.org>
> > > > > > >> >> > > > > >> >> wrote:
> > > > > > >> >> > > > > >> >>
> > > > > > >> >> > > > > >> >>> Thanks for the KIP Boyang!
> > > > > > >> >> > > > > >> >>>
> > > > > > >> >> > > > > >> >>> Overall, categorizing exceptions makes a lot of sense. As I don't know
> > > > > > >> >> > > > > >> >>> the producer internals well enough, I cannot comment on the
> > > > > > >> >> > > > > >> >>> categorization in detail though.
> > > > > > >> >> > > > > >> >>>
> > > > > > >> >> > > > > >> >>> What I am wondering is whether we should introduce an exception interface
> > > > > > >> >> > > > > >> >>> that non-fatal exceptions would implement instead of creating a new class
> > > > > > >> >> > > > > >> >>> that will wrap non-fatal exceptions? What would be the pros/cons for
> > > > > > >> >> > > > > >> >>> both designs?
> > > > > > >> >> > > > > >> >>>
> > > > > > >> >> > > > > >> >>>
> > > > > > >> >> > > > > >> >>> -Matthias
> > > > > > >> >> > > > > >> >>>
> > > > > > >> >> > > > > >> >>>
> > > > > > >> >> > > > > >> >>> On 12/2/20 11:35 AM, Boyang Chen wrote:
> > > > > > >> >> > > > > >> >>>> Hey there,
> > > > > > >> >> > > > > >> >>>>
> > > > > > >> >> > > > > >> >>>> I would like to start a discussion thread for KIP-691:
> > > > > > >> >> > > > > >> >>>>
> > > > > > >> >> > > > > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
> > > > > > >> >> > > > > >> >>>>
> > > > > > >> >> > > > > >> >>>> The KIP is aiming to simplify the exception handling logic for
> > > > > > >> >> > > > > >> >>>> transactional Producer users by classifying fatal and non-fatal exceptions
> > > > > > >> >> > > > > >> >>>> and throwing them correspondingly for easier catch and retry.
> > > > > > >> >> > > > > >> >>>> Let me know what
> > > > > > >> >> > > > > >> >>>> you think.
> > > > > > >> >> > > > > >> >>>>
> > > > > > >> >> > > > > >> >>>> Best,
> > > > > > >> >> > > > > >> >>>> Boyang
> > > > > > >> >> > > > > >> >>>>
> > > > > > >> >> > > > --
> > > > > > >> >> > > > -- Guozhang
> > > > > > >> >> >
> > > > > > >> >> > --
> > > > > > >> >> > -- Guozhang
> > > > > > >> >
> > > > > > >> > --
> > > > > > >> > -- Guozhang
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
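
The two designs compared earlier in the thread (a marker interface that non-fatal exceptions implement, versus a producer-side wrapper class) can be put side by side in a small sketch. Everything here is hypothetical and self-contained: the interface borrows the `ProducerRetriableException` name floated in the thread, the wrapper borrows the `TransactionStateCorruptedException` name, and `ProducerTimeoutException`, `SharedBrokerException` and `Designs` are invented purely for illustration. None of this is the actual Kafka exception hierarchy.

```java
// Design A: an empty marker interface that non-fatal exceptions opt in to.
interface ProducerRetriableException {}

// Hypothetical producer-only exception; it can implement the marker freely.
class ProducerTimeoutException extends RuntimeException implements ProducerRetriableException {
    ProducerTimeoutException(String msg) { super(msg); }
}

// Hypothetical exception shared with other clients. Under design A it would
// have to implement a producer-specific interface to count as non-fatal,
// which is the encapsulation concern raised in the thread, so it stays unmarked.
class SharedBrokerException extends RuntimeException {
    SharedBrokerException(String msg) { super(msg); }
}

// Design B: the producer wraps whatever it considers non-fatal, so shared
// exception types are left untouched. Local stand-in, not the Kafka class.
class TransactionStateCorruptedException extends RuntimeException {
    TransactionStateCorruptedException(Throwable cause) { super(cause); }
}

class Designs {
    // Design A check: the exception type itself carries the classification.
    static boolean isNonFatalByInterface(Throwable t) {
        return t instanceof ProducerRetriableException;
    }

    // Design B check: only the wrapper type matters to the caller.
    static boolean isNonFatalByWrapper(Throwable t) {
        return t instanceof TransactionStateCorruptedException;
    }

    // Producer-side step for design B: wrap and keep the original as the cause.
    static RuntimeException wrapNonFatal(RuntimeException original) {
        return new TransactionStateCorruptedException(original);
    }
}
```

Under design B the caller catches one type and can still reach the original error through `getCause()`; under design A there is no extra layer, but every non-fatal exception type has to be touched.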