Hello Guozhang,

As for the logging, I plan on having three logs. First, the client logs that it is requesting an application shutdown; second, the leader logs the processId of the invoker; third, the StreamRebalanceListener logs that it is closing because of a `stream.appShutdown`. Hopefully this will be enough to make the cause of the close clear.
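To make the three log points concrete, here is a minimal single-process model of the flow proposed in this thread. The client only flags its first live StreamThread and returns, and that thread then triggers the rebalance from its own run loop. This is a hypothetical sketch, not Kafka Streams code: `Leader`, `StreamThreadModel`, `Client`, the string process IDs and the log messages are all invented for illustration, `initiateClosingAllClients` is just one of the names under discussion, and the "rebalance" is a direct method call rather than the consumer group protocol.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-process model of the proposed shutdown flow.
// None of these classes or log messages are real Kafka Streams APIs.
public class ShutdownFlowSketch {

    // Stand-in for the group leader (hypothetical): reacts to the error code.
    static class Leader {
        final List<Client> members = new ArrayList<>();
        final List<String> log;
        Leader(List<String> log) { this.log = log; }

        // Log point 2: record which process asked for the shutdown,
        // then tell every member (including the invoker) to close.
        void onRebalanceWithShutdownCode(String invokerProcessId) {
            log.add("leader: shutdown requested by processId " + invokerProcessId);
            for (Client member : members) {
                member.onShutdownAssignment();
            }
        }
    }

    // Stand-in for a StreamThread: only this thread would touch its own consumer.
    static class StreamThreadModel {
        // Set from the caller's thread, read inside the run loop.
        final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        volatile boolean alive = true;
        final Client owner;
        StreamThreadModel(Client owner) { this.owner = owner; }

        // One run-loop iteration (called directly here, not on a real thread).
        void runOnce() {
            if (!alive) {
                return;
            }
            if (shutdownRequested.get()) {
                // Models "set the error code and trigger a rebalance".
                owner.leader.onRebalanceWithShutdownCode(owner.processId);
                alive = false; // stop processing afterwards
                return;
            }
            // ... normal record processing would happen here ...
        }
    }

    // Stand-in for a KafkaStreams client instance.
    static class Client {
        final String processId;
        final Leader leader;
        final List<String> log;
        final List<StreamThreadModel> threads = new ArrayList<>();

        Client(String processId, Leader leader, List<String> log, int numThreads) {
            this.processId = processId;
            this.leader = leader;
            this.log = log;
            for (int i = 0; i < numThreads; i++) {
                threads.add(new StreamThreadModel(this));
            }
            leader.members.add(this);
        }

        // Log point 1. Non-blocking: flag the first alive thread and return.
        boolean initiateClosingAllClients() {
            for (StreamThreadModel thread : threads) {
                if (thread.alive) {
                    log.add("client " + processId + ": requesting application shutdown");
                    thread.shutdownRequested.set(true);
                    return true;
                }
            }
            return false; // no live thread is left to trigger the rebalance
        }

        // Log point 3: models the rebalance listener closing this client.
        void onShutdownAssignment() {
            log.add("client " + processId + ": closing because of an application shutdown request");
            for (StreamThreadModel thread : threads) {
                thread.alive = false;
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Leader leader = new Leader(log);
        Client a = new Client("A", leader, log, 2);
        Client b = new Client("B", leader, log, 1);
        System.out.println(a.initiateClosingAllClients()); // prints true
        a.threads.get(0).runOnce(); // the flagged thread acts in its own loop
        log.forEach(System.out::println);
        System.out.println(b.initiateClosingAllClients()); // prints false
    }
}
```

In this model the call on client A returns `true` before anything has shut down; the shutdown only happens once the flagged thread runs its loop. Afterwards client B has no live threads, so its own call returns `false`, which corresponds to the no-live-StreamThread case that the caller has to handle.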
I see what you mean about the name depending on the behavior of the method, so I will try to clarify. This is how I currently envision the call working.

It is not an option to initiate a shutdown directly on a StreamThread object from a KafkaStreams object, because "KafkaConsumer is not safe for multi-threaded access". Instead, the method in KafkaStreams finds the first alive thread and sets a flag in that StreamThread. The StreamThread picks up the flag in its run loop, sets the error code and triggers a rebalance; afterwards it stops processing. Once KafkaStreams has set the flag, the method returns true and the client continues running. If there are no alive threads, the shutdown request fails and the method returns false.

What do you think the blocking behavior should be? I think the StreamThread should definitely stop, to prevent any of the corruption we are trying to avoid by shutting down, but I don't see any advantage in the KafkaStreams call blocking.

You are correct to be concerned about the uncaught exception handler. If there are no live StreamThreads, the rebalance will not be started at all, and this would be a problem. However, the user should be aware of this from the `false` return value and can react appropriately. This would also be fixed if we implemented our own handler, so that we can rebalance before the StreamThread closes.

With that in mind, I believe that `initiateClosingAllClients` would be an appropriate name. WDYT?

Walker

On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Walker,
>
> Thanks for the updated KIP. Previously I was also a bit hesitant about the newly added public exception to communicate a user-requested whole-app shutdown, but the reason I did not bring this up is that I feel there's still a need, from an operational aspect, to differentiate the scenario where an instance is closed because of a) a local `streams.close()` being triggered, or b) a remote instance's `stream.shutdownApp` being triggered.
> So if we are going to remove that exception (which I'm also in favor of), we should at least differentiate the two cases through log4j levels.
>
> Regarding the semantics that "It should wait to receive the shutdown request in the rebalance it triggers," I'm not sure I fully understand, since this may be triggered from the stream thread's uncaught exception handler; if that thread is already dead, then maybe a rebalance listener would not even be fired at all. Although I know these are implementation details that you probably abstract away from the proposal, I'd like to make sure that we are on the same page regarding its blocking behavior, since it is quite crucial to users as well. Could you elaborate a bit more?
>
> Regarding the function name, I guess my personal preference would depend on its actual blocking behavior as above :)
>
>
> Guozhang
>
> On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io> wrote:
>
> > Hello all again,
> >
> > I have updated the KIP to no longer use an exception and instead add a method to the KafkaStreams class. This seems to satisfy everyone's concerns about how and when the functionality will be invoked.
> >
> > There is still a question over the name. We must decide between "shutdownApplication", "initiateCloseAll", "closeAllInstances" or some variation.
> >
> > I am rather indifferent to the name. I think that they all get the point across. The clearest to me would be shutdownApplication or closeAllInstances, but WDYT?
> >
> > Walker
> >
> > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io> wrote:
> >
> > > Hello Guozhang and Bruno,
> > >
> > > Thanks for the feedback.
> > >
> > > I will respond in two parts, but I would like to clarify that I am not tied down to any of these names. Since we are still deciding whether we want to have an exception or not, I would rather not get tripped up on choosing a name just yet.
> > >
> > > Guozhang:
> > > 1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am not planning on changing the behavior of handling source topic deletion. That is being done in KIP-662 by Bruno. He is enabling users to create their own handler, and shutdownApplication gives them the option to shut down.
> > >
> > > 2) It seems that we will remove the exception entirely, so this won't matter (see below).
> > >
> > > 3) It should wait to receive the shutdown request in the rebalance it triggers. That might be a better name. I am torn between using "application" or "all instances" in a couple of places. I think we should pick one and be consistent, but I am unsure which is more descriptive.
> > >
> > > Bruno:
> > > I agree that, in principle, exceptions should be used for exceptional cases, and I have added a method in KafkaStreams to handle cases where an exception would not be appropriate. I gather you think that users should never throw a Streams exception; instead, they could always throw and catch their own exception and call shutdownApplication from there. This would allow them to exit a processor if they wanted to shut down from there. I will update the KIP to remove the exception.
> > >
> > > I would like to add that, when trying to shut down from the uncaught exception handler, we need at least one StreamThread to be alive. So having our own handler, instead of using the default one after the thread has died, would let us always close the application.
> > >
> > > Walker
> > >
> > > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io> wrote:
> > >
> > >> Hi Walker,
> > >>
> > >> Thank you for the KIP!
> > >>
> > >> I like the motivation of the KIP and the method to request a shutdown of all Kafka Streams clients of a Kafka Streams application.
> > >> I think we really need such functionality to react to errors. However, I am not convinced that throwing an exception to shut down all clients is a good idea.
> > >>
> > >> An exception signals an exceptional situation to which we can react in multiple ways depending on the context. The exception that you propose seems to me more like a well-defined user command than an exceptional situation. IMO, we should not use exceptions to control program flow, because it mixes cause and effect. Hence, I would propose an invariant for public exceptions in Kafka Streams: the public exceptions in Kafka Streams should be caught by users, not thrown. But maybe I am missing the big advantage of using an exception here.
> > >>
> > >> I echo Guozhang's third point about clarifying the behavior of the method and the naming.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On 16.09.20 06:28, Guozhang Wang wrote:
> > >> > Hello Walker,
> > >> >
> > >> > Thanks for proposing the KIP! I have a couple more comments:
> > >> >
> > >> > 1. ShutdownRequestedException: my understanding is that this exception is only used if the application shutdown was initiated by the user-triggered "shutdownApplication()"; otherwise, e.g. if a source topic is not found and the Streams library decides to close the whole application automatically, we would still throw the original exception, a.k.a. MissingSourceTopicException, to the uncaught exception handler. Is that the case? Also, which package are you proposing to add this exception to?
> > >> >
> > >> > 2. ShutdownRequestedException: for its constructor, I'm wondering what the Throwable "root cause" could ever be.
> > >> > Since I'm guessing here that we would still just use a single error code in the protocol to tell other instances to shut down, and that error code would not allow us to encode any more information like root causes at all, it seems that parameter would always be null.
> > >> >
> > >> > 3. shutdownApplication: again I'd like to clarify: would this function block on the local instance until all its threads have completed shutting down, like `close()` does, or would it just initiate the shutdown and not wait for local threads at all? Also, a nit suggestion regarding the name: if it is only for initiating the shutdown, maybe naming it "initiateCloseAll" would be more specific?
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > >> >
> > >> >> Hello Matthias and Sophie,
> > >> >>
> > >> >> You both make good points. I will respond to them separately below.
> > >> >>
> > >> >>
> > >> >> Matthias:
> > >> >> That is a fair point. KIP-662 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>, which is accepted, will make source topic deletion reach the uncaught exception handler. Shutdown can be initiated from there. However, this would mean that the stream thread is already dead. So I would have to rethink the exception for this use case; perhaps it would be needed in the KafkaStreams object. But this still leaves the case where there is only one stream thread. I will think about it.
> > >> >>
> > >> >> Maybe the source topics are a bad example, as they make this KIP dependent on KIP-662 getting implemented in a certain way. However, this is not the only reason this could be useful: here <https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket asking for the same functionality. I have added a few other use cases to the KIP, although I will still be rethinking where I want to add this functionality and whether it should be an exception or not.
> > >> >>
> > >> >> Sophie:
> > >> >> I agree that shutting down an instance could also be useful. There was some discussion about this on KIP-663. It seems that we came to the conclusion that close(Duration.ZERO) would be sufficient. Link to thread: <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
> > >> >>
> > >> >> Also, I am not set on the name ShutdownRequested. If we decide to keep it as an exception, your idea is probably a better name.
> > >> >>
> > >> >>
> > >> >> Thanks for the feedback,
> > >> >> Walker
> > >> >>
> > >> >>
> > >> >> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >> >>
> > >> >>> Thanks for the KIP.
> > >> >>>
> > >> >>> It seems that the new exception would need to be thrown by user code? However, in the motivation you mention the scenario of a missing source topic that a user cannot detect, but that the KafkaStreams runtime would be responsible for handling.
> > >> >>>
> > >> >>> How do both things go together?
> > >> >>>
> > >> >>>
> > >> >>> -Matthias
> > >> >>>
> > >> >>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > >> >>>> Hello all,
> > >> >>>>
> > >> >>>> I have created KIP-671 to give the option to shut down a Streams application in response to an error.
> > >> >>>>
> > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > >> >>>>
> > >> >>>> This is because of the Jira ticket <https://issues.apache.org/jira/browse/KAFKA-9331>.
> > >> >>>>
> > >> >>>> Please give it a look and let me know if you have any feedback.
> > >> >>>>
> > >> >>>> Thanks,
> > >> >>>> Walker
>
> --
> -- Guozhang