I just updated the KIP document to reflect what I found about the clients' API inconsistency and Matthias's comments. Since it is now clear that changing the default close timeout of the clients is not feasible, the updated document proposes entirely different alternatives (see the Rejected Alternatives section).
https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close

Please have a look when you are free. All kinds of feedback are welcome!

Thanks,
Dongjin

On Fri, May 24, 2019 at 1:07 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for digging into the background.
>
> I think it would be good to get feedback from people who work on clients, too.
>
>
> -Matthias
>
>
> On 5/19/19 12:58 PM, Dongjin Lee wrote:
> > Hi Matthias,
> >
> > I investigated the inconsistencies between the `close` semantics of `Producer`, `Consumer`, and `AdminClient`, and I found that this inconsistency was intended. Here are the details:
> >
> > The current default timeout of `KafkaConsumer#close`, 30 seconds, was introduced in KIP-102 (0.10.2.0)[^1]. According to the document[^2], there are two differences between `Consumer` and `Producer`:
> >
> > 1. `Consumer`s don't have large requests.
> > 2. `Consumer#close` is affected by the consumer coordinator, whose close operation is affected by `request.timeout.ms`.
> >
> > For these reasons, the consumer's default timeout was set differently.[^3] (This was done by Rajini.)
> >
> > In the initial proposal, I proposed changing the default timeout of `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another value. However, since it is now clear that the current implementation is entirely reasonable, *it seems more suitable to change the approach to just providing a close timeout to the clients used by KafkaStreams.*[^4] This approach has the following advantages:
> >
> > 1. The problem described in KAFKA-7996 is resolved, since the `Producer` no longer hangs indefinitely in its `close` operation.
> > 2. We don't have to change the semantics of `Producer#close`, `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these kinds of changes are hard for users to reason about.
> >
> > What do you think?
> >
> > Thanks,
> > Dongjin
> >
> > [^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > [^2]: "The existing close() method without a timeout will attempt to close the consumer gracefully with a default timeout of 30 seconds. This is different from the producer default of Long.MAX_VALUE since consumers don't have large requests."
> > [^3]: The 'Rejected Alternatives' section explains it.
> > [^4]: In the case of the Streams reset tool, the close timeout of `KafkaAdminClient` is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
> >
> > On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Thanks for the KIP.
> >>
> >> Overall, I agree with the sentiment of the KIP. The current semantics of `KafkaStreams#close(timeout)` are not well defined. Also, the general client inconsistencies are annoying.
> >>
> >>
> >>> This KIP does not make any changes to public interfaces; however, it makes a subtle change to the existing API's semantics. If this KIP is accepted, documenting these semantics in as much detail as possible would be much better.
> >>
> >> I am not sure I would call this change "subtle". It might actually have a rather big impact, and hence I am also wondering about backward compatibility (details below). Overall, I am not sure documenting the change would be sufficient.
> >>
> >>
> >>
> >>> Change the default close timeout of Producer and AdminClient to a more reasonable one, not Long.MAX_VALUE.
> >>
> >> Can you be more specific than "more reasonable", and propose a concrete value? What about backward compatibility? Assume an application wants to block forever by default: with this change, it would have to rewrite its code to keep the intended semantics. Hence, the change does not seem to be backward compatible.
> >> Also note that a config change would not be sufficient; an actual code change would be required.
> >>
> >> Also, why not go the other direction and default `KafkaConsumer#close()` to use Long.MAX_VALUE, too? Note that the current `KafkaStreams#close()` also uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a similar backward compatibility concern arises.
> >>
> >> Making close() blocking by default does not seem unreasonable per se. Can you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
> >>
> >>
> >>> If it succeeds, simply return; if not, close the remaining resources with the default close timeout.
> >>
> >> Why do you want to apply the default timeout as a fallback? This would violate the user's intention, too, and thus might result in a situation that is not much better than the current one.
> >>
> >>
> >> For KafkaStreams, if there are multiple StreamThreads, would the full timeout be passed into each thread, as all of them could shut down in parallel? Or would the timeout be divided over all threads?
> >>
> >> What about the case when there is a different number of clients? For example, with EOS enabled, there are multiple Producers that need to be closed; however, the user might not even be aware of the increased number of producers (or not know how many there actually are).
> >>
> >>
> >> It seems to be hard for users to reason about those dependencies.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/23/19 6:13 PM, Dongjin Lee wrote:
> >>> Hi dev,
> >>>
> >>> I would like to start the discussion of KIP-459: Improve KafkaStreams#close <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
> >>> This proposal originated from an issue reported via the community Slack, KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>.
> >>> In short, this KIP proposes to resolve this problem by improving the existing API's semantics, without adding any public API changes.
> >>>
> >>> Please have a look when you are free. All opinions will be highly appreciated.
> >>>
> >>> Thanks,
> >>> Dongjin
> >>>
> >>
> >
> >

--
Dongjin Lee

A hitchhiker in the mathematical world.

github: github.com/dongjinleekr <https://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>
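Matthias's April 26 questions (how one `KafkaStreams#close(timeout)` budget maps onto a varying number of internal clients, such as the multiple producers under EOS) can be made concrete with a small sketch. It assumes a hypothetical `TimedCloseable` interface standing in for a client's timed close, and a sequential shutdown in which every client is handed only the time still remaining on one shared budget, so the total wait stays within the caller's timeout however many clients exist. This is an illustration under those assumptions, not the KIP-459 design or actual Kafka Streams code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class BoundedClose {

    /** Hypothetical stand-in for a client with a timed close; not a Kafka interface. */
    public interface TimedCloseable {
        /** Tries to close within {@code timeout}; returns true if it closed cleanly. */
        boolean close(Duration timeout);
    }

    /**
     * Closes the clients one after another against a single shared budget.
     * Each client is handed only the time still remaining, so the total wait
     * stays within {@code total} (provided each client honors its timeout),
     * however many clients there are.
     * Returns the indices of the clients that did not close cleanly.
     */
    public static List<Integer> closeAll(List<TimedCloseable> clients, Duration total,
                                         LongSupplier nanoClock) {
        long start = nanoClock.getAsLong();
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < clients.size(); i++) {
            // Duration arithmetic avoids overflow even for a Long.MAX_VALUE-millis budget.
            Duration remaining = total.minus(Duration.ofNanos(nanoClock.getAsLong() - start));
            if (remaining.isNegative()) {
                remaining = Duration.ZERO;
            }
            if (!clients.get(i).close(remaining)) {
                failed.add(i);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake monotonic clock in nanoseconds, so the demo is deterministic
        LongSupplier clock = () -> now[0];
        Duration needed = Duration.ofSeconds(4);
        // Hypothetical client that needs 4 s to close and gives up if its timeout is shorter.
        TimedCloseable slow = timeout -> {
            Duration spent = timeout.compareTo(needed) < 0 ? timeout : needed;
            now[0] += spent.toNanos(); // simulate time passing while closing
            return timeout.compareTo(needed) >= 0;
        };
        List<Integer> failed = closeAll(List.of(slow, slow, slow), Duration.ofSeconds(10), clock);
        System.out.println("not closed cleanly: " + failed);        // [2]
        System.out.println("elapsed: " + Duration.ofNanos(now[0])); // PT10S
    }
}
```

In this sequential scheme, the last client absorbs any shortfall (the third client gets only 2 s here). If StreamThreads shut down in parallel, each could instead be handed the full remaining time, since their waits overlap; sharing one budget matters mainly when clients are closed one after another, for example if several producers were closed in turn.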