Sounds good to me. I also feel that this call should be non-blocking, but I was confused by the discussion thread, which suggested that the API is designed to be blocking. That contradicted my perspective, hence I asked for clarification :)
Guozhang

On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io> wrote:
> Hello Guozhang,
>
> As for the logging, I plan on having three logs. First, the client logs that it is requesting an application shutdown; second, the leader logs the processId of the invoker; third, the StreamRebalanceListener logs that it is closing because of a `stream.appShutdown`. Hopefully this will be enough to make the cause of the close clear.
>
> I see what you mean about the name depending on the behavior of the method, so I will try to clarify. This is how I currently envision the call working.
>
> It is not an option to directly initiate a shutdown through a StreamThread object from a KafkaStreams object because "KafkaConsumer is not safe for multi-threaded access". So the method in KafkaStreams finds the first alive thread and sets a flag in that StreamThread. The StreamThread picks up the flag in its run loop, sets the error code and triggers a rebalance; afterwards it stops processing. After KafkaStreams has set the flag, the method returns true and the client continues running. If there are no alive threads, the shutdown fails and the method returns false.
>
> What do you think the blocking behavior should be? I think the StreamThread should definitely stop, to prevent any of the corruption we are trying to avoid by shutting down, but I don't see any advantage in the KafkaStreams call blocking.
>
> You are correct to be concerned about the uncaught exception handler. If there are no live StreamThreads, the rebalance will not be started at all, and this would be a problem. However, the user should be aware of this because the call returns false, and can react appropriately. This would also be fixed if we implemented our own handler, so that we can rebalance before the StreamThread closes.
>
> With that in mind, I believe that `initiateClosingAllClients` would be an appropriate name. WDYT?
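The flag-based handoff Walker describes in his 12:32 PM reply can be modeled roughly as follows. This is a simplified, single-threaded sketch under stated assumptions, not Kafka Streams code: `SketchKafkaStreams`, `SketchStreamThread`, `initiateClosingAllClients()`, `runOnce()` and the `SHUTDOWN_APPLICATION` error code are all hypothetical names, the rebalance is represented by a boolean rather than a real consumer call, and the two `println` calls only stand in for the client-side and rebalance-listener logs (the leader-side log is not modeled). It shows the three properties discussed: the caller only sets a flag and returns, the thread's own run loop acts on the flag so its consumer is never touched from another thread, and the call returns false when no thread is alive.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a StreamThread; only the shutdown handoff is modeled.
class SketchStreamThread {
    static final int SHUTDOWN_APPLICATION = 1; // hypothetical error code sent in the rebalance

    final String name;
    private volatile boolean running = true;
    // Written by the caller's thread, read only by this thread's run loop, so the
    // (non-thread-safe) consumer is never touched from outside its owning thread.
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    int errorCode = 0;
    boolean rebalanceTriggered = false;

    SketchStreamThread(String name) {
        this.name = name;
    }

    boolean isAlive() {
        return running;
    }

    void requestApplicationShutdown() {
        shutdownRequested.set(true);
    }

    // One iteration of the run loop.
    void runOnce() {
        if (!running) {
            return;
        }
        if (shutdownRequested.get()) {
            errorCode = SHUTDOWN_APPLICATION;
            rebalanceTriggered = true; // stand-in for triggering a rebalance on this thread's consumer
            System.out.println(name + ": closing because of an application shutdown request");
            running = false; // stop processing so no further state can be corrupted
            return;
        }
        // ... poll and process records ...
    }
}

// Hypothetical stand-in for KafkaStreams exposing the proposed non-blocking call.
class SketchKafkaStreams {
    private final List<SketchStreamThread> threads;

    SketchKafkaStreams(List<SketchStreamThread> threads) {
        this.threads = threads;
    }

    // Flags the first alive thread and returns at once; false if no thread can carry the request.
    boolean initiateClosingAllClients() {
        for (SketchStreamThread thread : threads) {
            if (thread.isAlive()) {
                System.out.println("client: requesting application shutdown via " + thread.name);
                thread.requestApplicationShutdown();
                return true;
            }
        }
        return false;
    }
}

class FlagHandoffDemo {
    public static void main(String[] args) {
        SketchStreamThread thread = new SketchStreamThread("StreamThread-1");
        SketchKafkaStreams streams = new SketchKafkaStreams(List.of(thread));

        System.out.println(streams.initiateClosingAllClients()); // true: flag set, thread still alive
        thread.runOnce();                                        // thread acts on the flag and stops
        System.out.println(streams.initiateClosingAllClients()); // false: no alive thread left
    }
}
```

Keeping the call non-blocking means the caller learns only whether the request was handed to a live thread, not when the shutdown completes; the false return is the signal for the case where no thread is left to trigger the rebalance.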
>
> Walker
>
> On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > Hello Walker,
> >
> > Thanks for the updated KIP. Previously I was also a bit hesitant about the newly added public exception for communicating a user-requested shutdown of the whole app, but I did not bring this up because I feel there is still an operational need to differentiate the scenario where an instance is closed because of a) a local `streams.close()`, from b) a remote instance's `stream.shutdownApp`. So if we are going to remove that exception (which I'm also in favor of), we should at least differentiate the two by log4j level.
> >
> > Regarding the semantics that "It should wait to receive the shutdown request in the rebalance it triggers," I'm not sure I fully understand: since this may be triggered from the stream thread's uncaught exception handler, if that thread is already dead then a rebalance listener may not even be fired at all. Although I know these are implementation details that you probably abstract away from the proposal, I'd like to make sure that we are on the same page regarding its blocking behavior, since it is quite crucial to users as well. Could you elaborate a bit more?
> >
> > Regarding the function name, my personal preference would depend on its actual blocking behavior as above :)
> >
> > Guozhang
> >
> > On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > Hello all again,
> > >
> > > I have updated the KIP to no longer use an exception and instead add a method to the KafkaStreams class; this seems to satisfy everyone's concerns about how and when the functionality will be invoked.
> > >
> > > There is still a question over the name.
> > > We must decide between "shutdownApplication", "initiateCloseAll", "closeAllInstances" or some variation.
> > >
> > > I am rather indifferent to the name; I think they all get the point across. The clearest to me would be shutdownApplication or closeAllInstances, but WDYT?
> > >
> > > Walker
> > >
> > > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > Hello Guozhang and Bruno,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > I will respond in two parts, but I would like to clarify that I am not tied to any of these names. Since we are still deciding whether we want to have an exception or not, I would rather not get tripped up on choosing a name just yet.
> > > >
> > > > Guozhang:
> > > > 1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am not planning on changing the behavior of handling source topic deletion. That is being done in KIP-662 by Bruno. He is enabling users to create their own handler, and shutdownApplication gives them the option to shut down.
> > > >
> > > > 2) It seems that we will remove the exception entirely, so this won't matter (see below).
> > > >
> > > > 3) It should wait to receive the shutdown request in the rebalance it triggers. That might be a better name. I am torn between using "application" or "all instances" in a couple of places. I think we should pick one and be consistent, but I am unsure which is more descriptive.
> > > >
> > > > Bruno:
> > > > I agree that in principle exceptions should be used for exceptional cases. And I have added a method in KafkaStreams to handle cases where an exception would not be appropriate.
> > > > I take it your view is that users should never throw a Streams exception; instead, they could always throw and catch their own exception and call shutdownApplication from there. This would allow them to exit a processor if they wanted to shut down from there. I will update the KIP to remove the exception.
> > > >
> > > > I would like to add that, when trying to shut down from the uncaught exception handler, we need at least one StreamThread to be alive. So having our own handler, instead of using the default one after the thread has died, would let us always close the application.
> > > >
> > > > Walker
> > > >
> > > > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > Hi Walker,
> > > > >
> > > > > Thank you for the KIP!
> > > > >
> > > > > I like the motivation of the KIP and the method to request a shutdown of all Kafka Streams clients of a Kafka Streams application. I think we really need such functionality to react to errors. However, I am not convinced that throwing an exception to shut down all clients is a good idea.
> > > > >
> > > > > An exception signals an exceptional situation to which we can react in multiple ways depending on the context. The exception that you propose seems to me more like a well-defined user command than an exceptional situation. IMO, we should not use exceptions to control program flow because it mixes cause and effect. Hence, I would propose an invariant for public exceptions in Kafka Streams: they should be caught by users, not thrown. But maybe I am missing the big advantage of using an exception here.
> > > > >
> > > > > I echo Guozhang's third point about clarifying the behavior of the method and the naming.
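The difference between requesting a shutdown from user code and from an uncaught exception handler, raised by Walker and Bruno above, can be modeled with a small sketch. `MiniStreamsClient`, `PoisonRecordException`, `runOnThread` and `initiateClosingAllClients()` are hypothetical names, and the model only tracks which threads are still in their processing loop; it does not use real threads or any Kafka API. When the exception escapes, the thread leaves the live set before the handler runs, so on a single-threaded client the request is rejected. When user code catches its own exception, as Bruno suggests, the request is made while the thread is still live.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical user-defined exception; not part of Kafka Streams.
class PoisonRecordException extends RuntimeException {
    PoisonRecordException(String message) {
        super(message);
    }
}

// Hypothetical client model: tracks which threads are still in their processing loop.
class MiniStreamsClient {
    final List<String> liveThreads = new ArrayList<>();
    boolean shutdownRequested = false;

    // Stand-in for the proposed call: it needs a live thread to carry the request.
    boolean initiateClosingAllClients() {
        if (liveThreads.isEmpty()) {
            return false;
        }
        shutdownRequested = true;
        return true;
    }

    // Runs user processing on a named thread. If an exception escapes, the thread first
    // leaves its loop (is removed from the live set) and only then is the handler called.
    void runOnThread(String thread, Runnable processing, Consumer<Throwable> uncaughtHandler) {
        liveThreads.add(thread);
        try {
            processing.run();
        } catch (RuntimeException e) {
            liveThreads.remove(thread);
            uncaughtHandler.accept(e);
        }
    }
}

class HandlerDemo {
    public static void main(String[] args) {
        // Exception escapes the only thread: by the time the handler runs, nothing is live.
        MiniStreamsClient escaped = new MiniStreamsClient();
        boolean[] accepted = {true};
        escaped.runOnThread("StreamThread-1",
                () -> { throw new PoisonRecordException("bad record"); },
                e -> accepted[0] = escaped.initiateClosingAllClients());
        System.out.println(accepted[0]); // false

        // User code catches its own exception and asks for shutdown while its thread is live.
        MiniStreamsClient caught = new MiniStreamsClient();
        caught.runOnThread("StreamThread-1", () -> {
            try {
                throw new PoisonRecordException("bad record");
            } catch (PoisonRecordException ex) {
                caught.initiateClosingAllClients();
            }
        }, e -> { });
        System.out.println(caught.shutdownRequested); // true
    }
}
```

With two or more threads the escaping case would usually succeed, because a surviving thread can still carry the request; the single-thread case is the gap that a dedicated handler, running before the thread closes, is meant to cover.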
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > > On 16.09.20 06:28, Guozhang Wang wrote:
> > > > > > Hello Walker,
> > > > > >
> > > > > > Thanks for proposing the KIP! I have a couple more comments:
> > > > > >
> > > > > > 1. ShutdownRequestedException: my understanding is that this exception is only used if the application shutdown was initiated by the user calling "shutdownApplication()". Otherwise, e.g. if the source topic is not found and the Streams library decides to close the whole application automatically, we would still pass the original exception, a.k.a. MissingSourceTopicException, to the uncaught exception handler. Is that the case? Also, which package are you proposing to add this exception to?
> > > > > >
> > > > > > 2. ShutdownRequestedException: for its constructor, I'm wondering what the Throwable "root cause" could ever be. I'm guessing that we would still use a single error code in the protocol to tell other instances to shut down, and that error code would not allow us to encode any more information, such as root causes, so it seems that parameter would always be null.
> > > > > >
> > > > > > 3. shutdownApplication: again I'd like to clarify, would this function block on the local instance until all its threads have shut down, like `close()`, or would it just initiate the shutdown and not wait for local threads at all? Also, a nit regarding the name: if it is only for initiating the shutdown, maybe "initiateCloseAll" would be more specific?
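Guozhang's third question, whether the call should block like `close()` or only initiate the shutdown, comes down to whether the caller waits for the local threads to exit. The sketch contrasts the two with plain Java threads. `ShutdownModes`, `initiateClose()` and the worker loop are hypothetical, and `close(Duration)` here only mirrors the general shape of the real `KafkaStreams#close(Duration)` (signal the threads, then wait up to a timeout and report whether they stopped); it is not that method's implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: worker threads that keep "processing" until a shared flag is set.
class ShutdownModes {
    private volatile boolean shutdownRequested = false;
    final List<Thread> workers = new ArrayList<>();

    ShutdownModes(int threadCount) {
        for (int i = 0; i < threadCount; i++) {
            Thread worker = new Thread(() -> {
                // Stand-in for the poll/process run loop.
                while (!shutdownRequested) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "worker-" + i);
            worker.setDaemon(true); // never keep the JVM alive just for this demo
            workers.add(worker);
            worker.start();
        }
    }

    // Non-blocking: sets the flag and returns immediately; workers stop on their own.
    void initiateClose() {
        shutdownRequested = true;
    }

    // Blocking: sets the flag, then waits up to `timeout` for every worker to exit.
    // Returns true only if all workers have stopped. A zero timeout never waits.
    boolean close(Duration timeout) throws InterruptedException {
        initiateClose();
        long deadline = System.nanoTime() + timeout.toNanos();
        for (Thread worker : workers) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000;
            if (remainingMs <= 0) {
                break;
            }
            worker.join(remainingMs);
        }
        return workers.stream().noneMatch(Thread::isAlive);
    }
}

class ShutdownModesDemo {
    public static void main(String[] args) throws InterruptedException {
        ShutdownModes nonBlocking = new ShutdownModes(2);
        nonBlocking.initiateClose(); // returns at once; workers may still be running here

        ShutdownModes blocking = new ShutdownModes(2);
        // Prints true when both workers exit within the five-second timeout.
        System.out.println(blocking.close(Duration.ofSeconds(5)));
    }
}
```

In this model `initiateClose()` corresponds to the non-blocking behavior the thread converges on, and `close(Duration.ZERO)` degenerates to the same thing, since it sets the flag and never waits.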
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > > > > Hello Matthias and Sophie,
> > > > > > >
> > > > > > > You both make good points. I will respond to each of you separately below.
> > > > > > >
> > > > > > > Matthias:
> > > > > > > That is a fair point. KIP-662 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>, which is accepted, will make source topic deletion reach the uncaught exception handler. Shutdown can be initiated from there. However, this would mean that the stream thread is already dead, so I would have to rethink the exception for this use case; perhaps it would be needed in the KafkaStreams object. But this still leaves the case where there is only one stream thread. I will think about it.
> > > > > > >
> > > > > > > Maybe the source topics are a bad example, as they make this KIP dependent on KIP-662 being implemented in a certain way. However, this is not the only reason this could be useful: here <https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket asking for the same functionality. I have added a few other use cases to the KIP, although I will still be rethinking where I want to add this functionality and whether it should be an exception or not.
> > > > > > >
> > > > > > > Sophie:
> > > > > > > I agree that shutting down an instance could also be useful. There was some discussion about this on KIP-663.
> > > > > > > It seems that we came to the conclusion that close(Duration.ZERO) would be sufficient. Link to the thread: <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
> > > > > > >
> > > > > > > Also, I am not set on the name ShutdownRequested. If we decide to keep it as an exception, your idea is probably a better name.
> > > > > > >
> > > > > > > Thanks for the feedback,
> > > > > > > Walker
> > > > > > >
> > > > > > > On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > Thanks for the KIP.
> > > > > > > >
> > > > > > > > It seems that the new exception would need to be thrown by user code? However, in the motivation you mention the scenario of a missing source topic, which a user cannot detect and which the KafkaStreams runtime would be responsible for handling.
> > > > > > > >
> > > > > > > > How do both things go together?
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 9/11/20 10:31 AM, Walker Carlson wrote:
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I have created KIP-671 to give the option to shut down a Streams application in response to an error.
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > > > > > > >
> > > > > > > > > This is because of the Jira ticket <https://issues.apache.org/jira/browse/KAFKA-9331>.
> > > > > > > > >
> > > > > > > > > Please give it a look and let me know if you have any feedback.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Walker