Hello.

I just uploaded draft implementations of the three proposed alternatives.

- Type A: define a close timeout constant -
https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-a
- Type B: Provide a new configuration option, 'close.wait.ms' -
https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-b
- Type C: Extend KafkaStreams constructor to support a close timeout
parameter - https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-c

As you can see in the branches, Types B and C are a little more
complicated than A, since they provide an option to control the timeout for
closing the AdminClient and Producer. To provide that functionality, B and C
share a refactoring commit, which replaces KafkaStreams#create with
KafkaStreams.builder. That is why they each consist of two commits.
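
To make the difference between the three types concrete, here is a minimal
sketch of how the close timeout could be resolved in each case. It is not
taken from the branches: the class name, the method names, and the 30-second
constant are assumptions made only for illustration. Only the
'close.wait.ms' key comes from the Type B description above.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical sketch; none of these names are taken from the linked branches.
public class CloseTimeoutSketch {

    // Type A: a fixed constant. The 30-second value is an assumption.
    public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);

    // Type B: read 'close.wait.ms' from the configuration and fall back
    // to the constant when the key is absent.
    public static Duration fromConfig(Properties props) {
        String raw = props.getProperty("close.wait.ms");
        if (raw == null) {
            return DEFAULT_CLOSE_TIMEOUT;
        }
        long ms = Long.parseLong(raw.trim());
        if (ms < 0) {
            throw new IllegalArgumentException("close.wait.ms must be non-negative: " + ms);
        }
        return Duration.ofMillis(ms);
    }

    // Type C: the caller passes the timeout explicitly, for example
    // through an additional constructor parameter.
    public static Duration fromArgument(Duration closeTimeout) {
        if (closeTimeout == null || closeTimeout.isNegative()) {
            throw new IllegalArgumentException("closeTimeout must be non-null and non-negative");
        }
        return closeTimeout;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("close.wait.ms", "5000");
        System.out.println("Type A: " + DEFAULT_CLOSE_TIMEOUT);
        System.out.println("Type B: " + fromConfig(props));
        System.out.println("Type C: " + fromArgument(Duration.ofSeconds(10)));
    }
}
```

In each case the resolved Duration would then be handed to the clients'
close call instead of relying on their default close timeout.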

Please have a look when you are free.

Thanks,
Dongjin

On Thu, May 30, 2019 at 12:26 PM Dongjin Lee <dong...@apache.org> wrote:

> I just updated the KIP document reflecting what I found about the clients
> API inconsistency and Matthias's comments. Since it is now obvious that
> modifying the default close timeout for the client is not feasible, the
> updated document proposes totally different alternatives. (see Rejected
> Alternatives section)
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
>
> Please have a look when you are free. All kinds of feedback are welcome!
>
> Thanks,
> Dongjin
>
> On Fri, May 24, 2019 at 1:07 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Thanks for digging into the background.
>>
>> I think it would be good to get feedback from people who work on
>> clients, too.
>>
>>
>> -Matthias
>>
>>
>> On 5/19/19 12:58 PM, Dongjin Lee wrote:
>> > Hi Matthias,
>> >
>> > I investigated the inconsistencies between the `close` semantics of
>> > `Producer`, `Consumer`, and `AdminClient`, and I found that this
>> > inconsistency was intended. Here are the details:
>> >
>> > The current `KafkaConsumer#close`'s default timeout, 30 seconds, was
>> > introduced in KIP-102 (0.10.2.0)[^1]. According to the document,[^2]
>> > there are two differences between `Consumer` and `Producer`:
>> >
>> > 1. `Consumer`s don't have large requests.
>> > 2. `Consumer#close` is affected by consumer coordinator, whose close
>> > operation is affected by `request.timeout.ms`.
>> >
>> > For the above reasons, the Consumer's default timeout was set a little
>> > differently.[^3] (This was done by Rajini.)
>> >
>> > In the initial proposal, I proposed changing the default timeout value
>> > of `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another one.
>> > However, since it is now clear that the current implementation is
>> > totally reasonable, *it seems that just providing a close timeout to
>> > the clients used by KafkaStreams is a more suitable approach.*[^4]
>> > This approach has the following advantages:
>> >
>> > 1. The problem described in KAFKA-7996 is resolved, since the Producer
>> > no longer hangs during its `close` operation.
>> > 2. We don't have to change the semantics of `Producer#close`,
>> > `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these
>> > kinds of changes are hard for users to reason about.
>> >
>> > What do you think?
>> >
>> > Thanks,
>> > Dongjin
>> >
>> > [^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
>> > [^2]: "The existing close() method without a timeout will attempt to
>> > close the consumer gracefully with a default timeout of 30 seconds. This
>> > is different from the producer default of Long.MAX_VALUE since consumers
>> > don't have large requests."
>> > [^3]: The 'Rejected Alternatives' section explains it.
>> > [^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close
>> > timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
>> >
>> > On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >> Thanks for the KIP.
>> >>
>> >> Overall, I agree with the sentiment of the KIP. The current semantics
>> >> of `KafkaStreams#close(timeout)` are not well defined. Also the general
>> >> client inconsistencies are annoying.
>> >>
>> >>
>> >>> This KIP does not make any change to public interfaces; however, it
>> >>> makes a subtle change to the existing API's semantics. If this KIP is
>> >>> accepted, documenting these semantics with as much detail as possible
>> >>> may be much better.
>> >>
>> >> I am not sure if I would call this change "subtle". It might actually
>> >> have a rather big impact, and hence I am also wondering about backward
>> >> compatibility (details below). Overall, I am not sure if documenting
>> >> the change would be sufficient.
>> >>
>> >>
>> >>
>> >>> Change the default close timeout of Producer, AdminClient into a more
>> >>> reasonable one, not Long.MAX_VALUE.
>> >>
>> >> Can you be more specific than "more reasonable", and propose a concrete
>> >> value? What about backward compatibility? Assume an application wants
>> >> to block forever by default: with this change, it's required to rewrite
>> >> code to keep the intended semantics. Hence, the change does not seem
>> >> to be backward compatible. Also note that a config change would not be
>> >> sufficient, but an actual code change would be required.
>> >>
>> >> Also, why not go the other direction and default
>> >> `KafkaConsumer#close()` to use Long.MAX_VALUE, too? Note that the
>> >> current KafkaStreams#close() also uses Long.MAX_VALUE (i.e., over all 4
>> >> clients, it's 50:50). Of course, a similar backward compatibility
>> >> concern arises.
>> >>
>> >> Making close() blocking by default does not seem unreasonable per se.
>> >> Can you elaborate why Long.MAX_VALUE is "bad" compared to, e.g., 30
>> >> seconds?
>> >>
>> >>
>> >>> If succeeded, simply return; if not, close remaining resources with
>> >>> default close timeout.
>> >>
>> >> Why do you want to apply the default timeout as fallback? This would
>> >> violate the user intention, too, and thus, might result in a situation
>> >> that is not much better than the current one.
>> >>
>> >>
>> >> For KafkaStreams, if there are multiple StreamThreads, would the full
>> >> timeout be passed into each thread as all of them could shut down in
>> >> parallel? Or would the timeout be divided over all threads?
>> >>
>> >> What about the case when there is a different number of clients? For
>> >> example, with EOS enabled, there are multiple Producers that need to be
>> >> closed; however, the user might not even be aware of the increased
>> >> number of producers (or not know how many there actually are).
>> >>
>> >>
>> >> It seems to be hard for users to reason about those dependencies.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>> >>> Hi dev,
>> >>>
>> >>> I would like to start the discussion of KIP-459: Improve
>> >>> KafkaStreams#close
>> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
>> >>> This proposal originated from an issue reported via the community
>> >>> Slack, KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In
>> >>> short, this KIP proposes to resolve this problem by improving the
>> >>> existing API's semantics, without adding any public API changes.
>> >>>
>> >>> Please have a look when you are free. All opinions will be highly
>> >>> appreciated.
>> >>>
>> >>> Thanks,
>> >>> Dongjin
>> >>>
>> >>
>> >>
>> >
>>
>>
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
> *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
>


