I do have one question, though: in the current KIP, every new method introduced in this proposal throws a TimeoutException when its time limit is exceeded. How would users be expected to respond when a TimeoutException is thrown, given that it is a RuntimeException?
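One way callers might deal with an unchecked timeout is to catch it explicitly at the call site and either retry or fall back. The sketch below is self-contained and does not use the Kafka client: the nested `TimeoutException` is a hypothetical stand-in for the exception the KIP proposes, and `callWithRetry`, `maxAttempts`, and `fallback` are names invented for illustration only.

```java
import java.util.function.Supplier;

class TimeoutHandlingSketch {

    /** Hypothetical stand-in for an unchecked timeout exception. */
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Invokes a bounded blocking call up to maxAttempts times.
     * Returns the call's result, or fallback if every attempt timed out.
     */
    static <T> T callWithRetry(Supplier<T> boundedCall, int maxAttempts, T fallback) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return boundedCall.get();
            } catch (TimeoutException e) {
                // The exception is unchecked, so the compiler does not force
                // this catch; callers have to opt in to handling it.
                System.err.println("attempt " + attempt + " timed out: " + e.getMessage());
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup that times out twice before succeeding.
        long offset = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new TimeoutException("position() exceeded its time limit");
            }
            return 42L;
        }, 5, -1L);
        System.out.println("offset = " + offset);
    }
}
```

Whether retrying, falling back, or letting the exception propagate is appropriate would depend on the application; the point is only that nothing obliges the caller to choose.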
Thanks,
Richard

On Mon, Mar 19, 2018 at 6:10 PM, Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi Ismael,
>
> You have a great point. Since most of the methods in this KIP have similar
> callbacks (position() and committed() both use fetchCommittedOffsets(), and
> commitSync() is similar to position(), except just updating offsets), the
> amount of time they block should be also about equal.
>
> However, I think that we need to take into account a couple of things. For
> starters, if the new methods were all reliant on one config, there is
> likelihood that the shortcomings for this approach would be similar to what
> we faced if we let request.timeout.ms control all method timeouts. In
> comparison, adding overloads does not have this problem.
>
> If you have further thoughts, please let me know.
>
> Richard
>
> On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma <ism...@juma.me.uk> wrote:
>
>> Hi,
>>
>> An option that is not currently covered in the KIP is to have a separate
>> config max.block.ms, which is similar to the producer config with the same
>> name. This came up during the KAFKA-2391 discussion. I think it's clear
>> that we can't rely on request.timeout.ms, so the decision is between adding
>> overloads or adding a new config. People seemed to be leaning towards the
>> latter in KAFKA-2391, but Jason makes a good point that the overloads are
>> more flexible. A couple of questions from me:
>>
>> 1. Do we need the additional flexibility?
>> 2. If we do, do we need it for every blocking method?
>>
>> Ismael
>>
>> On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu <yohan.richard...@gmail.com> wrote:
>>
>> > Hi Guozhang,
>> >
>> > I made some clarifications to KIP-266, namely:
>> > 1. Stated more specifically that commitSync will accept user input.
>> > 2. fetchCommittedOffsets(): Made its role in blocking more clear to the
>> > reader.
>> > 3. Sketched what would happen when time limit is exceeded.
>> >
>> > These changes should make the KIP easier to understand.
>> >
>> > Cheers,
>> > Richard
>> >
>> > On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > Hi Richard,
>> > >
>> > > I made a pass over the KIP again, some more clarifications / comments:
>> > >
>> > > 1. seek() call itself is not blocking, only the following poll() call may
>> > > be blocking as the actual metadata request will happen.
>> > >
>> > > 2. I saw you did not include Consumer.partitionFor(),
>> > > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. After
>> > > a second thought, I think this may be a better idea to not tackle them in
>> > > the same KIP, and probably we should consider whether we would change the
>> > > behavior or not in another discussion. So I agree to not include them.
>> > >
>> > > 3. In your wiki you mentioned "Another change shall be made to
>> > > KafkaConsumer#poll(), due to its call to updateFetchPositions() which
>> > > blocks indefinitely." This part may be a bit obscure to most readers who
>> > > are not familiar with the KafkaConsumer internals, could you please add
>> > > more elaborations. More specifically, I think the root causes of the
>> > > public APIs mentioned are a bit different while the KIP's explanation
>> > > sounds like they are due to the same reason:
>> > >
>> > > 3.1 fetchCommittedOffsets(): this internal call will block forever if the
>> > > committed offsets cannot be fetched successfully and affect position() and
>> > > committed(). We need to break out of its internal while loop.
>> > > 3.2 position() itself will while loop when offsets cannot be retrieved in
>> > > the underlying async call. We need to break out of this while loop.
>> > > 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should take
>> > > the user specified timeouts when applicable.
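The "break out of the while loop" idea in 3.1 to 3.3 could be sketched as a deadline check inside the retry loop. This is only an illustration under stated assumptions, not the KafkaConsumer internals: `awaitOrTimeout`, the `attempt` callback, and the nested `TimeoutException` are hypothetical names, and the 1 ms back-off is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

class BoundedWaitSketch {

    /** Hypothetical stand-in for an unchecked timeout exception. */
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Re-runs attempt until it reports success or the time limit elapses.
     * Replaces an unbounded "while (!done)" loop with a bounded one.
     */
    static void awaitOrTimeout(BooleanSupplier attempt, long timeout, TimeUnit unit) {
        long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        while (!attempt.getAsBoolean()) {
            // Compare via subtraction so the check stays correct even if
            // the deadline computation wrapped around.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("gave up after " + timeout + " " + unit);
            }
            // Brief back-off between attempts (arbitrary 1 ms).
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
    }

    public static void main(String[] args) {
        int[] tries = {0};
        // Succeeds on the third attempt, well within the limit.
        awaitOrTimeout(() -> ++tries[0] >= 3, 1, TimeUnit.SECONDS);
        System.out.println("succeeded after " + tries[0] + " attempts");

        try {
            // Never succeeds, so the loop is cut off by the deadline.
            awaitOrTimeout(() -> false, 20, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

A caller-supplied limit takes the place of a hard-coded Long.MAX_VALUE here; how the real methods would split that budget across network round trips is left open.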
>> > >
>> > >
>> > > Guozhang
>> > >
>> > > On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu <yohan.richard...@gmail.com> wrote:
>> > >
>> > > > Actually, what I said above is inaccurate. In
>> > > > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not
>> > > > seek.
>> > > > My assumption is that seek did not update correctly. I will be digging
>> > > > further into this.
>> > > >
>> > > > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > >
>> > > > > One more thing: when looking through tests, I have realized that seek()
>> > > > > methods can potentially block indefinitely. As you well know, seek() is
>> > > > > called when pollOnce() or position() is active. Thus, if position() blocks
>> > > > > indefinitely, then so would seek(). Should bounding seek() also be included
>> > > > > in this KIP?
>> > > > >
>> > > > > Thanks, Richard
>> > > > >
>> > > > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > >
>> > > > >> Thanks for the advice, Jason
>> > > > >>
>> > > > >> I have modified KIP-266 to include the java doc for committed() and other
>> > > > >> blocking methods, and I also mentioned poll() which will also be bounded.
>> > > > >> Let me know if there is anything else. :)
>> > > > >>
>> > > > >> Sincerely, Richard
>> > > > >>
>> > > > >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson <ja...@confluent.io> wrote:
>> > > > >>
>> > > > >>> Hi Richard,
>> > > > >>>
>> > > > >>> Thanks for the updates. I'm really glad you picked this up. A couple minor
>> > > > >>> comments:
>> > > > >>>
>> > > > >>> 1. Can you list the full set of new APIs explicitly in the KIP? Currently I
>> > > > >>> only see the javadoc for `position()`.
>> > > > >>>
>> > > > >>> 2. We should consider adding `TimeUnit` to the new methods to avoid unit
>> > > > >>> confusion. I know it's inconsistent with the poll() API, but I think it was
>> > > > >>> probably a mistake not to include it there, so better not to double down on
>> > > > >>> that mistake. And note that we do already have `close(long, TimeUnit)`.
>> > > > >>>
>> > > > >>> Other than that, I think the current KIP seems reasonable.
>> > > > >>>
>> > > > >>> Thanks,
>> > > > >>> Jason
>> > > > >>>
>> > > > >>> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > >>>
>> > > > >>> > Note to all: I have included bounding commitSync() and committed() in this
>> > > > >>> > KIP.
>> > > > >>> >
>> > > > >>> > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > >>> >
>> > > > >>> > > Hi all,
>> > > > >>> > >
>> > > > >>> > > I updated the KIP where overloading position() is now the favored approach.
>> > > > >>> > > Bounding position() using requestTimeoutMs has been listed as rejected.
>> > > > >>> > >
>> > > > >>> > > Any thoughts?
>> > > > >>> > >
>> > > > >>> > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > >>> > >
>> > > > >>> > >> I agree that adding the overloads is most flexible. But going for that
>> > > > >>> > >> direction we'd do that for all the blocking calls that I've listed above,
>> > > > >>> > >> with this timeout value covering the end-to-end waiting time.
>> > > > >>> > >>
>> > > > >>> > >>
>> > > > >>> > >> Guozhang
>> > > > >>> > >>
>> > > > >>> > >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > >>> > >>
>> > > > >>> > >> > bq. The most flexible option is to add overloads to the consumer
>> > > > >>> > >> >
>> > > > >>> > >> > This option is flexible.
>> > > > >>> > >> >
>> > > > >>> > >> > Looking at the tail of SPARK-18057, Spark dev voiced the same choice.
>> > > > >>> > >> >
>> > > > >>> > >> > +1 for adding overload with timeout parameter.
>> > > > >>> > >> >
>> > > > >>> > >> > Cheers
>> > > > >>> > >> >
>> > > > >>> > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson <ja...@confluent.io> wrote:
>> > > > >>> > >> >
>> > > > >>> > >> > > @Guozhang I probably have suggested all options at some point or another,
>> > > > >>> > >> > > including most recently, the current KIP! I was thinking that practically
>> > > > >>> > >> > > speaking, the request timeout defines how long the user is willing to wait
>> > > > >>> > >> > > for a response. The consumer doesn't really have a complex send process
>> > > > >>> > >> > > like the producer for any of these APIs, so I wasn't sure how much benefit
>> > > > >>> > >> > > there would be from having more granular control over timeouts (in the end,
>> > > > >>> > >> > > KIP-91 just adds a single timeout to control the whole send). That said, it
>> > > > >>> > >> > > might indeed be better to avoid overloading the config as you suggest since
>> > > > >>> > >> > > at least it avoids inconsistency with the producer's usage.
>> > > > >>> > >> > >
>> > > > >>> > >> > > The most flexible option is to add overloads to the consumer so that users
>> > > > >>> > >> > > can pass the timeout directly. I'm not sure if that is more or less
>> > > > >>> > >> > > annoying than a new config, but I've found config timeouts a little
>> > > > >>> > >> > > constraining in practice. For example, I could imagine users wanting to
>> > > > >>> > >> > > wait longer for an offset commit operation than a position lookup; if the
>> > > > >>> > >> > > latter isn't timely, users can just pause the partition and continue
>> > > > >>> > >> > > fetching on others. If you cannot commit offsets, however, it might be
>> > > > >>> > >> > > safer for an application to wait for availability of the coordinator than
>> > > > >>> > >> > > continuing.
>> > > > >>> > >> > >
>> > > > >>> > >> > > -Jason
>> > > > >>> > >> > >
>> > > > >>> > >> > > On Sun, Mar 4, 2018 at 10:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > >>> > >> > >
>> > > > >>> > >> > > > Hello Richard,
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > Thanks for the proposed KIP. I have a couple of general comments:
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > 1. I'm not sure if piggy-backing the timeout exception on the
>> > > > >>> > >> > > > existing requestTimeoutMs configured in "request.timeout.ms" is a good
>> > > > >>> > >> > > > idea since a) it is a general config that applies for all types of
>> > > > >>> > >> > > > requests, and b) using it to cover all the phases of an API call,
>> > > > >>> > >> > > > including network round trip and potential metadata refresh is shown to
>> > > > >>> > >> > > > not be a good idea, as illustrated in KIP-91:
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > In fact, I think in KAFKA-4879 which is aimed for the same issue as
>> > > > >>> > >> > > > KAFKA-6608, Jason has suggested we use a new config for the API. Maybe
>> > > > >>> > >> > > > this would be a more intuitive manner than reusing the
>> > > > >>> > >> > > > request.timeout.ms config.
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > 2. Besides the Consumer.position() call, there are a couple of more
>> > > > >>> > >> > > > blocking calls today that could result in infinite blocking:
>> > > > >>> > >> > > > Consumer.commitSync() and Consumer.committed(), should they be considered
>> > > > >>> > >> > > > in this KIP as well?
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > 3. There are a few other APIs that are today relying on
>> > > > >>> > >> > > > request.timeout.ms already for breaking the infinite blocking, namely
>> > > > >>> > >> > > > Consumer.partitionFor(), Consumer.OffsetAndTimestamp() and
>> > > > >>> > >> > > > Consumer.listTopics(), if we are making the other blocking calls rely on
>> > > > >>> > >> > > > a new config as suggested in 1) above, should we also change the
>> > > > >>> > >> > > > semantics of these API functions for consistency?
>> > > > >>> > >> > > >
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > Guozhang
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > On Sun, Mar 4, 2018 at 11:13 AM, Richard Yu <yohan.richard...@gmail.com> wrote:
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > > Hi all,
>> > > > >>> > >> > > > >
>> > > > >>> > >> > > > > I would like to discuss a potential change which would be made to
>> > > > >>> > >> > > > > KafkaConsumer:
>> > > > >>> > >> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886
>> > > > >>> > >> > > > >
>> > > > >>> > >> > > > > Thanks,
>> > > > >>> > >> > > > > Richard Yu
>> > > > >>> > >> > > > >
>> > > > >>> > >> > > >
>> > > > >>> > >> > > > --
>> > > > >>> > >> > > > -- Guozhang
>> > > > >>> > >> > > >
>> > > > >>> > >>
>> > > > >>> > >> --
>> > > > >>> > >> -- Guozhang
>> > > > >>> > >>
>> > >
>> > > --
>> > > -- Guozhang
>> > >
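The trade-off debated throughout this thread, one shared config versus per-call overloads that take a `TimeUnit`, could be sketched as follows. This is a hypothetical illustration and not the API proposed in the KIP: `Client`, `blockingBudgetMs`, and `defaultBlockMs` (standing in for a max.block.ms-style setting) are invented names, and the methods only report the time budget a call would use rather than doing any work.

```java
import java.util.concurrent.TimeUnit;

class OverloadVsConfigSketch {

    /** Hypothetical client; it only reports the time budget a call would use. */
    static class Client {
        // Plays the role of a max.block.ms-style setting (assumed name).
        private final long defaultBlockMs;

        Client(long defaultBlockMs) {
            this.defaultBlockMs = defaultBlockMs;
        }

        /** Config-driven variant: every call shares the configured limit. */
        long blockingBudgetMs() {
            return blockingBudgetMs(defaultBlockMs, TimeUnit.MILLISECONDS);
        }

        /** Overload variant: the caller chooses the limit and its unit per call. */
        long blockingBudgetMs(long timeout, TimeUnit unit) {
            return unit.toMillis(timeout);
        }
    }

    public static void main(String[] args) {
        Client client = new Client(60_000L);
        // A position lookup might tolerate only a short wait...
        System.out.println(client.blockingBudgetMs(500, TimeUnit.MILLISECONDS));
        // ...while an offset commit might wait longer for the coordinator.
        System.out.println(client.blockingBudgetMs(30, TimeUnit.SECONDS));
        // With no argument, the single configured value applies to the call.
        System.out.println(client.blockingBudgetMs());
    }
}
```

Passing the unit explicitly addresses the unit-confusion concern raised about poll(), while the no-argument variant shows how a config default and overloads could coexist if both were wanted.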