Sounds good to me. I also feel that this call should be non-blocking, but I
was confused by the discussion thread, which suggested that the API is
designed in a blocking fashion. That contradicted my perspective, hence I
asked for clarification :)

Guozhang


On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io>
wrote:

> Hello Guozhang,
>
> As for the logging, I plan on having three logs. First, the client logs
> that it is requesting an application shutdown; second, the leader logs the
> processId of the invoker; third, the StreamRebalanceListener logs that it
> is closing because of a `stream.appShutdown`. Hopefully this will be enough
> to make the cause of the close clear.
>
> I see what you mean about the name being dependent on the behavior of the
> method so I will try to clarify.  This is how I currently envision the call
> working.
>
> It is not an option to directly initiate a shutdown through a StreamThread
> object from a KafkaStreams object because "KafkaConsumer is not safe for
> multi-threaded access". So how it works is that the method in KafkaStreams
> finds the first alive thread and sets a flag in the StreamThread. The
> StreamThread will receive the flag in its runloop then set the error code
> and trigger a rebalance, afterwards it will stop processing. After the
> KafkaStreams has set the flag it will return true and continue running. If
> there are no alive threads the shutdown will fail and return false.
>
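The flag handoff described above can be sketched in plain Java. This is only an illustrative sketch under stated assumptions, not the Kafka Streams implementation: `ShutdownFlagSketch`, `WorkerThread`, and `Client` are hypothetical stand-ins for StreamThread and KafkaStreams, and the "set the error code and trigger a rebalance" step is reduced to setting a boolean. The point it shows is that the caller only sets a flag on the first alive thread and returns immediately, while the thread itself notices the flag in its run loop.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownFlagSketch {

    /** Hypothetical stand-in for a StreamThread; not the real Kafka Streams class. */
    static final class WorkerThread extends Thread {
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        private volatile boolean rebalanceTriggered = false;

        WorkerThread() {
            setDaemon(true); // do not keep the JVM alive if nobody requests shutdown
        }

        /** Called from another thread: only sets a flag, never touches the consumer. */
        void requestShutdown() {
            shutdownRequested.set(true);
        }

        boolean rebalanceTriggered() {
            return rebalanceTriggered;
        }

        @Override
        public void run() {
            // Run loop: keep "processing" until the flag is observed.
            while (!shutdownRequested.get()) {
                Thread.yield(); // placeholder for normal record processing
            }
            // Placeholder for "set the error code and trigger a rebalance".
            rebalanceTriggered = true;
            // Returning from run() means the thread stops processing.
        }
    }

    /** Hypothetical stand-in for the KafkaStreams object. */
    static final class Client {
        private final List<WorkerThread> threads;

        Client(List<WorkerThread> threads) {
            this.threads = threads;
        }

        /** Non-blocking: flags the first alive thread and returns right away. */
        boolean initiateClosingAllClients() {
            for (WorkerThread thread : threads) {
                if (thread.isAlive()) {
                    thread.requestShutdown();
                    return true;
                }
            }
            return false; // no alive thread, so no rebalance can be triggered
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WorkerThread thread = new WorkerThread();
        Client client = new Client(List.of(thread));
        thread.start();

        boolean accepted = client.initiateClosingAllClients(); // returns immediately
        thread.join(5000); // the demo waits here; the call above did not
        System.out.println("accepted=" + accepted
                + ", rebalanceTriggered=" + thread.rebalanceTriggered());
        System.out.println("second call=" + client.initiateClosingAllClients());
    }
}
```

In this sketch a second call after the only thread has exited returns false, which corresponds to the "no alive threads" case where the caller has to react on its own.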
> What do you think the blocking behavior should be? I think that the
> StreamThread should definitely stop to prevent any of the corruption we are
> trying to avoid by shutting down, but I don't see any advantage of the
> KafkaStreams call blocking.
>
> You are correct to be concerned about the uncaught exception handler. If
> there are no live StreamThreads the rebalance will not be started at all
> and this would be a problem. However the user should be aware of this
> because of the return of false and react appropriately. This would also be
> fixed if we implemented our own handler so we can rebalance before the
> StreamThread closes.
>
> With that in mind I believe that `initiateClosingAllClients` would be an
> appropriate name. WDYT?
>
> Walker
>
>
> On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Walker,
> >
> > Thanks for the updated KIP. Previously I was also a bit hesitant about the
> > newly added public exception to communicate a user-requested whole-app
> > shutdown, but the reason I did not bring this up is that I feel there is
> > still a need, from an operational perspective, to differentiate the
> > scenarios where an instance is closed because a) a local `streams.close()`
> > was triggered, or b) a remote instance's `stream.shutdownApp` was
> > triggered. So if we are going to remove that exception (which I'm also in
> > favor of), we should at least differentiate via the log4j levels.
> >
> > Regarding the semantics that "It should wait to receive the shutdown
> > request in the rebalance it triggers": I'm not sure I fully understand.
> > Since this may be triggered from the stream thread's uncaught exception
> > handler, if that thread is already dead then a rebalance listener might
> > not be fired at all. Although I know these are implementation details
> > that you probably abstracted away from the proposal, I'd like to make
> > sure that we are on the same page regarding its blocking behavior, since
> > it is quite crucial to users as well. Could you elaborate a bit more?
> >
> > Regarding the function name, I guess my personal preference would depend
> > on its actual blocking behavior, as above :)
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io>
> > wrote:
> >
> > > Hello all again,
> > >
> > > I have updated the KIP to no longer use an exception and instead add a
> > > method to the KafkaStreams class. This seems to satisfy everyone's
> > > concerns about how and when the functionality will be invoked.
> > >
> > > There is still a question over the name. We must decide between
> > > "shutdownApplication", "initiateCloseAll", "closeAllInstances" or some
> > > variation.
> > >
> > > I am rather indifferent to the name. I think that they all get the
> > > point across. The clearest to me would be shutdownApplication or
> > > closeAllInstances, but WDYT?
> > >
> > > Walker
> > >
> > >
> > >
> > > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io>
> > > wrote:
> > >
> > > > Hello Guozhang and Bruno,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > I will respond in two parts, but I would like to clarify that I am not
> > > > tied down to any of these names. Since we are still deciding whether we
> > > > want to have an exception or not, I would rather not get tripped up on
> > > > choosing a name just yet.
> > > >
> > > > Guozhang:
> > > > 1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I
> > > > am not planning on changing the behavior of handling source topic
> > > > deletion. That is being done in KIP-662 by Bruno. He is enabling the
> > > > user to create their own handler, and shutdownApplication is giving
> > > > them the option to shut down.
> > > >
> > > > 2) It seems that we will remove the Exception entirely so this won't
> > > > matter (below)
> > > >
> > > > 3) It should wait to receive the shutdown request in the rebalance it
> > > > triggers. That might be a better name. I am torn between using
> > > > "application" or "all Instances" in a couple of places. I think we
> > > > should pick one and be consistent, but I am unsure which is more
> > > > descriptive.
> > > >
> > > > Bruno:
> > > > I agree that in principle Exceptions should be used in exceptional
> > > > cases, and I have added a method in KafkaStreams to handle cases where
> > > > an Exception would not be appropriate. If, as I understand you, users
> > > > should never throw a Streams Exception, then they could always throw
> > > > and catch their own exception and call shutdownApplication from there.
> > > > This would allow them to exit a processor if they wanted to shut down
> > > > from there. I will update the KIP to remove the exception.
> > > >
> > > > I would like to add that, in the case of trying to shut down from the
> > > > uncaught exception handler, we need at least one StreamThread to be
> > > > alive. So having our own handler, instead of using the default one
> > > > after the thread has died, would let us always close the application.
> > > >
> > > > Walker
> > > >
> > > > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io>
> > > > > wrote:
> > > >
> > > >> Hi Walker,
> > > >>
> > > >> Thank you for the KIP!
> > > >>
> > > > >> I like the motivation of the KIP and the method to request a shutdown
> > > > >> of all Kafka Streams clients of a Kafka Streams application. I think
> > > > >> we really need such functionality to react to errors. However, I am
> > > > >> not convinced that throwing an exception to shut down all clients is
> > > > >> a good idea.
> > > >>
> > > > >> An exception signals an exceptional situation to which we can react
> > > > >> in multiple ways depending on the context. The exception that you
> > > > >> propose seems to me more like a well-defined user command than an
> > > > >> exceptional situation. IMO, we should not use exceptions to control
> > > > >> program flow because it mixes cause and effect. Hence, I would
> > > > >> propose an invariant for public exceptions in Kafka Streams: the
> > > > >> public exceptions in Kafka Streams should be caught by users, not
> > > > >> thrown. But maybe I am missing the big advantage of using an
> > > > >> exception here.
> > > >>
> > > >> I echo Guozhang's third point about clarifying the behavior of the
> > > >> method and the naming.
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >> On 16.09.20 06:28, Guozhang Wang wrote:
> > > >> > Hello Walker,
> > > >> >
> > > >> > Thanks for proposing the KIP! I have a couple more comments:
> > > >> >
> > > > >> > 1. ShutdownRequestedException: my understanding is that this
> > > > >> > exception is only used if the application shutdown was initiated by
> > > > >> > the user-triggered "shutdownApplication()"; otherwise, e.g. if it is
> > > > >> > due to a source topic not being found and the Streams library decides
> > > > >> > to close the whole application automatically, we would still throw
> > > > >> > the original exception, a.k.a. MissingSourceTopicException, to the
> > > > >> > uncaught exception handler. Is that the case? Also, which package
> > > > >> > are you proposing to add this exception to?
> > > >> >
> > > > >> > 2. ShutdownRequestedException: for its constructor, I'm wondering
> > > > >> > what Throwable "root cause" it could ever be. I'm guessing here that
> > > > >> > we would still just use a single error code in the protocol to tell
> > > > >> > other instances to shut down, and that error code would not allow us
> > > > >> > to encode any more information like root causes at all, so it seems
> > > > >> > that parameter would always be null.
> > > >> >
> > > > >> > 3. shutdownApplication: again I'd like to clarify, would this
> > > > >> > function block on the local instance to complete shutting down all
> > > > >> > its threads like `close()` as well, or would it just initiate the
> > > > >> > shutdown and not wait for local threads at all? Also a nit
> > > > >> > suggestion regarding the name: if it is only for initiating the
> > > > >> > shutdown, maybe naming it "initiateCloseAll" would be more specific?
> > > >> >
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > > >> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io>
> > > > >> > wrote:
> > > >> >
> > > >> >> Hello Matthias and Sophie,
> > > >> >>
> > > > >> >> You both make good points. I will respond to them separately below.
> > > >> >>
> > > >> >>
> > > >> >> Matthias:
> > > > >> >> That is a fair point. KIP-662
> > > > >> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
> > > > >> >> which is accepted, will make it so that source topic deletion makes
> > > > >> >> it to the uncaught exception handler. Shutdown can be initiated from
> > > > >> >> there. However, this would mean that the stream thread is already
> > > > >> >> dead. So I would have to rethink the exception for this use case;
> > > > >> >> perhaps it would be needed in the KafkaStreams object. But this still
> > > > >> >> leaves the case where there is only one stream thread. I will think
> > > > >> >> about it.
> > > >> >>
> > > > >> >> Maybe the source topics are a bad example, as it makes this KIP
> > > > >> >> dependent on KIP-662 getting implemented in a certain way. However,
> > > > >> >> this is not the only reason this could be useful: here
> > > > >> >> <https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket
> > > > >> >> asking for the same functionality. I have added a few other use cases
> > > > >> >> to the KIP, although I will still be rethinking where I want to add
> > > > >> >> this functionality and whether it should be an exception or not.
> > > >> >>
> > > >> >> Sophie:
> > > > >> >> I agree that shutting down an instance could also be useful. There
> > > > >> >> was some discussion about this on KIP-663. It seems that we came to
> > > > >> >> the conclusion that close(Duration.ZERO) would be sufficient. Link to
> > > > >> >> thread:
> > > > >> >> <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
> > > >> >>
> > > > >> >> Also I am not set on the name ShutdownRequested. If we decide to
> > > > >> >> keep it as an exception, your idea is probably a better name.
> > > >> >>
> > > >> >>
> > > >> >> Thanks for the feedback,
> > > >> >> Walker
> > > >> >>
> > > >> >>
> > > > >> >> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org>
> > > > >> >> wrote:
> > > >> >>
> > > >> >>> Thanks for the KIP.
> > > >> >>>
> > > > >> >>> It seems that the new exception would need to be thrown by user
> > > > >> >>> code? However, in the motivation you mention the scenario of a
> > > > >> >>> missing source topic that a user cannot detect, but that the
> > > > >> >>> KafkaStreams runtime would be responsible for handling.
> > > >> >>>
> > > >> >>> How do both things go together?
> > > >> >>>
> > > >> >>>
> > > >> >>> -Matthias
> > > >> >>>
> > > >> >>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > > >> >>>> Hello all,
> > > >> >>>>
> > > > >> >>>> I have created KIP-671 to give the option to shut down a Streams
> > > > >> >>>> application in response to an error.
> > > > >> >>>>
> > > > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > >> >>>>
> > > >> >>>> This is because of the Jira ticket
> > > >> >>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> > > >> >>>>
> > > >> >>>> Please give it a look and let me know if you have any feedback.
> > > >> >>>>
> > > >> >>>> Thanks,
> > > >> >>>> Walker
> > > >> >>>>
> > > >> >>>
> > > >> >>>
> > > >> >>
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
