Cross-pollinating from some discussion we've had on KIP-288,

I think there's a good reason that poll() takes a timeout when none of the
other methods do, and it's relevant to this discussion. The timeout in
poll() is effectively implementing a long-poll API (on the client side, so
it's not really long-poll, but the programmer-facing behavior is the same).
The timeout isn't really bounding the execution time of the method, but
instead giving a max time that callers are willing to wait around and see
if any results show up.

If I understand the code sufficiently, it would be perfectly reasonable for
a caller to use a timeout of 0 to implement an async poll. It would just mean
that on each call KafkaConsumer checks whether a response is ready and, if
not, fires off a new request without waiting for a response.

As such, it seems inappropriate to throw a ClientTimeoutException from
poll(), except possibly if the initial phase of ensuring an assignment
times out. We wouldn't want the method contract to be "returns a non-empty
collection or throws a ClientTimeoutException".
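A minimal sketch of the long-poll contract described above, modeled with a plain `java.util.concurrent.LinkedBlockingQueue` instead of the real consumer (the class `LongPollBuffer` and its methods are hypothetical). The timeout only bounds how long the caller is willing to wait for results; if nothing arrives, the call returns an empty list rather than throwing, and a timeout of 0 becomes a non-blocking check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer's fetch buffer; not the KafkaConsumer API.
class LongPollBuffer {
    private final LinkedBlockingQueue<String> arrived = new LinkedBlockingQueue<>();

    // Would be called by a background network thread when a fetch response lands.
    void deliver(String record) {
        arrived.add(record);
    }

    // Waits up to timeoutMs for at least one record, then drains whatever else is ready.
    // An expired wait yields an empty list, not an exception; timeoutMs == 0 never blocks.
    List<String> poll(long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String first = arrived.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first == null) {
                return batch; // nothing showed up in time
            }
            batch.add(first);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return batch;
        }
        arrived.drainTo(batch);
        return batch;
    }
}
```

Under this reading, calling `poll(0)` in a loop is the async pattern from the second paragraph: each call returns immediately with whatever has already arrived.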

Now, I'm wondering if we should actually consider one of my rejected
alternatives: treating the "operation timeout" as a separate parameter from
the "long-poll time". Or maybe adding an "asyncPoll(timeout, time unit)"
that only uses the timeout to bound metadata updates and otherwise behaves
like the current "poll(0)".
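The "operation timeout vs. long-poll time" alternative can be sketched as two separate parameters. In this hypothetical `TwoTimeoutPoller` (not a Kafka class), a `BooleanSupplier` stands in for the assignment/metadata phase. Only that phase can fail, with the made-up `OperationTimeoutException`; running out of long-poll time simply returns an empty batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical exception: raised only when the bounded "operation" phase fails.
class OperationTimeoutException extends RuntimeException {
    OperationTimeoutException(String message) {
        super(message);
    }
}

// Hypothetical sketch, not KafkaConsumer: one timeout per job.
class TwoTimeoutPoller {
    private final BooleanSupplier assignmentReady; // stands in for assignment/metadata work
    private final LinkedBlockingQueue<String> records = new LinkedBlockingQueue<>();

    TwoTimeoutPoller(BooleanSupplier assignmentReady) {
        this.assignmentReady = assignmentReady;
    }

    void deliver(String record) {
        records.add(record);
    }

    List<String> poll(long operationTimeoutMs, long longPollMs) {
        // Phase 1: bounded by the operation timeout; failing here is a genuine error.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(operationTimeoutMs);
        while (!assignmentReady.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new OperationTimeoutException(
                        "assignment not ready within " + operationTimeoutMs + " ms");
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); // brief back-off
        }
        // Phase 2: long-poll; running out of time just means "no records yet".
        List<String> batch = new ArrayList<>();
        try {
            String first = records.poll(longPollMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                records.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag; return what we have
        }
        return batch;
    }
}
```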

Thanks,
-John

On Tue, Apr 17, 2018 at 2:05 PM, John Roesler <j...@confluent.io> wrote:

> Hey Richard,
>
> As you noticed, the newly introduced KIP-288 overlaps with this one. Sorry
> for stepping on your toes... How would you like to proceed? I'm happy to
> "close" KIP-288 in deference to this KIP.
>
> With respect to poll(), reading this discussion gave me a new idea for
> providing a non-breaking update path... What if we introduce a new variant
> 'poll(long timeout, TimeUnit unit)' that exhibits the new, desired
> behavior, and just leave the old method alone?
>
> Thanks,
> -John
>
> On Tue, Apr 17, 2018 at 12:09 PM, Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> If possible, would a committer please review?
>>
>> Thanks
>>
>> On Sun, Apr 1, 2018 at 7:24 PM, Richard Yu <yohan.richard...@gmail.com>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > I have clarified the KIP a bit to account for Becket's suggestion on
>> > ClientTimeoutException.
>> > About adding an extra config, you were right about my intentions. I am
>> > just wondering whether the config should be included, since Ismael seems
>> > to favor an extra configuration.
>> >
>> > Thanks,
>> > Richard
>> >
>> > On Sun, Apr 1, 2018 at 5:35 PM, Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> >
>> >> Hi Richard,
>> >>
>> >> Regarding the Streams-side changes, we plan to incorporate the new APIs
>> >> once the KIP is done; those are only internal code changes and hence do
>> >> not need to be included in the KIP.
>> >>
>> >> Could you update the KIP? It has become quite outdated relative to the
>> >> topics discussed, and I'm losing track a bit of what your final proposal
>> >> is right now. For example, I'm not completely following your "compromise
>> >> of sorts": are you suggesting that we still add overloaded functions, and
>> >> add a config that will be applied to all overloads without the timeout,
>> >> while for the overloads with a timeout value the config will be ignored?
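One possible reading of the "compromise of sorts" as code, assuming a made-up config key and method names (`example.default.block.ms`, `position(String)`, and a 60000 ms fallback are all hypothetical): the no-timeout overload falls back to the configured default, and the overload that takes an explicit timeout ignores the config entirely.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a config supplies the default for no-timeout overloads;
// an explicit timeout argument always wins.
class BoundedConsumerSketch {
    // Hypothetical config key, used only for this illustration.
    static final String DEFAULT_BLOCK_MS_CONFIG = "example.default.block.ms";

    private final long defaultBlockMs;
    long lastEffectiveTimeoutMs = -1; // recorded so the behavior can be observed

    BoundedConsumerSketch(Properties props) {
        this.defaultBlockMs = Long.parseLong(props.getProperty(DEFAULT_BLOCK_MS_CONFIG, "60000"));
    }

    // Overload without a timeout: falls back to the configured default.
    long position(String partition) {
        return position(partition, defaultBlockMs, TimeUnit.MILLISECONDS);
    }

    // Overload with a timeout: the config is ignored.
    long position(String partition, long timeout, TimeUnit unit) {
        lastEffectiveTimeoutMs = unit.toMillis(timeout);
        // A real implementation would block here for at most lastEffectiveTimeoutMs.
        return 0L;
    }
}
```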
>> >>
>> >>
>> >> Guozhang
>> >>
>> >> On Fri, Mar 30, 2018 at 8:36 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> wrote:
>> >>
>> >> > On a side note, I have noticed that several other methods in classes
>> >> > such as StoreChangeLogReader in Streams call position(), which causes
>> >> > tests to hang. It might be out of the scope of the KIP, but should I
>> >> > also change the methods which use position() as a callback to at the
>> >> > very least prevent the tests from hanging? This issue might be outside
>> >> > the KIP, but I would prefer it if we could at least make my PR pass the
>> >> > Jenkins QA.
>> >> >
>> >> > Thanks
>> >> >
>> >> > On Fri, Mar 30, 2018 at 8:24 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Thanks for the review, Becket.
>> >> > >
>> >> > > About the methods beginningOffsets(), endOffsets(), ...:
>> >> > > I took a look through the code of KafkaConsumer, and after looking
>> >> > > through the offsetsByTimes() method and its callbacks in Fetcher, I
>> >> > > think these methods already block for a set period of time. I know
>> >> > > that there is a chance that the offsets methods in KafkaConsumer
>> >> > > might be like poll (that is, one section of the method honors the
>> >> > > timeout while another -- updateFetchPositions -- does not). However,
>> >> > > I don't think that this is the case with offsetsByTimes, since the
>> >> > > callbacks that I checked do not seem to hang.
>> >> > >
>> >> > > The clarity of the exception message is a problem. I thought your
>> >> > > suggestion there was reasonable, and I included it in the KIP.
>> >> > >
>> >> > > On another note, I have noticed that several people have voiced the
>> >> > > opinion that adding a config might be advisable compared to adding an
>> >> > > extra parameter. I think that we can have a compromise of sorts: some
>> >> > > methods in KafkaConsumer are relatively similar -- for example,
>> >> > > position() and committed() both call updateFetchPositions(). I think
>> >> > > that we could use the same config for these methods as a default
>> >> > > timeout if the user does not provide one. On the other hand, if users
>> >> > > wish to specify a longer or shorter blocking time, they have the
>> >> > > option of changing the timeout. (I included the config as an
>> >> > > alternative in the KIP.) WDYT?
>> >> > >
>> >> > > Thanks,
>> >> > > Richard
>> >> > >
>> >> > >
>> >> > > On Fri, Mar 30, 2018 at 1:26 AM, Becket Qin <becket....@gmail.com>
>> >> > > wrote:
>> >> > >
>> >> > >> Glad to see the KIP, Richard. This has been a really long-pending
>> >> > >> issue.
>> >> > >>
>> >> > >> The original argument from Jay for using a config, such as
>> >> > >> max.block.ms, instead of timeout parameters was that people will
>> >> > >> always hard-code the timeout, and a hard-coded timeout is rarely
>> >> > >> correct because it has to consider different scenarios. For example,
>> >> > >> users may receive a timeout exception when the group coordinator
>> >> > >> moves. Having a configuration with some reasonable default value will
>> >> > >> make users' lives easier.
>> >> > >>
>> >> > >> That said, in practice, it seems more useful to have timeout
>> >> > >> parameters. We have seen some libraries that use the consumer
>> >> > >> internally and need to provide a flexible external timeout interface.
>> >> > >> Also, users can easily hard-code a value to get the same behavior as
>> >> > >> a config-based solution.
>> >> > >>
>> >> > >> The KIP looks good overall. A few comments:
>> >> > >>
>> >> > >> 1. There are a few other blocking methods that are not included, e.g.
>> >> > >> offsetsForTimes(), beginningOffsets(), endOffsets(). Is there any
>> >> > >> reason?
>> >> > >>
>> >> > >> 2. I am wondering whether we can take the KIP as a chance to clean up
>> >> > >> our timeout exception(s). More specifically, instead of reusing
>> >> > >> TimeoutException, can we introduce a new ClientTimeoutException with
>> >> > >> different causes, e.g. UnknownTopicOrPartition, RequestTimeout,
>> >> > >> LeaderNotAvailable, etc.?
>> >> > >> As of now, TimeoutException is used in the following three cases:
>> >> > >>
>> >> > >>    1. TimeoutException is a subclass of ApiException, which indicates
>> >> > >>    that the exception was returned by the broker. TimeoutException was
>> >> > >>    initially returned by the leaders when replication was not done
>> >> > >>    within the timeout specified in the ProduceRequest. It has an error
>> >> > >>    code of 7, which is returned by the broker.
>> >> > >>    2. When we migrated to the Java clients, we extended it in the
>> >> > >>    Errors definition to indicate a request timeout, i.e. a request was
>> >> > >>    sent but the response was not received before the timeout. In this
>> >> > >>    case, the clients did not have a return code from the broker.
>> >> > >>    3. Later at some point, we started to use TimeoutException for
>> >> > >>    client method call timeouts. This is related neither to any
>> >> > >>    broker-returned error code nor to a request timeout on the wire.
>> >> > >>
>> >> > >> Due to these various interpretations, users can easily be confused.
>> >> > >> As an example, when a timeout is thrown with "Failed to refresh
>> >> > >> metadata in X ms", it is hard to tell what exactly happened. Since we
>> >> > >> are changing the API here, it would be good to avoid introducing more
>> >> > >> ambiguity and to see whether this can be improved. It would be at
>> >> > >> least one step forward to remove the usage in case 3.
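A sketch of what the proposed `ClientTimeoutException` might look like, assuming a simple enum of causes taken from the examples above. Neither the class nor the enum exists in the Kafka client; both are hypothetical. The point is that the exception carries a machine-readable reason, so callers need not parse a message like "Failed to refresh metadata in X ms".

```java
// Hypothetical sketch of the proposal: one client-side timeout exception that
// records why the operation ran out of time. Not an existing Kafka class.
class ClientTimeoutException extends RuntimeException {
    enum Cause {
        UNKNOWN_TOPIC_OR_PARTITION,
        REQUEST_TIMEOUT,
        LEADER_NOT_AVAILABLE
    }

    // Named "reason" to avoid confusion with Throwable.getCause().
    private final Cause reason;

    ClientTimeoutException(String operation, long timeoutMs, Cause reason) {
        super(operation + " did not complete within " + timeoutMs + " ms (last cause: " + reason + ")");
        this.reason = reason;
    }

    Cause reason() {
        return reason;
    }
}
```

A caller could then switch on `reason()` to decide whether, for example, a leader election is simply still in progress.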
>> >> > >>
>> >> > >> Thanks,
>> >> > >>
>> >> > >> Jiangjie (Becket) Qin
>> >> > >>
>> >> > >>
>> >> > >>
>> >> > >>
>> >> > >> On Mon, Mar 26, 2018 at 5:50 PM, Guozhang Wang <wangg...@gmail.com>
>> >> > >> wrote:
>> >> > >>
>> >> > >> > @Richard: TimeoutException inherits from RetriableException, which
>> >> > >> > inherits from ApiException. So users should explicitly catch
>> >> > >> > RetriableException in their code and handle the exception.
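For the question of how callers should handle this, here is a sketch of catching `RetriableException` around a blocking call and retrying a bounded number of times, while letting other exceptions propagate immediately. To keep it self-contained, the three exception classes are simplified local stand-ins that mirror the relationship described above (the real Kafka classes are not used); `RetryHelper` and `withRetries` are hypothetical.

```java
import java.util.function.Supplier;

// Simplified local stand-ins mirroring the relationship described above:
// TimeoutException extends RetriableException extends ApiException.
// The real classes live in Kafka's client library and are not used here.
class ApiException extends RuntimeException {
    ApiException(String message) {
        super(message);
    }
}

class RetriableException extends ApiException {
    RetriableException(String message) {
        super(message);
    }
}

class TimeoutException extends RetriableException {
    TimeoutException(String message) {
        super(message);
    }
}

// Hypothetical helper: retry a blocking call on retriable errors, up to maxAttempts times.
class RetryHelper {
    static <T> T withRetries(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                last = e; // e.g. a timed-out position() call; try again
            }
        }
        throw last; // out of attempts: surface the last retriable failure
    }
}
```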
>> >> > >> >
>> >> > >> > @Ismael, Ewen: I'm trying to push progress forward on this one. Are
>> >> > >> > we now on the same page about using function parameters rather than
>> >> > >> > configs?
>> >> > >> >
>> >> > >> >
>> >> > >> > Guozhang
>> >> > >> >
>> >> > >> >
>> >> > >> > On Fri, Mar 23, 2018 at 4:42 PM, Ismael Juma <ism...@juma.me.uk>
>> >> > >> > wrote:
>> >> > >> >
>> >> > >> > > Hi Ewen,
>> >> > >> > >
>> >> > >> > > Yeah, I mentioned KAFKA-2391, where some of this was discussed. Jay
>> >> > >> > > was against having timeouts in the methods at the time. However, as
>> >> > >> > > Jason said offline, we did end up with a timeout parameter in
>> >> > >> > > `poll`.
>> >> > >> > >
>> >> > >> > > Ismael
>> >> > >> > >
>> >> > >> > > On Fri, Mar 23, 2018 at 4:26 PM, Ewen Cheslack-Postava <e...@confluent.io>
>> >> > >> > > wrote:
>> >> > >> > >
>> >> > >> > > > Regarding the flexibility question, has someone tried to dig up
>> >> > >> > > > the discussion of the new consumer APIs from when they were being
>> >> > >> > > > written? I vaguely recall these exact questions about using APIs
>> >> > >> > > > vs configs and flexibility vs bloating the API surface area having
>> >> > >> > > > already been discussed. (Not that we shouldn't revisit, just that
>> >> > >> > > > it might also be a faster way to get to a full understanding of
>> >> > >> > > > the options, concerns, and tradeoffs.)
>> >> > >> > > >
>> >> > >> > > > -Ewen
>> >> > >> > > >
>> >> > >> > > > On Thu, Mar 22, 2018 at 7:19 AM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > wrote:
>> >> > >> > > >
>> >> > >> > > > > I do have one question, though: in the current KIP, throwing
>> >> > >> > > > > TimeoutException to mark that the time limit was exceeded is
>> >> > >> > > > > applied to all new methods introduced in this proposal.
>> >> > >> > > > > However, how would users respond when a TimeoutException is
>> >> > >> > > > > thrown (since it is considered a RuntimeException)?
>> >> > >> > > > >
>> >> > >> > > > > Thanks,
>> >> > >> > > > > Richard
>> >> > >> > > > >
>> >> > >> > > > >
>> >> > >> > > > >
>> >> > >> > > > > On Mon, Mar 19, 2018 at 6:10 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > wrote:
>> >> > >> > > > >
>> >> > >> > > > > > Hi Ismael,
>> >> > >> > > > > >
>> >> > >> > > > > > You have a great point. Since most of the methods in this KIP
>> >> > >> > > > > > have similar callbacks (position() and committed() both use
>> >> > >> > > > > > fetchCommittedOffsets(), and commitSync() is similar to
>> >> > >> > > > > > position(), except just updating offsets), the amount of time
>> >> > >> > > > > > they block should also be about equal.
>> >> > >> > > > > >
>> >> > >> > > > > > However, I think that we need to take into account a couple of
>> >> > >> > > > > > things. For starters, if the new methods were all reliant on one
>> >> > >> > > > > > config, the shortcomings of this approach would likely be similar
>> >> > >> > > > > > to what we would face if we let request.timeout.ms control all
>> >> > >> > > > > > method timeouts. In comparison, adding overloads does not have
>> >> > >> > > > > > this problem.
>> >> > >> > > > > >
>> >> > >> > > > > > If you have further thoughts, please let me know.
>> >> > >> > > > > >
>> >> > >> > > > > > Richard
>> >> > >> > > > > >
>> >> > >> > > > > >
>> >> > >> > > > > > On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma <ism...@juma.me.uk>
>> >> > >> > > > > > wrote:
>> >> > >> > > > > >
>> >> > >> > > > > >> Hi,
>> >> > >> > > > > >>
>> >> > >> > > > > >> An option that is not currently covered in the KIP is to have
>> >> > >> > > > > >> a separate config, max.block.ms, which is similar to the
>> >> > >> > > > > >> producer config with the same name. This came up during the
>> >> > >> > > > > >> KAFKA-2391 discussion. I think it's clear that we can't rely on
>> >> > >> > > > > >> request.timeout.ms, so the decision is between adding overloads
>> >> > >> > > > > >> or adding a new config. People seemed to be leaning towards the
>> >> > >> > > > > >> latter in KAFKA-2391, but Jason makes a good point that the
>> >> > >> > > > > >> overloads are more flexible. A couple of questions from me:
>> >> > >> > > > > >>
>> >> > >> > > > > >> 1. Do we need the additional flexibility?
>> >> > >> > > > > >> 2. If we do, do we need it for every blocking method?
>> >> > >> > > > > >>
>> >> > >> > > > > >> Ismael
>> >> > >> > > > > >>
>> >> > >> > > > > >> On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> wrote:
>> >> > >> > > > > >>
>> >> > >> > > > > >> > Hi Guozhang,
>> >> > >> > > > > >> >
>> >> > >> > > > > >> > I made some clarifications to KIP-266, namely:
>> >> > >> > > > > >> > 1. Stated more specifically that commitSync will accept user
>> >> > >> > > > > >> > input.
>> >> > >> > > > > >> > 2. fetchCommittedOffsets(): Made its role in blocking clearer to
>> >> > >> > > > > >> > the reader.
>> >> > >> > > > > >> > 3. Sketched what would happen when the time limit is exceeded.
>> >> > >> > > > > >> >
>> >> > >> > > > > >> > These changes should make the KIP easier to understand.
>> >> > >> > > > > >> >
>> >> > >> > > > > >> > Cheers,
>> >> > >> > > > > >> > Richard
>> >> > >> > > > > >> >
>> >> > >> > > > > >> > On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang <wangg...@gmail.com>
>> >> > >> > > > > >> > wrote:
>> >> > >> > > > > >> >
>> >> > >> > > > > >> > > Hi Richard,
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > I made a pass over the KIP again; some more clarifications /
>> >> > >> > > > > >> > > comments:
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > 1. The seek() call itself is not blocking; only the following
>> >> > >> > > > > >> > > poll() call may block, as that is when the actual metadata
>> >> > >> > > > > >> > > request happens.
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > 2. I saw you did not include Consumer.partitionFor(),
>> >> > >> > > > > >> > > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in
>> >> > >> > > > > >> > > your KIP. On second thought, I think it may be a better idea
>> >> > >> > > > > >> > > not to tackle them in the same KIP; we should probably
>> >> > >> > > > > >> > > consider in another discussion whether we would change their
>> >> > >> > > > > >> > > behavior at all. So I agree not to include them.
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > 3. In your wiki you mentioned "Another change shall be made to
>> >> > >> > > > > >> > > KafkaConsumer#poll(), due to its call to updateFetchPositions()
>> >> > >> > > > > >> > > which blocks indefinitely." This part may be a bit obscure to
>> >> > >> > > > > >> > > most readers who are not familiar with the KafkaConsumer
>> >> > >> > > > > >> > > internals; could you please elaborate? More specifically, I
>> >> > >> > > > > >> > > think the root causes for the public APIs mentioned are a bit
>> >> > >> > > > > >> > > different, while the KIP's explanation sounds like they are
>> >> > >> > > > > >> > > due to the same reason:
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > 3.1 fetchCommittedOffsets(): this internal call will block
>> >> > >> > > > > >> > > forever if the committed offsets cannot be fetched
>> >> > >> > > > > >> > > successfully, which affects position() and committed(). We
>> >> > >> > > > > >> > > need to break out of its internal while loop.
>> >> > >> > > > > >> > > 3.2 position() itself will loop when offsets cannot be
>> >> > >> > > > > >> > > retrieved in the underlying async call. We need to break out
>> >> > >> > > > > >> > > of this while loop.
>> >> > >> > > > > >> > > 3.3 commitSync() passes Long.MAX_VALUE as the timeout value;
>> >> > >> > > > > >> > > we should take the user-specified timeouts when applicable.
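Points 3.1 and 3.2 both come down to replacing an unbounded retry loop with one that stops at a deadline derived from the caller's timeout. A minimal sketch, assuming a hypothetical `tryOnce` callback that returns an empty `Optional` while the result is not yet available (`DeadlineLoop` and `awaitResult` are illustrative names, not the actual consumer code):

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

// Hypothetical sketch of points 3.1/3.2, not the actual consumer internals:
// an unbounded "retry until it works" loop, bounded by the caller's timeout.
class DeadlineLoop {
    static final class TimedOut extends RuntimeException {
        TimedOut(String message) {
            super(message);
        }
    }

    // Calls tryOnce until it yields a value or the timeout elapses.
    static <T> T awaitResult(Supplier<Optional<T>> tryOnce, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            Optional<T> result = tryOnce.get();
            if (result.isPresent()) {
                return result.get();
            }
            // Compare via subtraction, as the System.nanoTime() javadoc recommends.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimedOut("no result within " + unit.toMillis(timeout) + " ms");
            }
            // A real client would wait on network I/O here; park briefly instead.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
    }
}
```

Passing `Long.MAX_VALUE` milliseconds, as 3.3 says commitSync() does today, still behaves as an effectively unbounded wait here: `TimeUnit.toNanos` saturates instead of overflowing, and the subtraction-based check never reaches the deadline.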
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > Guozhang
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> > > wrote:
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > > Actually, what I said above is inaccurate. In
>> >> > >> > > > > >> > > > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue
>> >> > >> > > > > >> > > > blocks, not seek. My assumption is that seek did not update
>> >> > >> > > > > >> > > > correctly. I will be digging further into this.
>> >> > >> > > > > >> > > >
>> >> > >> > > > > >> > > >
>> >> > >> > > > > >> > > >
>> >> > >> > > > > >> > > > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> > > > wrote:
>> >> > >> > > > > >> > > >
>> >> > >> > > > > >> > > > > One more thing: when looking through tests, I realized that
>> >> > >> > > > > >> > > > > seek() methods can potentially block indefinitely. As you
>> >> > >> > > > > >> > > > > well know, seek() is called when pollOnce() or position() is
>> >> > >> > > > > >> > > > > active. Thus, if position() blocks indefinitely, then so
>> >> > >> > > > > >> > > > > would seek(). Should bounding seek() also be included in
>> >> > >> > > > > >> > > > > this KIP?
>> >> > >> > > > > >> > > > >
>> >> > >> > > > > >> > > > > Thanks, Richard
>> >> > >> > > > > >> > > > >
>> >> > >> > > > > >> > > > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> > > > > wrote:
>> >> > >> > > > > >> > > > >
>> >> > >> > > > > >> > > > >> Thanks for the advice, Jason.
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >> I have modified KIP-266 to include the javadoc for
>> >> > >> > > > > >> > > > >> committed() and other blocking methods, and I also mentioned
>> >> > >> > > > > >> > > > >> poll(), which will also be bounded. Let me know if there is
>> >> > >> > > > > >> > > > >> anything else. :)
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >> Sincerely, Richard
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson <ja...@confluent.io>
>> >> > >> > > > > >> > > > >> wrote:
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >>> Hi Richard,
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> Thanks for the updates. I'm really glad you picked this up.
>> >> > >> > > > > >> > > > >>> A couple of minor comments:
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> 1. Can you list the full set of new APIs explicitly in the
>> >> > >> > > > > >> > > > >>> KIP? Currently I only see the javadoc for `position()`.
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> 2. We should consider adding `TimeUnit` to the new methods
>> >> > >> > > > > >> > > > >>> to avoid unit confusion. I know it's inconsistent with the
>> >> > >> > > > > >> > > > >>> poll() API, but I think it was probably a mistake not to
>> >> > >> > > > > >> > > > >>> include it there, so better not to double down on that
>> >> > >> > > > > >> > > > >>> mistake. And note that we do already have
>> >> > >> > > > > >> > > > >>> `close(long, TimeUnit)`.
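A minimal sketch of the `TimeUnit` suggestion, using a hypothetical `committedOffset` method (not a real consumer method): the unit travels with the value at the call site, and the implementation normalizes once with `TimeUnit.toMillis`, which saturates at `Long.MAX_VALUE` instead of overflowing.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: take (long, TimeUnit) so the unit is explicit at every
// call site, then normalize internally.
class TimeUnitOverloads {
    long lastTimeoutMs = -1; // recorded for illustration

    // Hypothetical method name; not part of the Kafka consumer API.
    long committedOffset(String partition, long timeout, TimeUnit unit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        lastTimeoutMs = unit.toMillis(timeout); // saturates instead of overflowing
        return 0L; // a real client would block for at most lastTimeoutMs here
    }
}
```

A call like `committedOffset(tp, 5, TimeUnit.SECONDS)` is unambiguous to a reader, whereas a bare `5000` only makes sense if you remember that the method takes milliseconds.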
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> Other than that, I think the current KIP seems reasonable.
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> Thanks,
>> >> > >> > > > > >> > > > >>> Jason
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> > > > >>> wrote:
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>> > Note to all: I have included bounding commitSync() and
>> >> > >> > > > > >> > > > >>> > committed() in this KIP.
>> >> > >> > > > > >> > > > >>> >
>> >> > >> > > > > >> > > > >>> > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> > > > >>> > wrote:
>> >> > >> > > > > >> > > > >>> >
>> >> > >> > > > > >> > > > >>> > > Hi all,
>> >> > >> > > > > >> > > > >>> > >
>> >> > >> > > > > >> > > > >>> > > I updated the KIP so that overloading position() is now the
>> >> > >> > > > > >> > > > >>> > > favored approach. Bounding position() using requestTimeoutMs
>> >> > >> > > > > >> > > > >>> > > has been listed as rejected.
>> >> > >> > > > > >> > > > >>> > >
>> >> > >> > > > > >> > > > >>> > > Any thoughts?
>> >> > >> > > > > >> > > > >>> > >
>> >> > >> > > > > >> > > > >>> > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang <wangg...@gmail.com>
>> >> > >> > > > > >> > > > >>> > > wrote:
>> >> > >> > > > > >> > > > >>> > >
>> >> > >> > > > > >> > > > >>> > >> I agree that adding the overloads is most flexible. But if
>> >> > >> > > > > >> > > > >>> > >> we go in that direction, we'd do it for all the blocking
>> >> > >> > > > > >> > > > >>> > >> calls that I've listed above, with this timeout value
>> >> > >> > > > > >> > > > >>> > >> covering the end-to-end waiting time.
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >> Guozhang
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu <yuzhih...@gmail.com>
>> >> > >> > > > > >> > > > >>> > >> wrote:
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >> > bq. The most flexible option is to add overloads to the consumer
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >> > This option is flexible.
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >> > Looking at the tail of SPARK-18057, the Spark devs voiced
>> >> > >> > > > > >> > > > >>> > >> > the same preference.
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >> > +1 for adding overloads with a timeout parameter.
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >> > Cheers
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson <ja...@confluent.io>
>> >> > >> > > > > >> > > > >>> > >> > wrote:
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >> > > @Guozhang I have probably suggested all options at some
>> >> > >> > > > > >> > > > >>> > >> > > point or another, including, most recently, the current
>> >> > >> > > > > >> > > > >>> > >> > > KIP! I was thinking that, practically speaking, the request
>> >> > >> > > > > >> > > > >>> > >> > > timeout defines how long the user is willing to wait for a
>> >> > >> > > > > >> > > > >>> > >> > > response. The consumer doesn't really have a complex send
>> >> > >> > > > > >> > > > >>> > >> > > process like the producer for any of these APIs, so I
>> >> > >> > > > > >> > > > >>> > >> > > wasn't sure how much benefit there would be from having
>> >> > >> > > > > >> > > > >>> > >> > > more granular control over timeouts (in the end, KIP-91
>> >> > >> > > > > >> > > > >>> > >> > > just adds a single timeout to control the whole send).
>> >> > >> > > > > >> > > > >>> > >> > > That said, it might indeed be better to avoid overloading
>> >> > >> > > > > >> > > > >>> > >> > > the config as you suggest, since at least it avoids
>> >> > >> > > > > >> > > > >>> > >> > > inconsistency with the producer's usage.
>> >> > >> > > > > >> > > > >>> > >> > >
>> >> > >> > > > > >> > > > >>> > >> > > The most flexible option is to add overloads to the
>> >> > >> > > > > >> > > > >>> > >> > > consumer so that users can pass the timeout directly. I'm
>> >> > >> > > > > >> > > > >>> > >> > > not sure if that is more or less annoying than a new
>> >> > >> > > > > >> > > > >>> > >> > > config, but I've found config timeouts a little
>> >> > >> > > > > >> > > > >>> > >> > > constraining in practice. For example, I could imagine
>> >> > >> > > > > >> > > > >>> > >> > > users wanting to wait longer for an offset commit
>> >> > >> > > > > >> > > > >>> > >> > > operation than for a position lookup; if the latter isn't
>> >> > >> > > > > >> > > > >>> > >> > > timely, users can just pause the partition and continue
>> >> > >> > > > > >> > > > >>> > >> > > fetching on others. If you cannot commit offsets, however,
>> >> > >> > > > > >> > > > >>> > >> > > it might be safer for an application to wait for the
>> >> > >> > > > > >> > > > >>> > >> > > coordinator to become available than to continue.
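Jason's example of different budgets per call can be sketched with a hypothetical `MiniConsumer` interface (not the Kafka API) whose `position` lookup takes its own timeout. Partitions whose lookup times out are paused so the application can keep fetching from the others. A commit call, by contrast, could be given a much longer budget, since waiting for the coordinator may be the safer choice there. `PositionTimeout`, `PerCallTimeouts`, and the 100 ms budget are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical minimal consumer surface for this sketch only; not the Kafka API.
interface MiniConsumer {
    long position(String partition, long timeout, TimeUnit unit); // may throw PositionTimeout
    void pause(String partition);
}

// Hypothetical exception for a position lookup that ran out of time.
class PositionTimeout extends RuntimeException {
    PositionTimeout(String partition) {
        super("position lookup timed out for " + partition);
    }
}

class PerCallTimeouts {
    // Short budget for position lookups: a slow partition is paused so the
    // application can keep fetching from the others.
    static List<String> resolvePositions(MiniConsumer consumer, List<String> partitions) {
        List<String> paused = new ArrayList<>();
        for (String p : partitions) {
            try {
                consumer.position(p, 100, TimeUnit.MILLISECONDS);
            } catch (PositionTimeout e) {
                consumer.pause(p);
                paused.add(p);
            }
        }
        return paused;
    }
}
```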
>> >> > >> > > > > >> > > > >>> > >> > >
>> >> > >> > > > > >> > > > >>> > >> > > -Jason
>> >> > >> > > > > >> > > > >>> > >> > >
>> >> > >> > > > > >> > > > >>> > >> > > On Sun, Mar 4, 2018 at 10:14 PM, Guozhang Wang <wangg...@gmail.com>
>> >> > >> > > > > >> > > > >>> > >> > > wrote:
>> >> > >> > > > > >> > > > >>> > >> > >
>> >> > >> > > > > >> > > > >>> > >> > > > Hello Richard,
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > Thanks for the proposed KIP. I have a couple of general comments:
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > 1. I'm not sure if piggy-backing the timeout exception on the
>> >> > >> > > > > >> > > > >>> > >> > > > existing requestTimeoutMs configured in "request.timeout.ms" is a
>> >> > >> > > > > >> > > > >>> > >> > > > good idea, since a) it is a general config that applies to all
>> >> > >> > > > > >> > > > >>> > >> > > > types of requests, and b) using it to cover all the phases of an
>> >> > >> > > > > >> > > > >>> > >> > > > API call, including the network round trip and a potential
>> >> > >> > > > > >> > > > >>> > >> > > > metadata refresh, has been shown not to be a good idea, as
>> >> > >> > > > > >> > > > >>> > >> > > > illustrated in KIP-91:
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > In fact, I think in KAFKA-4879, which is aimed at the same issue as
>> >> > >> > > > > >> > > > >>> > >> > > > KAFKA-6608, Jason has suggested we use a new config for the API.
>> >> > >> > > > > >> > > > >>> > >> > > > Maybe this would be more intuitive than reusing the
>> >> > >> > > > > >> > > > >>> > >> > > > request.timeout.ms config.
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > 2. Besides the Consumer.position() call, there are a couple more
>> >> > >> > > > > >> > > > >>> > >> > > > blocking calls today that could block indefinitely:
>> >> > >> > > > > >> > > > >>> > >> > > > Consumer.commitSync() and Consumer.committed(). Should they be
>> >> > >> > > > > >> > > > >>> > >> > > > considered in this KIP as well?
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > 3. There are a few other APIs that today already rely on
>> >> > >> > > > > >> > > > >>> > >> > > > request.timeout.ms to break the infinite blocking, namely
>> >> > >> > > > > >> > > > >>> > >> > > > Consumer.partitionsFor(), Consumer.offsetsForTimes() and
>> >> > >> > > > > >> > > > >>> > >> > > > Consumer.listTopics(). If we are making the other blocking calls
>> >> > >> > > > > >> > > > >>> > >> > > > rely on a new config as suggested in 1) above, should we also
>> >> > >> > > > > >> > > > >>> > >> > > > change the semantics of these API functions for consistency?
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > Guozhang
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > On Sun, Mar 4, 2018 at 11:13 AM, Richard Yu <yohan.richard...@gmail.com>
>> >> > >> > > > > >> > > > >>> > >> > > > wrote:
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > > Hi all,
>> >> > >> > > > > >> > > > >>> > >> > > > >
>> >> > >> > > > > >> > > > >>> > >> > > > > I would like to discuss a potential change which would be made to
>> >> > >> > > > > >> > > > >>> > >> > > > > KafkaConsumer:
>> >> > >> > > > > >> > > > >>> > >> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886
>> >> > >> > > > > >> > > > >>> > >> > > > >
>> >> > >> > > > > >> > > > >>> > >> > > > > Thanks,
>> >> > >> > > > > >> > > > >>> > >> > > > > Richard Yu
>> >> > >> > > > > >> > > > >>> > >> > > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > > > --
>> >> > >> > > > > >> > > > >>> > >> > > > -- Guozhang
>> >> > >> > > > > >> > > > >>> > >> > > >
>> >> > >> > > > > >> > > > >>> > >> > >
>> >> > >> > > > > >> > > > >>> > >> >
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >> --
>> >> > >> > > > > >> > > > >>> > >> -- Guozhang
>> >> > >> > > > > >> > > > >>> > >>
>> >> > >> > > > > >> > > > >>> > >
>> >> > >> > > > > >> > > > >>> > >
>> >> > >> > > > > >> > > > >>> >
>> >> > >> > > > > >> > > > >>>
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >>
>> >> > >> > > > > >> > > > >
>> >> > >> > > > > >> > > >
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> > > --
>> >> > >> > > > > >> > > -- Guozhang
>> >> > >> > > > > >> > >
>> >> > >> > > > > >> >
>> >> > >> > > > > >>
>> >> > >> > > > > >
>> >> > >> > > > > >
>> >> > >> > > > >
>> >> > >> > > >
>> >> > >> > >
>> >> > >> >
>> >> > >> >
>> >> > >> >
>> >> > >> > --
>> >> > >> > -- Guozhang
>> >> > >> >
>> >> > >>
>> >> > >
>> >> > >
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> -- Guozhang
>> >>
>> >
>> >
>>
>
>
