Hi Matthias,

Have you thought about this issue?

Thanks,
Dongjin

On Wed, Jun 19, 2019 at 5:07 AM Dongjin Lee <dong...@apache.org> wrote:

> Hello.
>
> I just uploaded draft implementations of the three proposed
> alternatives.
>
> - Type A: define a close timeout constant -
>   https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-a
> - Type B: provide a new configuration option, 'close.wait.ms' -
>   https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-b
> - Type C: extend the KafkaStreams constructor to support a close timeout
>   parameter -
>   https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-c
>
> As you can see in the branches, Types B and C are a little more
> complicated than A, since they provide an option to control the timeout for
> closing the AdminClient and Producer. To provide that functionality, B and C
> share a refactoring commit, which replaces KafkaStreams#create with
> KafkaStreams.builder. That is why they consist of two commits.
>
> Please have a look when you are free.
>
> Thanks,
> Dongjin
>
> On Thu, May 30, 2019 at 12:26 PM Dongjin Lee <dong...@apache.org> wrote:
>
>> I just updated the KIP document to reflect what I found about the clients
>> API inconsistency and Matthias's comments. Since it is now obvious that
>> modifying the default close timeout for the clients is not feasible, the
>> updated document proposes totally different alternatives. (See the Rejected
>> Alternatives section.)
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
>>
>> Please have a look when you are free. All kinds of feedback are welcome!
>>
>> Thanks,
>> Dongjin
>>
>> On Fri, May 24, 2019 at 1:07 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Thanks for digging into the background.
>>>
>>> I think it would be good to get feedback from people who work on
>>> clients, too.
>>>
>>> -Matthias
>>>
>>> On 5/19/19 12:58 PM, Dongjin Lee wrote:
>>> > Hi Matthias,
>>> >
>>> > I investigated the inconsistencies between the `close` semantics of
>>> > `Producer`, `Consumer`, and `AdminClient`, and I found that this
>>> > inconsistency was intended. Here are the details:
>>> >
>>> > The current default timeout of `KafkaConsumer#close`, 30 seconds, was
>>> > introduced in KIP-102 (0.10.2.0)[^1]. According to the document, there are
>>> > two differences between `Consumer` and `Producer`:
>>> >
>>> > 1. `Consumer`s don't have large requests.
>>> > 2. `Consumer#close` is affected by the consumer coordinator, whose close
>>> >    operation is affected by `request.timeout.ms`.
>>> >
>>> > For the above reasons, the Consumer's default timeout was set a little
>>> > differently.[^3] (This was done by Rajini.)
>>> >
>>> > In the initial proposal, I proposed changing the default timeout value of
>>> > `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another value.
>>> > However, since it is now clear that the current implementation is totally
>>> > reasonable, *it seems that changing the approach to just providing a
>>> > close timeout to the clients used by KafkaStreams is more suitable.*[^4]
>>> > This approach has the following advantages:
>>> >
>>> > 1. The problem described in KAFKA-7996 is resolved, since the Producer
>>> >    no longer hangs during its `close` operation.
>>> > 2. We don't have to change the semantics of `Producer#close`,
>>> >    `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these
>>> >    kinds of changes are hard for users to reason about.
>>> >
>>> > What do you think?
>>> >
>>> > Thanks,
>>> > Dongjin
>>> >
>>> > [^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
>>> > [^2]: "The existing close() method without a timeout will attempt to close
>>> > the consumer gracefully with a default timeout of 30 seconds.
>>> > This is different from the producer default of Long.MAX_VALUE since
>>> > consumers don't have large requests."
>>> > [^3]: The 'Rejected Alternatives' section explains it.
>>> > [^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close
>>> > timeout is 60 seconds (KIP-198):
>>> > https://github.com/apache/kafka/pull/3927/files
>>> >
>>> > On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>> >
>>> >> Thanks for the KIP.
>>> >>
>>> >> Overall, I agree with the sentiment of the KIP. The current semantics of
>>> >> `KafkaStreams#close(timeout)` are not well defined. Also the general
>>> >> client inconsistencies are annoying.
>>> >>
>>> >>> This KIP does not make any change to public interfaces; however, it
>>> >>> makes a subtle change to the existing API's semantics. If this KIP is
>>> >>> accepted, documenting these semantics in as much detail as possible
>>> >>> would be much better.
>>> >>
>>> >> I am not sure if I would call this change "subtle". It might actually
>>> >> have a rather big impact, and hence I am also wondering about backward
>>> >> compatibility (details below). Overall, I am not sure if documenting the
>>> >> change would be sufficient.
>>> >>
>>> >>> Change the default close timeout of Producer, AdminClient into more
>>> >>> reasonable one, not Long.MAX_VALUE.
>>> >>
>>> >> Can you be more specific than "more reasonable", and propose a concrete
>>> >> value? What about backward compatibility? Assume an application wants to
>>> >> block forever by default: with this change, it's required to rewrite
>>> >> code to keep the intended semantics. Hence, the change does not seem to
>>> >> be backward compatible. Also note that a config change would not be
>>> >> sufficient; an actual code change would be required.
>>> >>
>>> >> Also, why not go the other direction and default `KafkaConsumer#close()`
>>> >> to use Long.MAX_VALUE, too?
>>> >> Note that the current KafkaStreams#close() also
>>> >> uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a
>>> >> similar backward compatibility concern arises.
>>> >>
>>> >> Making close() blocking by default does not seem unreasonable per se. Can
>>> >> you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>>> >>
>>> >>> If succeeded, simply return; if not, close remaining resources with
>>> >>> default close timeout.
>>> >>
>>> >> Why do you want to apply the default timeout as a fallback? This would
>>> >> violate the user's intention, too, and thus might result in a situation
>>> >> that is not much better than the current one.
>>> >>
>>> >> For KafkaStreams, if there are multiple StreamThreads, would the full
>>> >> timeout be passed into each thread, as all of them could shut down in
>>> >> parallel? Or would the timeout be divided over all threads?
>>> >>
>>> >> What about the case when there is a different number of clients? For
>>> >> example, with EOS enabled, there are multiple Producers that need to be
>>> >> closed; however, the user might not even be aware of the increased
>>> >> number of producers (or not know how many there actually are).
>>> >>
>>> >> It seems to be hard for users to reason about those dependencies.
>>> >>
>>> >> -Matthias
>>> >>
>>> >> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>>> >>> Hi dev,
>>> >>>
>>> >>> I would like to start the discussion of KIP-459: Improve KafkaStreams#close
>>> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
>>> >>> This proposal originated from an issue reported via the community Slack,
>>> >>> KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
>>> >>> this KIP proposes to resolve this problem by improving the existing API's
>>> >>> semantics, without adding any public API changes.
>>> >>>
>>> >>> Please have a look when you are free.
>>> >>> All opinions will be highly appreciated.
>>> >>>
>>> >>> Thanks,
>>> >>> Dongjin
>>
>> --
>> *Dongjin Lee*
>>
>> *A hitchhiker in the mathematical world.*
>> *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
>> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
>> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
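
One way to make the question raised earlier in the thread concrete (is the full close timeout handed to every client, or shared among them?) is a single shared deadline. The following is only a minimal Java sketch of that idea, not code from the KIP, the linked branches, or Kafka itself: `TimedCloseable` and `closeAll` are hypothetical names, and the sketch assumes the clients are closed one after another, each receiving only the time left before the overall deadline.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka.
final class SharedDeadlineClose {

    /** Stand-in for any client whose close() accepts a timeout. */
    interface TimedCloseable {
        void close(Duration timeout);
    }

    /**
     * Closes the clients sequentially. Each client is given the time that
     * remains of one overall budget, so the number of clients does not
     * multiply the total wait (assuming each close honors its timeout).
     */
    static void closeAll(List<TimedCloseable> clients, Duration total) {
        long budgetNanos;
        try {
            budgetNanos = total.toNanos();
        } catch (ArithmeticException overflow) {
            // Very large timeouts (e.g. Long.MAX_VALUE ms) mean "block forever".
            budgetNanos = Long.MAX_VALUE;
        }
        long start = System.nanoTime();
        for (TimedCloseable client : clients) {
            long elapsed = System.nanoTime() - start;
            long remaining = Math.max(0L, budgetNanos - elapsed);
            client.close(Duration.ofNanos(remaining));
        }
    }
}
```

With this shape, a caller states one timeout and does not need to know how many producers exist internally, which is the concern raised for the EOS case. Closing all clients in parallel with the full timeout each would give a similar overall bound; which of the two fits KafkaStreams better is left open in the thread.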