Hi Jay,

Currently I can see a use case for 4). We’ve seen some applications at LinkedIn fail to undeploy because of a timeout: many Kafka consumers were shut down at the same time, which caused serialized consumer rebalances until the retries were exhausted. I guess in practice a producer shutdown normally won’t take that long, but theoretically it is possible. I remember Andrew mentioned a similar use case in KAFKA-1660; we can also ask him about it. Maybe it depends on how we implement the request timeout, but currently I am not sure whether the request timeout by itself would be effective enough to limit shutdown time as required.

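To make the concern in 4) concrete, here is a minimal, self-contained Java sketch of a close with its own bounded wait. It does not use the Kafka client: `BoundedCloseSketch`, its `send` and `close` methods, and the simulated per-record send time are all hypothetical names made up for illustration. The only point is that the caller's wait is capped by the close timeout rather than by how long the buffered records would take to go out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a producer with a background sender thread.
// This is NOT the Kafka client API; it only illustrates a bounded close.
final class BoundedCloseSketch {
    private final BlockingQueue<String> buffered = new LinkedBlockingQueue<>();
    private final AtomicInteger accepted = new AtomicInteger();
    private final AtomicInteger sent = new AtomicInteger();
    private final long sendMillis;            // simulated time to send one record
    private final Thread sender;
    private volatile boolean running = true;

    BoundedCloseSketch(long sendMillis) {
        this.sendMillis = sendMillis;
        this.sender = new Thread(this::drain, "sketch-sender");
        this.sender.setDaemon(true);
        this.sender.start();
    }

    void send(String record) {
        accepted.incrementAndGet();
        buffered.add(record);
    }

    // Sender loop: keeps draining until closed AND the buffer is empty,
    // or until it is interrupted by a forced shutdown.
    private void drain() {
        try {
            while (running || !buffered.isEmpty()) {
                String record = buffered.poll(10, TimeUnit.MILLISECONDS);
                if (record != null) {
                    Thread.sleep(sendMillis); // pretend network round trip
                    sent.incrementAndGet();
                }
            }
        } catch (InterruptedException e) {
            // Forced shutdown: stop here and leave the rest unsent.
        }
    }

    // Waits at most `timeout` for the buffer to drain, then forces shutdown.
    // Returns the number of accepted records that were never sent.
    int close(long timeout, TimeUnit unit) {
        if (timeout <= 0) {
            throw new IllegalArgumentException("this sketch only handles timeout > 0");
        }
        running = false;
        try {
            // Thread.join(0) would wait forever, so never pass 0 here.
            sender.join(Math.max(1, unit.toMillis(timeout)));
            if (sender.isAlive()) {
                sender.interrupt();
                sender.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted.get() - sent.get();
    }

    public static void main(String[] args) {
        BoundedCloseSketch producer = new BoundedCloseSketch(200);
        for (int i = 0; i < 50; i++) {
            producer.send("record-" + i);
        }
        long start = System.nanoTime();
        int dropped = producer.close(300, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("dropped=" + dropped + " elapsedMs=" + elapsedMs);
    }
}
```

In this toy setup, draining all 50 records would take about ten seconds, but `close(300, TimeUnit.MILLISECONDS)` returns shortly after the 300 ms budget and reports what was left unsent. That is the property a deployment tool with its own timeout would care about.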
For 2), I don’t have a real-world use case now, but I would argue it is very likely a valid one. Imagine that in the callback the user wants to do some work (a checkpoint, maybe?) and it fails. In that case the user probably wants to stop sending further messages even though the message that triggered the callback was sent successfully.

What do you think?

Thanks.

Jiangjie (Becket) Qin

On 3/12/15, 8:11 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

>Hey Jiangjie,
>
>Thanks for the awesome summary, I think that really captures it. I agree,
>the question is exactly whether there would be a motivational use case we
>can brainstorm for those additional use cases?
>
>-Jay
>
>On Thu, Mar 12, 2015 at 4:24 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> Hi folks,
>>
>> Since there are actually long, interleaved discussions in KAFKA-1660 and
>> KAFKA-1659, I summarized them in the rejected alternatives sections - the
>> section exists for a reason and I shouldn’t have always put None there.
>> It looks like we took a while to agree upon the current interface.
>>
>> We basically have the following requirements:
>>
>> *Assuming request timeout is available*
>>
>> 1. Need to close a producer forcefully if a message send failed.
>> 2. Need to close a producer forcefully even if no message send failed.
>> 3. Need to close a producer elegantly but with a bounded waiting time.
>> 4. Need to close a producer forcefully after an arbitrary waiting time
>> other than request timeout.
>>
>> The options we have now are:
>> A. close(timeout) - address all four.
>> B. close() + abort.on.send.fail - address 1) and 3)
>> C. abort() + close() - address 1) and 2) and 3)
>>
>> So I think we can narrow the discussion to whether 2) and 4) are valid
>> use cases or not. As soon as we reach a conclusion on what requirements need
>> to be met, we would have a conclusion on the solution.
>> I think 2) might be valid because of some application level logic.
>> I am not sure if 4) is a valid requirement, but from KAFKA-1660, my
>> understanding is it is.
>>
>> Any idea?
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On 3/12/15, 11:50 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>>
>> >Hey Guozhang,
>> >
>> >Thanks for the comments. I updated the page as suggested.
>> >For 3), that’s right, I put this in java doc. Do you think we need to
>> >reject values other than -1?
>> >For 4), I think the user will notice this easily because the thread will block
>> >and the producer is not going to shut down. About using close(-1) quietly in
>> >the sender thread when close() is called, my concern is that the user might not be
>> >aware of that. Maybe we can log an error message if the user calls close() in
>> >the sender thread?
>> >
>> >Thanks,
>> >
>> >Jiangjie (Becket) Qin
>> >
>> >On 3/12/15, 5:13 AM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>> >
>> >>Hi Becket,
>> >>
>> >>Some comments on the wiki page:
>> >>
>> >>1. There are a few typos, for example "multivations", "wiithin".
>> >>
>> >>2. I think the main motivation could just be "current close() needs to
>> >>block on flushing all buffered data, however there are scenarios in which
>> >>producers would like to close without blocking on flushing data, or even
>> >>close immediately and make sure any buffered data are dropped instead of
>> >>sending out." You can probably give some examples for such scenarios.
>> >>
>> >>3. close(-1, TimeUnit.MILLISECONDS) => from the implementation it seems
>> >>any negative value has the same semantics.
>> >>
>> >>4. In sender thread only close(-1, TimeUnit.MILLISECONDS) should be
>> >>called => this is not programmatically enforced. Shall we just try to enforce it
>> >>via, for example, checking if caller thread is the IO thread, and if yes
>> >>just use close(-1)?
>> >>
>> >>5. Proposed Changes => it seems you only talked about the close(-1) case,
>> >>how about close(>=0)?
>> >>
>> >>Guozhang
>> >>
>> >>On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>> >>wrote:
>> >>
>> >>> Hey Joe & Jay,
>> >>>
>> >>> Thanks for the comments on the voting thread. Since it seems we probably
>> >>> will have more discussion on this, I am just replying from the discussion
>> >>> thread here.
>> >>> I’ve updated the KIP page to make it less like half-baked, apologize for
>> >>> the rush...
>> >>>
>> >>> The contract in current KIP is:
>> >>> 1. close() - wait until all requests either are sent or reach request
>> >>> timeout.
>> >>> 2. close(-1, TimeUnit.MILLISECONDS) - close immediately
>> >>> 3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait
>> >>> until all requests are sent or reach request timeout
>> >>> 4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5
>> >>> milliseconds, if something went wrong, just shutdown the producer anyway,
>> >>> my callback will handle the failures.
>> >>>
>> >>> About how we define what timeout value stands for, I actually struggled a
>> >>> little bit when wrote the patch. Intuitively, close(0) should mean
>> >>> immediately, however it seems that all the existing java class have this
>> >>> convention of timeout=0 means no timeout or never timeout (Thread.join(0),
>> >>> Object.wait(0), etc.) So here the dilemma is either we follow the
>> >>> intuition or we follow the convention. What I chose is to follow the
>> >>> convention but document the interface to let user be aware of the usage.
>> >>> The reason is that I think producer.close() is a public interface so it
>> >>> might be better to follow java convention. Whereas selector is not a
>> >>> public interface that used by user, so as long as it makes sense to us, it
>> >>> is less a problem to be different from java convention.
>> >>> That said since
>> >>> consumer.poll(timeout) is also a public interface, I think it also makes
>> >>> sense to make producer.close() to have the same definition of
>> >>> consumer.poll(timeout).
>> >>>
>> >>> The main argument for keeping a timeout in close would be separating the
>> >>> close timeout from request timeout, which probably makes sense. I would
>> >>> guess typically the request timeout would be long (e.g. 60 seconds)
>> >>> because we might want to consider retries with back off time. If we have
>> >>> multiple batches in accumulator, in worst case that could take up to
>> >>> several minutes to complete all the requests. But when we close a
>> >>> producer, we might not want to wait for that long as it might cause some
>> >>> other problem like deployment tool timeout.
>> >>>
>> >>> There is also a subtle difference between close(timeout) and
>> >>> flush(timeout). The only purpose for flush() is to write data to the
>> >>> broker, so it makes perfect sense to wait until request timeout. I think
>> >>> that is why flush(timeout) looks strange. On the other hand, the top
>> >>> priority for close() is to close the producer rather than flush() data, so
>> >>> close(timeout) gives guarantee on bounded waiting for its main job.
>> >>>
>> >>> Sorry for the confusion about forceClose flag. It is not a public
>> >>> interface. I mentioned it in Proposed Changes section which I thought was
>> >>> supposed to provide implementation details.
>> >>>
>> >>> Thanks again for all the comments and suggestions!
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>> On 3/10/15, 8:57 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>> >>>
>> >>> >The KIP page has been updated per Jay’s comments.
>> >>> >I’d like to initiate the voting process if no further comments are
>> >>> >received by tomorrow.
>> >>> >
>> >>> >Jiangjie (Becket) Qin
>> >>> >
>> >>> >On 3/8/15, 9:45 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>> >>> >
>> >>> >>Hey Jiangjie,
>> >>> >>
>> >>> >>Can you capture the full motivation and use cases for the feature? This
>> >>> >>mentions your interest in having a way of aborting from inside the
>> >>> >>Callback. But it doesn't really explain that usage or why other people
>> >>> >>would want to do that. It also doesn't list the primary use case for
>> >>> >>having close with a bounded timeout which was to avoid blocking too long on
>> >>> >>shutdown.
>> >>> >>
>> >>> >>-Jay
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>On Sat, Mar 7, 2015 at 12:25 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>> >>> >>wrote:
>> >>> >>
>> >>> >>> Hi,
>> >>> >>>
>> >>> >>> I just created a KIP for adding a close(timeout) to new producer. Most of
>> >>> >>> the previous discussions are in KAFKA-1660 where Parth Brahmbhatt has
>> >>> >>> already done a lot of work.
>> >>> >>> Since this is an interface change so we are going through the KIP process.
>> >>> >>> Here is the KIP link:
>> >>> >>>
>> >>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53739782
>> >>> >>>
>> >>> >>> Thanks.
>> >>> >>>
>> >>> >>> Jiangjie (Becket) Qin
>> >>> >>>
>> >>> >
>> >>>
>> >>
>> >>
>> >>--
>> >>-- Guozhang
>> >
>>
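
The timeout convention debated in this thread (negative, zero, positive) can be summarized in a small Java sketch. It encodes the contract as drafted in the quoted KIP discussion, which is not necessarily what any released client does. `CloseContractSketch`, `Mode`, and `modeFor` are hypothetical names used only for illustration, and nothing here calls the Kafka client.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical summary of the close(timeout, unit) contract as drafted in
// the thread. Not the Kafka client API.
final class CloseContractSketch {

    enum Mode {
        IMMEDIATE,                    // drop buffered data and shut down now
        WAIT_UNTIL_REQUESTS_COMPLETE, // same as close(): sent or request timeout
        BOUNDED_WAIT                  // best effort within the given time
    }

    static Mode modeFor(long timeout, TimeUnit unit) {
        Objects.requireNonNull(unit, "unit");
        if (timeout < 0) {
            // Any negative value is treated like close(-1, ...) in the draft.
            return Mode.IMMEDIATE;
        }
        if (timeout == 0) {
            // Same convention as Thread.join(0) and Object.wait(0):
            // zero means "no timeout", not "immediately".
            return Mode.WAIT_UNTIL_REQUESTS_COMPLETE;
        }
        return Mode.BOUNDED_WAIT;
    }

    public static void main(String[] args) {
        long[] samples = {-1, 0, 5};
        for (long timeout : samples) {
            System.out.println("close(" + timeout + ", MILLISECONDS) -> "
                    + modeFor(timeout, TimeUnit.MILLISECONDS));
        }
    }
}
```

Writing the mapping out this way makes the trade-off visible: following the Java convention keeps zero consistent with `Thread.join(0)`, at the cost of `close(0)` not meaning "close immediately", which is why the thread suggests documenting it clearly.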