Hi Matthias,

I investigated the inconsistencies between the `close` semantics of `Producer`, `Consumer`, and `AdminClient`, and found that this inconsistency is intentional. Here are the details:

The current default timeout of `KafkaConsumer#close`, 30 seconds, was introduced in KIP-102 (0.10.2.0).[^1] According to the document, there are two differences between `Consumer` and `Producer`:

1. `Consumer`s don't have large requests.[^2]
2. `Consumer#close` is affected by the consumer coordinator, whose close operation is bounded by `request.timeout.ms`.

For these reasons, the Consumer's default timeout was deliberately set to a different value.[^3] (This was done by Rajini.)
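To illustrate the current behavior, here is a minimal sketch. (This assumes a recent client version, 2.2 or later, where the `Duration` overloads of `close` are available; the bootstrap address and serializer settings are placeholders.)

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class CloseDefaultsDemo {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // placeholder
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092"); // placeholder
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.close(); // default: waits up to 30 seconds (KIP-102)

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        producer.close(); // default: Long.MAX_VALUE, i.e. may block indefinitely

        AdminClient admin = AdminClient.create(adminProps);
        admin.close(); // default: Long.MAX_VALUE as well

        // An explicit bound is already available on every client:
        new KafkaProducer<String, String>(producerProps).close(Duration.ofSeconds(30));
    }
}
```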
In the initial proposal, I proposed changing the default timeout value of `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another value. However, since it is now clear that the current implementation is entirely reasonable, *it seems more suitable to change the approach to simply providing a close timeout to the clients used by KafkaStreams.*[^4] This approach has the following advantages:

1. The problem described in KAFKA-7996 is now resolved, since the `Producer` no longer hangs during its `close` operation.
2. We don't have to change the semantics of `Producer#close`, `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these kinds of changes are hard for users to reason about.
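To make the idea concrete, here is a rough sketch of the approach. Note that this is hypothetical code for illustration only: `InternalClientCloser` is a made-up name, not KafkaStreams' actual internals, and it assumes client versions where the `Duration` overloads of `close` exist.

```java
import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical sketch, not KafkaStreams' real internals: the clients owned
// by KafkaStreams are closed against one shared deadline, so the caller's
// timeout bounds the whole shutdown instead of each client blocking on the
// unbounded default.
class InternalClientCloser {
    private final KafkaProducer<byte[], byte[]> producer;
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final AdminClient adminClient;

    InternalClientCloser(KafkaProducer<byte[], byte[]> producer,
                         KafkaConsumer<byte[], byte[]> consumer,
                         AdminClient adminClient) {
        this.producer = producer;
        this.consumer = consumer;
        this.adminClient = adminClient;
    }

    void close(Duration timeout) {
        long deadlineNs = System.nanoTime() + timeout.toNanos();
        producer.close(remaining(deadlineNs)); // no longer hangs forever (KAFKA-7996)
        consumer.close(remaining(deadlineNs));
        adminClient.close(remaining(deadlineNs));
    }

    private static Duration remaining(long deadlineNs) {
        return Duration.ofNanos(Math.max(0L, deadlineNs - System.nanoTime()));
    }
}
```

For comparison, the Streams reset tool already does something similar: it closes `KafkaAdminClient` with a fixed 60-second timeout (see [^4]).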
What do you think?

Thanks,
Dongjin

[^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
[^2]: "The existing close() method without a timeout will attempt to close the consumer gracefully with a default timeout of 30 seconds. This is different from the producer default of Long.MAX_VALUE since consumers don't have large requests."
[^3]: The 'Rejected Alternatives' section explains it.
[^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files

On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the KIP.
>
> Overall, I agree with the sentiment of the KIP. The current semantics of
> `KafkaStreams#close(timeout)` are not well defined. Also, the general
> client inconsistencies are annoying.
>
> > This KIP doesn't make any change on public interfaces; however, it makes
> > a subtle change to the existing API's semantics. If this KIP is accepted,
> > documenting these semantics in as much detail as possible would be much
> > better.
>
> I am not sure I would call this change "subtle". It might actually have
> a rather big impact, and hence I am also wondering about backward
> compatibility (details below). Overall, I am not sure if documenting the
> change would be sufficient.
>
> > Change the default close timeout of Producer, AdminClient into a more
> > reasonable one, not Long.MAX_VALUE.
>
> Can you be more specific than "more reasonable", and propose a concrete
> value? What about backward compatibility? Assume an application wants to
> block forever by default: with this change, it's required to rewrite
> code to keep the intended semantics. Hence, the change does not seem to
> be backward compatible. Also note that a config change would not be
> sufficient; an actual code change would be required.
>
> Also, why not go the other direction and default `KafkaConsumer#close()`
> to use Long.MAX_VALUE, too? Note that the current KafkaStreams#close()
> also uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of
> course, a similar backward compatibility concern arises.
>
> Making close() blocking by default seems not unreasonable per se. Can
> you elaborate why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>
> > If succeeded, simply return; if not, close remaining resources with the
> > default close timeout.
>
> Why do you want to apply the default timeout as a fallback? This would
> violate the user's intention, too, and thus might result in a situation
> that is not much better than the current one.
>
> For KafkaStreams, if there are multiple StreamThreads, would the full
> timeout be passed into each thread, as all of them could shut down in
> parallel? Or would the timeout be divided over all threads?
>
> What about the case when there is a different number of clients? For
> example, with EOS enabled, there are multiple Producers that need to be
> closed; however, the user might not even be aware of the increased
> number of producers (or not know how many there actually are).
>
> It seems to be hard for users to reason about those dependencies.
>
>
> -Matthias
>
>
> On 4/23/19 6:13 PM, Dongjin Lee wrote:
> > Hi dev,
> >
> > I would like to start the discussion of KIP-459: Improve
> > KafkaStreams#close
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
> > This proposal originated from an issue reported via the community Slack,
> > KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
> > this KIP proposes to resolve the problem by improving the existing API's
> > semantics, without adding any public API changes.
> >
> > Please have a look when you are free. All opinions will be highly
> > appreciated.
> >
> > Thanks,
> > Dongjin

-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*

*github: github.com/dongjinleekr <https://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*