Hello. I just uploaded the draft implementations of the three proposed alternatives.
- Type A: Define a close timeout constant - https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-a
- Type B: Provide a new configuration option, 'close.wait.ms' - https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-b
- Type C: Extend the KafkaStreams constructor to support a close timeout parameter - https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-c

As you can see in the branches, Type B and C are a little more complicated than A, since they provide an option to control the timeout for closing the AdminClient and Producer. To support that functionality, B and C share a refactoring commit, which replaces KafkaStreams#create with KafkaStreams.builder. That is why they consist of two commits.

Please have a look when you are free.

Thanks,
Dongjin

On Thu, May 30, 2019 at 12:26 PM Dongjin Lee <dong...@apache.org> wrote:

> I just updated the KIP document to reflect what I found about the clients API inconsistency and Matthias's comments. Since it is now obvious that modifying the default close timeout for the clients is not feasible, the updated document proposes totally different alternatives. (See the Rejected Alternatives section.)
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
>
> Please have a look when you are free. All feedback is welcome!
>
> Thanks,
> Dongjin
>
> On Fri, May 24, 2019 at 1:07 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Thanks for digging into the background.
>>
>> I think it would be good to get feedback from people who work on the clients, too.
>>
>> -Matthias
>>
>> On 5/19/19 12:58 PM, Dongjin Lee wrote:
>> > Hi Matthias,
>> >
>> > I investigated the inconsistencies between the `close` semantics of `Producer`, `Consumer`, and `AdminClient`, and I found that this inconsistency was intentional. Here are the details:
>> >
>> > The current `KafkaConsumer#close` default timeout, 30 seconds, was introduced in KIP-102 (0.10.2.0).[^1]
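[For illustration: the Type B option listed at the top of this mail would be set like any other Streams config entry. The key name 'close.wait.ms' comes from this thread; the value below is only an example, and the option was never part of a released Kafka version.]

```properties
# Hypothetical Type B option from this thread (not a released Kafka config):
# an upper bound, in milliseconds, on how long KafkaStreams#close waits for
# its internal AdminClient and Producer to close.
close.wait.ms=30000
```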
>> > According to the document, there are two differences between `Consumer` and `Producer`:
>> >
>> > 1. `Consumer`s don't have large requests.
>> > 2. `Consumer#close` is affected by the consumer coordinator, whose close operation is affected by `request.timeout.ms`.
>> >
>> > For these reasons, the Consumer's default timeout was set a little differently.[^3] (This was done by Rajini.)
>> >
>> > In the initial proposal, I proposed changing the default timeout of `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another value. However, since it is now clear that the current implementation is entirely reasonable, *it seems that a more suitable approach is to simply provide a close timeout to the clients used by KafkaStreams.*[^4]
>> > This approach has the following advantages:
>> >
>> > 1. The problem described in KAFKA-7996 is resolved, since the Producer no longer hangs during its `close` operation.
>> > 2. We don't have to change the semantics of `Producer#close`, `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, such changes are hard for users to reason about.
>> >
>> > What do you think?
>> >
>> > Thanks,
>> > Dongjin
>> >
>> > [^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
>> > [^2]: "The existing close() method without a timeout will attempt to close the consumer gracefully with a default timeout of 30 seconds. This is different from the producer default of Long.MAX_VALUE since consumers don't have large requests."
>> > [^3]: The 'Rejected Alternatives' section explains it.
>> > [^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
>> >
>> > On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> >> Thanks for the KIP.
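[To make the proposal above concrete, here is a minimal, Kafka-free sketch of the idea: the caller passes a bounded timeout to close() instead of relying on a Long.MAX_VALUE default, so shutdown cannot hang forever. `SlowClient` and the timeout values are illustrative stand-ins, not Kafka classes.]

```java
import java.time.Duration;

public class BoundedClose {

    /** Stand-in for a client whose pending work never completes. */
    static class SlowClient {
        /** Like Producer#close(Duration): waits for pending work, but gives up at the deadline. */
        boolean close(Duration timeout) throws InterruptedException {
            long deadline = System.nanoTime() + timeout.toNanos();
            while (System.nanoTime() < deadline) {
                Thread.sleep(5); // pending work never finishes in this sketch
            }
            return false; // timed out; remaining work is abandoned, not flushed
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        boolean flushed = new SlowClient().close(Duration.ofMillis(100));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // close() returns after roughly 100 ms instead of blocking forever
        System.out.println("flushed=" + flushed + ", bounded=" + (elapsedMs < 10_000));
    }
}
```

The point of the sketch is only the shape of the contract: with a bounded timeout, the worst case for KafkaStreams shutdown is the timeout itself, not an indefinite hang.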
>> >>
>> >> Overall, I agree with the sentiment of the KIP. The current semantics of `KafkaStreams#close(timeout)` are not well defined. Also, the general client inconsistencies are annoying.
>> >>
>> >>> This KIP doesn't make any change to public interfaces; however, it makes a subtle change to the existing API's semantics. If this KIP is accepted, documenting these semantics with as much detail as possible may be much better.
>> >>
>> >> I am not sure I would call this change "subtle". It might actually have a rather big impact, and hence I am also wondering about backward compatibility (details below). Overall, I am not sure that documenting the change would be sufficient.
>> >>
>> >>> Change the default close timeout of Producer and AdminClient to a more reasonable one, not Long.MAX_VALUE.
>> >>
>> >> Can you be more specific than "more reasonable" and propose a concrete value? What about backward compatibility? Assume an application wants to block forever by default: with this change, it would have to rewrite code to keep the intended semantics. Hence, the change does not seem to be backward compatible. Also note that a config change would not be sufficient; an actual code change would be required.
>> >>
>> >> Also, why not go the other direction and default `KafkaConsumer#close()` to Long.MAX_VALUE, too? Note that the current KafkaStreams#close() also uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a similar backward compatibility concern arises.
>> >>
>> >> Making close() blocking by default seems not unreasonable per se. Can you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>> >>
>> >>> If it succeeded, simply return; if not, close the remaining resources with the default close timeout.
>> >>
>> >> Why do you want to apply the default timeout as a fallback?
>> >> This would violate the user's intention, too, and thus might result in a situation that is not much better than the current one.
>> >>
>> >> For KafkaStreams, if there are multiple StreamThreads, would the full timeout be passed to each thread, since all of them could shut down in parallel? Or would the timeout be divided over all the threads?
>> >>
>> >> What about the case where there is a different number of clients? For example, with EOS enabled, there are multiple Producers that need to be closed; however, the user might not even be aware of the increased number of producers (or know how many there actually are).
>> >>
>> >> It seems hard for users to reason about those dependencies.
>> >>
>> >> -Matthias
>> >>
>> >> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>> >>> Hi dev,
>> >>>
>> >>> I would like to start the discussion of KIP-459: Improve KafkaStreams#close <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
>> >>> This proposal originated from an issue reported via the community Slack, KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short, this KIP proposes to resolve the problem by improving the existing API's semantics, without any public API changes.
>> >>>
>> >>> Please have a look when you are free. All opinions will be highly appreciated.
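[One common answer to the timeout-division question quoted above is to share a single deadline across all clients, so each close() call only gets the time that remains and the overall shutdown never exceeds the user's budget. This is a hedged, Kafka-free sketch of that pattern; the names (`Closable`, `closeAll`) and the numbers are illustrative, not Kafka's API.]

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class DeadlineClose {

    interface Closable {
        void close(Duration timeout);
    }

    /** Time left before the deadline, floored at zero. */
    static Duration remaining(long deadlineNanos) {
        long left = deadlineNanos - System.nanoTime();
        return left > 0 ? Duration.ofNanos(left) : Duration.ZERO;
    }

    /** Close all clients (consumer, producers, admin, ...) under one shared budget. */
    static void closeAll(List<Closable> clients, Duration total) {
        long deadline = System.nanoTime() + total.toNanos();
        for (Closable c : clients) {
            c.close(remaining(deadline)); // later clients may get Duration.ZERO
        }
    }

    public static void main(String[] args) {
        // Each stand-in client consumes up to 60 ms of the 100 ms budget.
        Closable slow = timeout -> {
            long ms = Math.min(timeout.toMillis(), 60);
            try { Thread.sleep(ms); } catch (InterruptedException ignored) { }
        };
        long start = System.nanoTime();
        closeAll(Arrays.asList(slow, slow, slow), Duration.ofMillis(100));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // the three closes together stay near the 100 ms budget
        System.out.println("total close bounded: " + (elapsedMs < 1_000));
    }
}
```

Note the trade-off this sketch makes explicit: a shared deadline keeps the total bounded, but clients closed later (e.g. the extra EOS producers the user may not know about) can receive little or no time, which is exactly the reasoning difficulty raised above.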
>> >>>
>> >>> Thanks,
>> >>> Dongjin
>> >>>
>>
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
> *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*

--
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <https://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*