Cross-pollinating from some discussion we've had on KIP-288, I think there's a good reason that poll() takes a timeout when none of the other methods do, and it's relevant to this discussion. The timeout in poll() is effectively implementing a long-poll API (on the client side, so it's not really long-poll, but the programmer-facing behavior is the same). The timeout isn't really bounding the execution time of the method, but instead giving a max time that callers are willing to wait around and see if any results show up.
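To make the long-poll reading concrete, here is a small, broker-free sketch. `LongPollBuffer` is a hypothetical class. It is not part of the Kafka client and does not reflect how KafkaConsumer works internally. It only models the contract described above: `poll(timeout, unit)` waits at most the given time for results and, if nothing shows up, returns an empty list rather than throwing. A timeout of 0 becomes a single non-blocking check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of long-poll semantics; this is NOT the KafkaConsumer implementation.
class LongPollBuffer<T> {
    private final LinkedBlockingQueue<T> ready = new LinkedBlockingQueue<>();

    // Simulates a fetch response arriving from the network.
    void deliver(T record) {
        ready.add(record);
    }

    // Waits at most `timeout` for the first record, then drains anything else already
    // buffered. If nothing arrives in time, returns an empty list instead of throwing.
    // A timeout of 0 means "check once and return immediately".
    List<T> poll(long timeout, TimeUnit unit) {
        List<T> batch = new ArrayList<>();
        try {
            T first = ready.poll(Math.max(0L, timeout), unit);
            if (first != null) {
                batch.add(first);
                ready.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return batch;
    }
}
```

Calling `poll(100, TimeUnit.MILLISECONDS)` in a loop gives the familiar consumer loop. An empty result just means nothing arrived within the window, which is the "wait around and see" behavior rather than an error.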
If I understand the code sufficiently, it would be perfectly reasonable for a caller to use a timeout of 0 to implement an async poll: KafkaConsumer would simply check on each call whether a response is ready and, if not, fire off a new request without waiting for a response. As such, it seems inappropriate to throw a ClientTimeoutException from poll(), except possibly if the initial phase of ensuring an assignment times out. We wouldn't want the method contract to be "returns a non-empty collection or throws a ClientTimeoutException".

Now I'm wondering whether we should actually reconsider one of my rejected alternatives: treating the "operation timeout" as a separate parameter from the "long-poll time". Or maybe adding an "asyncPoll(timeout, time unit)" that only uses the timeout to bound metadata updates and otherwise behaves like the current "poll(0)".

Thanks,
-John

On Tue, Apr 17, 2018 at 2:05 PM, John Roesler <j...@confluent.io> wrote:

Hey Richard,

As you noticed, the newly introduced KIP-288 overlaps with this one. Sorry for stepping on your toes... How would you like to proceed? I'm happy to "close" KIP-288 in deference to this KIP.

With respect to poll(), reading this discussion gave me a new idea for providing a non-breaking update path... What if we introduce a new variant 'poll(long timeout, TimeUnit unit)' that displays the new, desired behavior, and just leave the old method alone?

Thanks,
-John

On Tue, Apr 17, 2018 at 12:09 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

If possible, would a committer please review?

Thanks

On Sun, Apr 1, 2018 at 7:24 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Hi Guozhang,

I have clarified the KIP a bit to account for Becket's suggestion on ClientTimeoutException.
About adding an extra config, you were right about my intentions.
I am just wondering whether the config should be included, since Ismael seems to favor an extra configuration.

Thanks,
Richard

On Sun, Apr 1, 2018 at 5:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Richard,

Regarding the Streams-side changes, we plan to adopt the new APIs once the KIP is done; those are internal code changes only and hence do not need to be included in the KIP.

Could you update the KIP? It has become quite outdated relative to the topics discussed, and I'm losing track of what your final proposal is right now. For example, I'm not completely following your "compromise of sorts": are you suggesting that we still add overloaded functions and add a config that will be applied to all overloads without the timeout, while for the overloads that take a timeout value the config will be ignored?

Guozhang

On Fri, Mar 30, 2018 at 8:36 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

On a side note, I have noticed that several other methods in classes such as StoreChangeLogReader in Streams call position(), which causes tests to hang. It might be out of the scope of the KIP, but should I also change the methods that use position() as a callback, at the very least to prevent the tests from hanging? This issue might be outside the KIP, but I would prefer it if we could at least make my PR pass the Jenkins Q&A.

Thanks

On Fri, Mar 30, 2018 at 8:24 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Thanks for the review, Becket.
About the methods beginningOffsets(), endOffsets(), ...: I took a look through the code of KafkaConsumer, and after looking through the offsetsByTimes() method and its callbacks in Fetcher, I think these methods already block for a set period of time. I know there is a chance that the offset methods in KafkaConsumer might behave like poll (that is, one section of the method honors the timeout while another, updateFetchPositions, does not). However, I don't think that is the case with offsetsByTimes, since the callbacks I checked do not seem to hang.

The clarity of the exception message is a problem. I thought your suggestion there was reasonable, and I included it in the KIP.

On another note, I have noticed that several people have voiced the opinion that adding a config might be preferable to adding an extra parameter. I think we can have a compromise of sorts: some methods in KafkaConsumer are relatively similar (for example, position() and committed() both call updateFetchPositions()). I think we could use the same config for these methods as a default timeout if the user does not provide one. On the other hand, if users wish to specify a longer or shorter blocking time, they can still pass an explicit timeout. (I included the config as an alternative in the KIP.) WDYT?

Thanks,
Richard

On Fri, Mar 30, 2018 at 1:26 AM, Becket Qin <becket....@gmail.com> wrote:

Glad to see the KIP, Richard. This has been a really long-pending issue.
The original argument from Jay for using a config, such as max.block.ms, instead of timeout parameters was that people will always hard-code the timeout, and a hard-coded timeout is rarely correct because it has to account for different scenarios. For example, users may receive a timeout exception when the group coordinator moves. Having a configuration with a reasonable default value makes users' lives easier.

That said, in practice, it seems more useful to have timeout parameters. We have seen libraries that use the consumer internally and need to provide a flexible external timeout interface. Also, users can easily hard-code a value to get the same result as a config-based solution.

The KIP looks good overall. A few comments:

1. There are a few other blocking methods that are not included, e.g. offsetsForTimes(), beginningOffsets(), endOffsets(). Is there any reason?

2. I am wondering whether we can take the KIP as a chance to clean up our timeout exception(s). More specifically, instead of reusing TimeoutException, can we introduce a new ClientTimeoutException with different causes, e.g. UnknownTopicOrPartition, RequestTimeout, LeaderNotAvailable, etc.?
As of now, TimeoutException is used in the following three cases:

   1. TimeoutException is a subclass of ApiException, which indicates the exception was returned by the broker. TimeoutException was initially returned by the leaders when replication was not done within the timeout specified in the ProduceRequest. It has an error code of 7, which is returned by the broker.
   2. When we migrated to the Java clients, we extended it in the Errors definition to indicate a request timeout, i.e. a request was sent but the response was not received before the timeout. In this case, the clients did not have a return code from the broker.
   3. Later at some point, we started to use TimeoutException for client method-call timeouts. This is related neither to any broker-returned error code nor to a request timeout on the wire.

Due to these various interpretations, users can easily be confused. For example, when a timeout is thrown with "Failed to refresh metadata in X ms", it is hard to tell what exactly happened. Since we are changing the API here, it would be good to avoid introducing more ambiguity and to see whether this can be improved. Removing the usage in case 3 would be at least one step forward.

Thanks,

Jiangjie (Becket) Qin

On Mon, Mar 26, 2018 at 5:50 PM, Guozhang Wang <wangg...@gmail.com> wrote:

@Richard: TimeoutException inherits from RetriableException, which inherits from ApiException. So users should explicitly catch RetriableException in their code and handle the exception.

@Ismael, Ewen: I'm trying to push progress forward on this one. Are we now on the same page about using function parameters rather than configs?
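Guozhang's suggestion to catch `RetriableException` can be illustrated with a short sketch. The exception classes below are local stand-ins that mirror the hierarchy he describes, so the example compiles without the Kafka client. The real classes live in `org.apache.kafka.common` and `org.apache.kafka.common.errors`. `RetryOnTimeout.withRetries` is a hypothetical helper, not a Kafka API. It retries only on retriable errors and gives up after a fixed number of attempts.

```java
import java.util.function.Supplier;

// Stand-ins mirroring the hierarchy described above:
// TimeoutException -> RetriableException -> ApiException -> KafkaException -> RuntimeException.
// Real code would catch the Kafka client classes instead of these.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class ApiException extends KafkaException {
    ApiException(String message) { super(message); }
}

class RetriableException extends ApiException {
    RetriableException(String message) { super(message); }
}

class TimeoutException extends RetriableException {
    TimeoutException(String message) { super(message); }
}

class RetryOnTimeout {
    // Hypothetical helper: runs `call`, retrying only on retriable errors (such as a
    // timeout from a bounded blocking call). Any other exception propagates at once.
    static <T> T withRetries(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed; surface the last retriable error
    }
}
```

Capping the attempts is a choice made in this sketch. A caller that retries every `RetriableException` forever would bring back the unbounded blocking this KIP sets out to remove.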
Guozhang

On Fri, Mar 23, 2018 at 4:42 PM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi Ewen,

Yeah, I mentioned KAFKA-2391 where some of this was discussed. Jay was against having timeouts in the methods at the time. However, as Jason said offline, we did end up with a timeout parameter in `poll`.

Ismael

On Fri, Mar 23, 2018 at 4:26 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

Regarding the flexibility question, has someone tried to dig up the discussion of the new consumer APIs from when they were being written? I vaguely recall these exact questions about using APIs vs configs and flexibility vs bloating the API surface area having already been discussed. (Not that we shouldn't revisit, just that it might also be a faster way to get to a full understanding of the options, concerns, and tradeoffs.)

-Ewen

On Thu, Mar 22, 2018 at 7:19 AM, Richard Yu <yohan.richard...@gmail.com> wrote:

I do have one question though: in the current KIP, throwing TimeoutException to signal that the time limit has been exceeded applies to all new methods introduced in this proposal.
However, how should users respond when a TimeoutException is thrown (since it is considered a RuntimeException)?

Thanks,
Richard

On Mon, Mar 19, 2018 at 6:10 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Hi Ismael,

You have a great point. Since most of the methods in this KIP have similar callbacks (position() and committed() both use fetchCommittedOffsets(), and commitSync() is similar to position(), except that it just updates offsets), the amount of time they block should also be about equal.

However, I think we need to take a couple of things into account. For starters, if the new methods all relied on one config, the shortcomings of that approach would likely be similar to those we faced when letting request.timeout.ms control all method timeouts. In comparison, adding overloads does not have this problem.

If you have further thoughts, please let me know.
Richard

On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi,

An option that is not currently covered in the KIP is to have a separate config max.block.ms, which is similar to the producer config with the same name. This came up during the KAFKA-2391 discussion. I think it's clear that we can't rely on request.timeout.ms, so the decision is between adding overloads or adding a new config. People seemed to be leaning towards the latter in KAFKA-2391, but Jason makes a good point that the overloads are more flexible. A couple of questions from me:

1. Do we need the additional flexibility?
2. If we do, do we need it for every blocking method?

Ismael

On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Hi Guozhang,

I made some clarifications to KIP-266, namely:
1. Stated more specifically that commitSync will accept user input.
2. fetchCommittedOffsets(): made its role in blocking clearer to the reader.
3. Sketched what would happen when the time limit is exceeded.

These changes should make the KIP easier to understand.

Cheers,
Richard

On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Richard,

I made a pass over the KIP again; some more clarifications / comments:

1. The seek() call itself is not blocking; only the following poll() call may block, as that is when the actual metadata request happens.

2. I saw you did not include Consumer.partitionFor(), Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. On second thought, I think it may be better not to tackle them in the same KIP, and we should probably consider whether to change their behavior in a separate discussion. So I agree with not including them.

3. In your wiki you mentioned "Another change shall be made to KafkaConsumer#poll(), due to its call to updateFetchPositions() which blocks indefinitely." This part may be a bit obscure to most readers who are not familiar with the KafkaConsumer internals; could you please elaborate? More specifically, I think the root causes for the public APIs mentioned are a bit different, while the KIP's explanation makes it sound like they are due to the same reason:

3.1 fetchCommittedOffsets(): this internal call will block forever if the committed offsets cannot be fetched successfully, which affects position() and committed(). We need to break out of its internal while loop.
3.2 position() itself will loop when offsets cannot be retrieved in the underlying async call. We need to break out of this while loop.
3.3 commitSync() passes Long.MAX_VALUE as the timeout value; we should take the user-specified timeout when applicable.
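Points 3.1 and 3.2 both come down to replacing an unbounded retry loop with one that stops at a deadline. The generic shape of that change might look like the sketch below. `BoundedRetry.untilDeadline` and `DeadlineExceeded` are hypothetical names. The `Supplier` merely stands in for "has the async offset lookup completed yet?", so this shows the pattern, not the consumer's actual code.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: an unbounded "keep retrying" loop turned into one bounded by a deadline.
class BoundedRetry {
    static final class DeadlineExceeded extends RuntimeException {
        DeadlineExceeded(String message) { super(message); }
    }

    // `attempt` returns the result once it is available, or Optional.empty() if not yet.
    static <T> T untilDeadline(Supplier<Optional<T>> attempt, long timeout, TimeUnit unit,
                               long backoffMs) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        Optional<T> result = attempt.get();           // always try at least once
        while (!result.isPresent()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {                 // this check is what breaks the loop
                throw new DeadlineExceeded("no result within " + timeout + " " + unit);
            }
            long sleepMs = Math.min(backoffMs, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            result = attempt.get();
        }
        return result.get();
    }
}
```

The lookup is attempted at least once even with a timeout of 0, which keeps a zero timeout meaningful as a non-blocking check.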
Guozhang

On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Actually, what I said above is inaccurate. In testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not seek. My assumption is that seek did not update correctly. I will dig further into this.

On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

One more thing: when looking through tests, I realized that seek() methods can potentially block indefinitely. As you well know, seek() is called when pollOnce() or position() is active. Thus, if position() blocks indefinitely, then so would seek(). Should bounding seek() also be included in this KIP?
Thanks, Richard

On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Thanks for the advice, Jason.

I have modified KIP-266 to include the Javadoc for committed() and other blocking methods, and I also mentioned poll(), which will also be bounded. Let me know if there is anything else. :)

Sincerely, Richard

On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Richard,

Thanks for the updates. I'm really glad you picked this up. A couple of minor comments:

1. Can you list the full set of new APIs explicitly in the KIP? Currently I only see the javadoc for `position()`.

2. We should consider adding `TimeUnit` to the new methods to avoid unit confusion. I know it's inconsistent with the poll() API, but I think it was probably a mistake not to include it there, so better not to double down on that mistake. And note that we do already have `close(long, TimeUnit)`.

Other than that, I think the current KIP seems reasonable.

Thanks,
Jason

On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Note to all: I have included bounding commitSync() and committed() in this KIP.
On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I updated the KIP so that overloading position() is now the favored approach. Bounding position() using requestTimeoutMs has been listed as rejected.

Any thoughts?

On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I agree that adding the overloads is the most flexible option. But if we go in that direction, we'd do it for all the blocking calls I've listed above, with the timeout value covering the end-to-end waiting time.
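Guozhang's objection to reusing request.timeout.ms, raised further down the thread, turns on the difference between an end-to-end timeout and a per-request timeout. That difference can be shown with simulated phase durations. `EndToEndCall` is a hypothetical model. Its phases stand in for steps such as a metadata refresh, a coordinator lookup and the offset request itself, and no real waiting takes place.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical model (simulated durations, no real waiting) contrasting an end-to-end
// timeout with a per-request timeout for a call made of several internal phases.
class EndToEndCall {
    // End-to-end: one budget is shared by every phase, so the total wait is bounded.
    // Returns the index of the phase that exhausted the budget, or -1 on success.
    static int runWithBudget(List<Long> phaseDurationsMs, long timeout, TimeUnit unit) {
        long remainingMs = unit.toMillis(timeout);
        for (int i = 0; i < phaseDurationsMs.size(); i++) {
            remainingMs -= phaseDurationsMs.get(i);
            if (remainingMs < 0) {
                return i;
            }
        }
        return -1;
    }

    // Per-request: each phase is checked on its own against the same fixed limit,
    // so the total wait can grow with the number of phases.
    static int runPerPhase(List<Long> phaseDurationsMs, long perPhaseTimeoutMs) {
        for (int i = 0; i < phaseDurationsMs.size(); i++) {
            if (phaseDurationsMs.get(i) > perPhaseTimeoutMs) {
                return i;
            }
        }
        return -1;
    }

    static long total(List<Long> phaseDurationsMs) {
        long sum = 0;
        for (long d : phaseDurationsMs) {
            sum += d;
        }
        return sum;
    }
}
```

With three 400 ms phases, a 1 s end-to-end budget runs out in the third phase. A 500 ms per-request limit never trips, even though the whole call takes 1.2 s.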
Guozhang

On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu <yuzhih...@gmail.com> wrote:

bq. The most flexible option is to add overloads to the consumer

This option is flexible.

Looking at the tail of SPARK-18057, Spark devs voiced the same choice.

+1 for adding overload with timeout parameter.

Cheers

On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

@Guozhang I have probably suggested every option at some point or another, including, most recently, the current KIP!
>> >> > I >> >> > >> was >> >> > >> > > > > >> thinking >> >> > >> > > > > >> > > that >> >> > >> > > > > >> > > > >>> > >> practically >> >> > >> > > > > >> > > > >>> > >> > > speaking, the request timeout >> defines >> >> how >> >> > >> long >> >> > >> > > the >> >> > >> > > > > >> user is >> >> > >> > > > > >> > > > >>> willing >> >> > >> > > > > >> > > > >>> > to >> >> > >> > > > > >> > > > >>> > >> > wait >> >> > >> > > > > >> > > > >>> > >> > > for a response. The consumer doesn't >> >> > really >> >> > >> > have >> >> > >> > > a >> >> > >> > > > > >> complex >> >> > >> > > > > >> > > > send >> >> > >> > > > > >> > > > >>> > >> process >> >> > >> > > > > >> > > > >>> > >> > > like the producer for any of these >> >> APIs, >> >> > so >> >> > >> I >> >> > >> > > > wasn't >> >> > >> > > > > >> sure >> >> > >> > > > > >> > > how >> >> > >> > > > > >> > > > >>> much >> >> > >> > > > > >> > > > >>> > >> > benefit >> >> > >> > > > > >> > > > >>> > >> > > there would be from having more >> >> granular >> >> > >> > control >> >> > >> > > > over >> >> > >> > > > > >> > > timeouts >> >> > >> > > > > >> > > > >>> (in >> >> > >> > > > > >> > > > >>> > the >> >> > >> > > > > >> > > > >>> > >> > end, >> >> > >> > > > > >> > > > >>> > >> > > KIP-91 just adds a single timeout to >> >> > control >> >> > >> > the >> >> > >> > > > > whole >> >> > >> > > > > >> > > send). >> >> > >> > > > > >> > > > >>> That >> >> > >> > > > > >> > > > >>> > >> said, >> >> > >> > > > > >> > > > >>> > >> > it >> >> > >> > > > > >> > > > >>> > >> > > might indeed be better to avoid >> >> > overloading >> >> > >> the >> >> > >> > > > > config >> >> > >> > > > > >> as >> >> > >> > > > > >> > > you >> >> > >> > > > > >> > > > >>> > suggest >> >> > >> > > > > >> > > > >>> > >> > since >> >> > >> > > > > >> > > > >>> > >> > > at least it avoids inconsistency >> with >> >> the >> >> > >> > > > producer's >> >> > >> > > > > >> > usage. 
The most flexible option is to add overloads to the consumer so that users can pass the timeout directly. I'm not sure if that is more or less annoying than a new config, but I've found config timeouts a little constraining in practice. For example, I could imagine users wanting to wait longer for an offset commit operation than for a position lookup; if the latter isn't timely, users can just pause the partition and continue fetching on others. If you cannot commit offsets, however, it might be safer for an application to wait for the coordinator to become available than to continue.
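Jason's example (a short bound for position lookups, a longer one for commits) can be sketched without a broker. The sketch also folds in the config-as-default compromise Richard describes earlier in the thread. `SketchConsumer`, its `TimedOut` exception and `checkPositionsThenCommit` are all hypothetical. Latencies are simulated numbers rather than real network calls, so nothing actually blocks. The method shapes only mimic the `(long, TimeUnit)` overload style discussed here and are not a released Kafka API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer facade; latencies are simulated, so nothing actually blocks.
// It shows per-call timeout overloads coexisting with a configured default.
class SketchConsumer {
    static final class TimedOut extends RuntimeException {
        TimedOut(String operation) { super(operation + " timed out"); }
    }

    private final long defaultTimeoutMs;             // stand-in for a config value
    private final Map<String, Long> lookupLatencyMs; // simulated per-partition lookup time
    private final long commitLatencyMs;              // simulated coordinator round trip
    final Set<String> paused = new HashSet<>();

    SketchConsumer(long defaultTimeoutMs, Map<String, Long> lookupLatencyMs, long commitLatencyMs) {
        this.defaultTimeoutMs = defaultTimeoutMs;
        this.lookupLatencyMs = lookupLatencyMs;
        this.commitLatencyMs = commitLatencyMs;
    }

    // Variants without a timeout fall back to the configured default.
    long position(String partition) {
        return position(partition, defaultTimeoutMs, TimeUnit.MILLISECONDS);
    }

    void commitSync() {
        commitSync(defaultTimeoutMs, TimeUnit.MILLISECONDS);
    }

    // Overloads let each call site choose its own bound.
    long position(String partition, long timeout, TimeUnit unit) {
        if (lookupLatencyMs.getOrDefault(partition, 0L) > unit.toMillis(timeout)) {
            throw new TimedOut("position(" + partition + ")");
        }
        return 0L; // placeholder offset
    }

    void commitSync(long timeout, TimeUnit unit) {
        if (commitLatencyMs > unit.toMillis(timeout)) {
            throw new TimedOut("commitSync");
        }
    }

    void pause(String partition) {
        paused.add(partition);
    }

    // Short bound for position lookups (pause the slow partition and move on),
    // long bound for the commit (wait for the coordinator rather than continue).
    void checkPositionsThenCommit(Iterable<String> partitions) {
        for (String p : partitions) {
            try {
                position(p, 100, TimeUnit.MILLISECONDS);
            } catch (TimedOut e) {
                pause(p);
            }
        }
        commitSync(30, TimeUnit.SECONDS);
    }
}
```

With a single config value, both operations would share one bound. The overloads let the slow partition be paused after 100 ms while the commit still gets 30 s.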
-Jason

On Sun, Mar 4, 2018 at 10:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Richard,

Thanks for the proposed KIP. I have a couple of general comments:

1. I'm not sure if piggy-backing the timeout exception on the existing requestTimeoutMs configured in "request.timeout.ms" is a good idea, since a) it is a general config that applies to all types of requests, and b) using it to cover all the phases of an API call, including the network round trip and a potential metadata refresh, has been shown not to be a good idea, as illustrated in KIP-91:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

In fact, I think in KAFKA-4879, which is aimed at the same issue as KAFKA-6608, Jason suggested we use a new config for the API. Maybe this would be more intuitive than reusing the request.timeout.ms config.

2. Besides the Consumer.position() call, there are a couple more blocking calls today that could result in infinite blocking: Consumer.commitSync() and Consumer.committed(). Should they be considered in this KIP as well?

3. There are a few other APIs that already rely on request.timeout.ms today to break the infinite blocking, namely Consumer.partitionFor(), Consumer.OffsetAndTimestamp() and Consumer.listTopics(). If we are making the other blocking calls rely on a new config as suggested in 1) above, should we also change the semantics of these API functions for consistency?
Guozhang

On Sun, Mar 4, 2018 at 11:13 AM, Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I would like to discuss a potential change which would be made to KafkaConsumer:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886

Thanks,
Richard Yu

--
-- Guozhang
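Guozhang's first point separates two different bounds: a per-request timeout (request.timeout.ms) that caps a single network round trip, and an overall timeout for the whole blocking API call, which may span a metadata refresh, a coordinator lookup and several retries. A minimal sketch of that distinction in plain Java follows. The helper callWithDeadline, the apiTimeoutMs parameter and ApiTimeoutException are hypothetical names chosen for illustration; this is not KafkaConsumer's actual code or an API proposed in the thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public final class ApiDeadlineSketch {

    /** Hypothetical unchecked exception raised when the overall API deadline passes. */
    static final class ApiTimeoutException extends RuntimeException {
        ApiTimeoutException(String message) { super(message); }
    }

    /** One attempt (e.g. a single request/response round trip); null means "not done yet". */
    @FunctionalInterface
    interface Attempt<T> {
        T tryOnce(long attemptTimeoutMs);
    }

    /**
     * Hypothetical helper: bounds the whole blocking call (metadata refresh,
     * retries, round trips) by apiTimeoutMs, while each individual attempt is
     * still capped by requestTimeoutMs, so the two timeouts stay independent.
     */
    static <T> T callWithDeadline(Attempt<T> attempt, long apiTimeoutMs, long requestTimeoutMs) {
        final long deadlineNanos = System.nanoTime() + apiTimeoutMs * 1_000_000L;
        while (true) {
            long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                throw new ApiTimeoutException(
                        "call did not complete within " + apiTimeoutMs + " ms");
            }
            // Never let a single attempt outlive the overall API deadline.
            T result = attempt.tryOnce(Math.min(remainingMs, requestTimeoutMs));
            if (result != null) {
                return result;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated call that succeeds on the third attempt, well within the deadline.
        AtomicInteger attempts = new AtomicInteger();
        Long position = callWithDeadline(
                ms -> attempts.incrementAndGet() < 3 ? null : 42L, 1_000, 100);
        System.out.println("position=" + position + " after " + attempts.get() + " attempts");

        // Simulated call that never completes: each attempt waits up to 10 ms.
        try {
            callWithDeadline(ms -> {
                LockSupport.parkNanos(Math.min(ms, 10) * 1_000_000L);
                return null;
            }, 50, 10);
        } catch (ApiTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Capping each attempt at the smaller of the remaining time and requestTimeoutMs keeps the per-request setting meaningful while guaranteeing that no caller blocks past the overall deadline. Under these assumptions, the same wrapper could be applied to position(), commitSync(), committed() and the calls listed in point 3, which is the kind of consistency the reply asks about.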