Hi Matthias,

I investigated the inconsistencies between the `close` semantics of `Producer`,
`Consumer`, and `AdminClient`, and found that they are intentional. Here are
the details:

The current default timeout of `KafkaConsumer#close`, 30 seconds, was
introduced in KIP-102 (0.10.2.0).[^1] According to the document,[^2] there are
two differences between `Consumer` and `Producer`:

1. `Consumer`s don't have large requests.
2. `Consumer#close` is affected by the consumer coordinator, whose close
operation is bounded by `request.timeout.ms`.

For these reasons, the Consumer's default timeout was set slightly
differently.[^3] (This was done by Rajini.)
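
For illustration, here is a minimal sketch of the current defaults. The class
and method names are mine; the defaults in the comments are as stated above,
and the `Duration` overloads assume kafka-clients 2.2+:

```java
import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

class CloseDefaultsSketch {
    // Each no-argument close() below falls back to a different default timeout.
    static void closeWithDefaults(Producer<byte[], byte[]> producer,
                                  Consumer<byte[], byte[]> consumer,
                                  AdminClient admin) {
        producer.close();  // default: Long.MAX_VALUE ms, i.e. block until buffered records are sent
        admin.close();     // default: Long.MAX_VALUE ms as well
        consumer.close();  // default: 30 seconds (KIP-102)
    }

    // All three clients also accept an explicit bound:
    static void closeWithBound(Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> consumer,
                               AdminClient admin,
                               Duration bound) {
        producer.close(bound);
        consumer.close(bound);
        admin.close(bound);
    }
}
```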

In the initial proposal, I suggested changing the default timeout of
`[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another value.
However, since it is now clear that the current implementation is entirely
reasonable, *it seems more suitable to change the approach to simply passing
a close timeout to the clients used by KafkaStreams.*[^4] (A rough sketch
follows the list of advantages below.)
This approach has the following advantages:

1. The problem described in KAFKA-7996 is now resolved, since the Producer
no longer hangs during its `close` operation.
2. We don't have to change the semantics of `Producer#close`,
`AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these
kinds of changes are hard for users to reason about.
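
To make this direction concrete, here is the rough sketch I mentioned. It is
not the actual KafkaStreams code; the class, its fields, and the `remaining`
helper are all hypothetical:

```java
import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

class StreamsShutdownSketch {
    // Stand-ins for the clients KafkaStreams manages internally.
    private final Producer<byte[], byte[]> producer;
    private final Consumer<byte[], byte[]> consumer;
    private final AdminClient adminClient;

    StreamsShutdownSketch(Producer<byte[], byte[]> producer,
                          Consumer<byte[], byte[]> consumer,
                          AdminClient adminClient) {
        this.producer = producer;
        this.consumer = consumer;
        this.adminClient = adminClient;
    }

    // Share a single deadline across all internal clients so the overall
    // shutdown never exceeds the timeout passed to KafkaStreams#close.
    void close(Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        producer.close(remaining(deadlineMs));  // no indefinite hang (KAFKA-7996)
        consumer.close(remaining(deadlineMs));
        adminClient.close(remaining(deadlineMs));
    }

    private static Duration remaining(long deadlineMs) {
        return Duration.ofMillis(Math.max(0L, deadlineMs - System.currentTimeMillis()));
    }
}
```

Whether each client should get the full remaining time or a divided share
(e.g., with multiple StreamThreads or EOS producers) is still open for
discussion, of course.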

What do you think?

Thanks,
Dongjin

[^1]:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
[^2]: "The existing close() method without a timeout will attempt to close
the consumer gracefully with a default timeout of 30 seconds. This is
different from the producer default of Long.MAX_VALUE since consumers don't
have large requests."
[^3]: The 'Rejected Alternatives' section of KIP-102 explains it.
[^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close
timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files

On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the KIP.
>
> Overall, I agree with the sentiment of the KIP. The current semantics of
> `KafkaStreams#close(timeout)` are not well defined. Also the general
> client inconsistencies are annoying.
>
>
> > This KIP doesn't make any change on public interfaces; however, it makes a
> > subtle change to the existing API's semantics. If this KIP is accepted,
> > documenting these semantics with as much detail as possible may be much better.
>
> I am not sure if I would call this change "subtle". It might actually
> have a rather big impact, and hence I am also wondering about backward
> compatibility (details below). Overall, I am not sure if documenting the
> change would be sufficient.
>
>
>
> > Change the default close timeout of Producer, AdminClient into more
> > reasonable one, not Long.MAX_VALUE.
>
> Can you be more specific than "more reasonable", and propose a concrete
> value? What about backward compatibility? Assume an application wants to
> block forever by default: with this change, it's required to rewrite
> code to keep the intended semantics. Hence, the change does not seem to
> be backward compatible. Also note that a config change would not be
> sufficient, but an actual code change would be required.
>
> Also, why not go the other direction and default `KafkaConsumer#close()`
> to use Long.MAX_VALUE, too? Note that the current KafkaStreams#close() also
> uses Long.MAX_VALUE (ie, over all 4 clients, it's 50:50). Of course, a
> similar backward compatibility concern arises.
>
> Making close() blocking by default seems not unreasonable per se. Can
> you elaborate why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>
>
> > If succeeded, simply return; if not, close remaining resources with
> > default close timeout.
>
> Why do you want to apply the default timeout as a fallback? This would
> violate the user's intention, too, and thus might result in a situation
> that is not much better than the current one.
>
>
> For KafkaStreams, if there are multiple StreamThreads, would the full
> timeout be passed into each thread as all of them could shut down in
> parallel? Or would the timeout be divided over all threads?
>
> What about the case when there is a different number of clients? For
> example, with EOS enabled, there are multiple Producers that need to be
> closed, however, the user might not even be aware of the increased
> number of producers (or not know how many there actually are).
>
>
> It seems to be hard for users to reason about those dependencies.
>
>
> -Matthias
>
>
> On 4/23/19 6:13 PM, Dongjin Lee wrote:
> > Hi dev,
> >
> > I would like to start the discussion of KIP-459: Improve KafkaStreams#close
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
> > This proposal originated from the issue reported via the community Slack,
> > KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
> > this KIP proposes to resolve this problem by improving the existing API's
> > semantics, not adding any public API changes.
> >
> > Please have a look when you are free. All opinions will be highly
> > appreciated.
> >
> > Thanks,
> > Dongjin
> >
>
>

-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <https://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
