Actually, could you clarify this a bit? I'm not sure which thread you are referring to. Specifically, how would this tie in with the timeout we currently have for the producer, for example?
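Jay's answer, quoted below, is that once every request carries a client-side timeout, flush() inherits that bound. Each request either completes or fails by the deadline, and a failed request still counts as completed.

The following Java sketch (Java 9+) illustrates that argument under stated assumptions. It is a toy model, not the Kafka client:

- `TimedRequestProducer`, its `send(record, ackDelayMs)` signature and the `requestTimeoutMs` parameter are hypothetical.
- The broker is simulated with `CompletableFuture.delayedExecutor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical toy producer (not the Kafka client) illustrating that a
 * per-request timeout implicitly bounds flush().
 */
class TimedRequestProducer {
    private final long requestTimeoutMs;
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();

    TimedRequestProducer(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /**
     * Simulated send: the "broker" acks after ackDelayMs; a negative delay
     * models an unreachable broker that never answers.
     */
    synchronized CompletableFuture<String> send(String record, long ackDelayMs) {
        CompletableFuture<String> request = new CompletableFuture<>();
        if (ackDelayMs >= 0) {
            CompletableFuture.delayedExecutor(ackDelayMs, TimeUnit.MILLISECONDS)
                    .execute(() -> request.complete("acked:" + record));
        }
        // Client-side request timeout: fail with TimeoutException instead of hanging.
        request.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS);
        inFlight.add(request);
        return request;
    }

    /** Blocks until every request sent so far has completed, successfully or not. */
    void flush() {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = inFlight.toArray(new CompletableFuture<?>[0]);
            inFlight.clear();
        }
        // allOf waits for all inputs; an exceptional completion still counts as done.
        CompletableFuture.allOf(snapshot).exceptionally(t -> null).join();
    }
}
```

With `requestTimeoutMs = 200`, flushing one healthy request and one unreachable request returns after roughly 200 ms. At that point the healthy future holds its ack and the other has failed with `TimeoutException`. This is the "complete or fail by then" behaviour described below.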
On Tue, Feb 17, 2015 at 02:55:44PM -0800, Jay Kreps wrote:
> Yeah, there was a separate thread on adding a client-side timeout to
> requests. We should have this in the new Java clients; it just isn't there
> yet. When we do this, the flush() call will implicitly have the same
> timeout as the requests (since they will complete or fail by then). I think
> this makes flush(timeout) and potentially close(timeout) both unnecessary.
>
> -Jay
>
> On Tue, Feb 17, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> In the Scala clients we have the socket.timeout config because we use
> blocking I/O. When that timeout is reached, a TimeoutException is thrown
> from the socket and the client can handle it accordingly. In the Java
> clients we are switching to non-blocking I/O, so we will no longer have
> the socket timeout.
>
> I agree that we could add this client request timeout back in the Java
> clients, in addition to allowing the client's and server's non-blocking
> selectors to close idle sockets.
>
> Guozhang
>
> On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> I'm thinking the flush call timeout will naturally be the timeout for a
> produce request, no?
>
> Currently it seems we don't have a timeout for client requests; should we
> have one?
>
> -Jiangjie (Becket) Qin
>
> On 2/16/15, 8:19 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>
> Yes, I think we all agree it would be good to add a client-side request
> timeout. That would effectively imply a flush timeout as well, since any
> requests that couldn't complete in that time would be errors and hence
> "completed" in the definition we gave.
>
> -Jay
>
> On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
> <mistry.p.bhav...@gmail.com> wrote:
>
> Hi All,
>
> Thanks, Jay and all, for addressing my concern.
> I am fine with just having the flush()
> method as long as it covers failure modes and resiliency. For example, we
> had a situation where all brokers in the Kafka cluster were reachable, but
> after a new Kafka node was added and the admin migrated "leaders to the new
> brokers", the new brokers were NOT reachable from the producer's standpoint
> because of a firewall, yet the metadata continued to elect the new broker
> as leader for that partition.
>
> All I am asking is that you either give up sending to this broker or do
> something else in this scenario. With the current code in the 0.8.2
> release, the caller thread of the flush() or close() method would be
> blocked forever. So all I am asking for is:
>
> https://issues.apache.org/jira/browse/KAFKA-1659
> https://issues.apache.org/jira/browse/KAFKA-1660
>
> Also, I recall that a timeout was also added to batches to indicate how
> long a "message" can stay in memory before expiring.
>
> Given all this, should this API be consistent with other upcoming patches
> that address similar problems?
>
> Otherwise, what we have done is spawn a thread just for calling close() or
> flush(), and join on it with a timeout on the caller's end.
>
> Anyway, I just wanted to show you the issues with the existing API, and if
> you think this is fine, then I am OK with this approach. It is just that
> the caller will have to do a bit more work.
>
> Thanks,
>
> Bhavesh
>
> On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> Yes, that is a counter-example. I'm okay either way on whether we should
> have just flush() or have a timeout. Bhavesh, does Jay's explanation a few
> replies prior address your concern?
> If so, shall we consider this closed?
>
> On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> Yeah, we could do that. I guess I just feel like it adds confusion,
> because then you have to think about which timeout you want, when most
> likely you don't want a timeout at all.
>
> I guess the pattern I was thinking of was fflush or the Java equivalent,
> which don't have timeouts:
>
> http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
>
> -Jay
>
> On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> I think tryFlush with a timeout sounds good to me. This is really more for
> consistency than anything else. I cannot think of any standard blocking
> calls off the top of my head that don't have a timed variant, e.g.,
> Thread.join, Object.wait, Future.get. Either that, or they provide an
> entirely non-blocking mode (e.g., socketChannel.connect followed by
> finishConnect).
>
> Thanks,
>
> Joel
>
> On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> Jay,
>
> The .flush() call seems like it would be the best way to do a clean
> shutdown of the new producer, if that is what you wanted?
>
> So, you could in your code "stop all incoming requests && producer.flush()
> && System.exit(value)" and know pretty much that you won't drop anything
> on the floor.
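Joe's shutdown recipe (stop intake, flush, exit) can be sketched in Java as below. Assumptions:

- `ShutdownDemoProducer` and `IngestService` are hypothetical.
- The producer is modeled as a single I/O thread, and its `delivered` list plays the role of the broker log.
- With the real client, the middle steps would be the producer's `flush()` followed by `close()`.

`System.exit` is left out so the sketch can run inside a test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical asynchronous producer; "delivered" stands in for the broker log. */
class ShutdownDemoProducer {
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    synchronized CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> request = CompletableFuture.runAsync(() -> {
            pause(5);                 // simulated network round trip
            delivered.add(record);    // simulated broker append
        }, ioThread);
        pending.add(request);
        return request;
    }

    /** Blocks until every record sent so far has completed. */
    void flush() {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = pending.toArray(new CompletableFuture<?>[0]);
            pending.clear();
        }
        CompletableFuture.allOf(snapshot).exceptionally(t -> null).join();
    }

    void close() {
        ioThread.shutdown();
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

/** Hypothetical application front end that feeds the producer. */
class IngestService {
    private final ShutdownDemoProducer producer;
    private boolean accepting = true;  // guarded by "this"

    IngestService(ShutdownDemoProducer producer) {
        this.producer = producer;
    }

    /** Returns false once shutdown has begun, so no new work slips in. */
    synchronized boolean offer(String record) {
        if (!accepting) {
            return false;
        }
        producer.send(record);
        return true;
    }

    synchronized void shutdown() {
        accepting = false;   // 1. stop all incoming requests
        producer.flush();    // 2. wait until everything already sent has completed
        producer.close();    // 3. release the I/O thread; exiting is now safe
    }
}
```

Sending 20 records and then calling `shutdown()` leaves all 20 in `delivered`, and any later `offer` returns `false`. Because `offer` and `shutdown` share a lock, a record cannot slip in between the intake check and the flush.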
>
> This can be done with the callbacks and futures (sure), but .flush()
> seems to be the right time to block, and it is only a few lines of code,
> no?
>
> ~ Joestein
>
> On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> Hey Bhavesh,
>
> If a broker is not available, a new one should be elected to take over,
> so although the flush might take longer, it should still be quick. Even
> if not, this should result in an error, not a hang.
>
> The cases you enumerated are all covered already: if the user wants to
> retry, that is covered by the retry setting in the client, and for all
> the errors, that is considered completion of the request. The
> postcondition of flush isn't that all sends complete successfully, just
> that they complete. So if you try to send a message that is too big, when
> flush returns, calling .get() on the future should not block and should
> produce the error.
>
> Basically, the argument I am making is that the only reason you want to
> call flush() is to guarantee all the sends complete, so if it doesn't
> guarantee that, it will be somewhat confusing. This does mean blocking,
> but if you don't want to block on the send, then you wouldn't call
> flush().
>
> This has no impact on the block.on.buffer.full setting.
> That impacts what
> happens when send() can't append to the buffer because it is full. flush()
> means any message previously sent (i.e., for which the send() call has
> returned) needs to have its request completed. Hope that makes sense.
>
> -Jay
>
> On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry
> <mistry.p.bhav...@gmail.com> wrote:
>
> Hi Jay,
>
> Imagine you have a flaky network connection to the brokers: will flush()
> be blocked if "one of the brokers is not available"? Basically, how would
> we address the failure mode where the I/O thread is unable to drain
> records or is busy due to pending requests? Does the flush() method only
> flush to the in-memory queue, or does it flush to the broker over the
> network?
>
> A timeout helps by pushing the caller to decide what to do, e.g.,
> re-enqueue records, drop the entire batch, or handle a message that is
> too big and crosses the max.message.size limit.
>
> Also, according to the Javadoc for the API, "The method will block until
> all previously sent records have completed sending (either successfully
> or with an error)". Does this bypass the rules set by
> block.on.buffer.full or batch.size when under load?
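Jay's reply above separates two mechanisms that this question runs together:

- `block.on.buffer.full` decides what `send()` does when the buffer has no room.
- `flush()` only waits on records whose `send()` has already returned.

The toy model below is a sketch under stated assumptions. `BufferedProducer` is hypothetical, buffer capacity is counted in records with a `Semaphore`, and the `blockOnBufferFull` flag mirrors the idea of the setting rather than the client's implementation. It needs Java 9+ for `delayedExecutor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical producer whose buffer holds at most `capacity` unacknowledged records. */
class BufferedProducer {
    private final Semaphore bufferSlots;
    private final boolean blockOnBufferFull;
    private final long ackDelayMs;
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    BufferedProducer(int capacity, boolean blockOnBufferFull, long ackDelayMs) {
        this.bufferSlots = new Semaphore(capacity);
        this.blockOnBufferFull = blockOnBufferFull;
        this.ackDelayMs = ackDelayMs;
    }

    /** The buffer-full policy applies here, at append time, and nowhere else. */
    CompletableFuture<Void> send(String record) {
        if (blockOnBufferFull) {
            bufferSlots.acquireUninterruptibly();  // wait for space
        } else if (!bufferSlots.tryAcquire()) {
            throw new IllegalStateException("buffer full, rejecting " + record);
        }
        CompletableFuture<Void> request = new CompletableFuture<>();
        // Simulated ack: the slot is freed first, then the request completes.
        CompletableFuture.delayedExecutor(ackDelayMs, TimeUnit.MILLISECONDS).execute(() -> {
            bufferSlots.release();
            request.complete(null);
        });
        synchronized (this) {
            pending.add(request);
        }
        return request;
    }

    /** Waits only for records whose send() has already returned. */
    void flush() {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = pending.toArray(new CompletableFuture<?>[0]);
            pending.clear();
        }
        CompletableFuture.allOf(snapshot).exceptionally(t -> null).join();
    }
}
```

A record rejected by a full buffer never reaches `pending`, so `flush()` has nothing to wait for on its behalf. In this model the buffer policy and the flush postcondition stay independent, which is the separation Jay describes.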
>
> That was my intention, and I am sorry I mixed up the close() method here
> without knowing that this is only for bulk send.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> Yeah, I second the problem Guozhang flags with giving flush a timeout. In
> general, failover in Kafka is a bounded thing unless you have brought your
> Kafka cluster down entirely, so I think depending on that bound implicitly
> is okay.
>
> It is possible to make flush() instead be
>
>     boolean tryFlush(long timeout, TimeUnit unit);
>
> But I am somewhat skeptical that people will use this correctly. I.e.,
> consider the mirror maker code snippet I gave above: how would one
> actually recover in this case other than retrying (which the client
> already does automatically)? After all, if you are okay losing data, then
> you don't need to bother calling flush at all; you can just let the
> messages be sent asynchronously.
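Jay's alternative signature, `boolean tryFlush(long timeout, TimeUnit unit)`, could be sketched on top of `CompletableFuture.get(timeout, unit)`, one of the timed variants Joel lists. `TimedFlushTracker` and its `track` method are hypothetical.

The point of the sketch is what a `false` result means: the records are still in flight, not dropped. The caller's realistic options are then to wait again or to accept possible loss, which is the recovery problem Jay raises.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical tracker of in-flight requests offering a timed flush. */
class TimedFlushTracker {
    private final List<CompletableFuture<?>> pending = new ArrayList<>();

    synchronized void track(CompletableFuture<?> request) {
        pending.add(request);
    }

    /**
     * Returns true if every tracked request completed (successfully or with an
     * error) within the timeout, false if some are still in flight.
     */
    boolean tryFlush(long timeout, TimeUnit unit) {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            pending.removeIf(CompletableFuture::isDone);  // finished requests need no waiting
            snapshot = pending.toArray(new CompletableFuture<?>[0]);
        }
        try {
            CompletableFuture.allOf(snapshot).get(timeout, unit);
            return true;
        } catch (ExecutionException e) {
            return true;   // all finished; at least one finished with an error
        } catch (TimeoutException e) {
            return false;  // still in flight: nothing is dropped, it just is not done yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller that gets `false` back can do little except call `tryFlush` again. That amounts to the same blocking wait a plain `flush()` would have done, with retries already handled inside the client.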
>
> I think close() is actually different, because you may well want to shut
> down immediately and just throw away unsent events.
>
> -Jay
>
> On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> The proposal looks good to me; I will need some time to review the
> implementation RB later.
>
> Bhavesh, I am wondering how you would use a flush() with a timeout, since
> such a call does not actually provide any flushing guarantees?
>
> As for close(), there is a separate JIRA for this:
>
> KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
>
> Guozhang
>
> On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry
> <mistry.p.bhav...@gmail.com> wrote:
>
> Hi Jay,
>
> How about adding a timeout to each of these method calls,
> flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway
> I/O thread issue, and the caller thread should not be blocked forever by
> these methods.
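Bhavesh mentions the workaround his team used with the 0.8.2 client: run `flush()` or `close()` on a separate thread and `join` it with a timeout. Below is a minimal sketch. It assumes only that the blocking call is passed in as a `Runnable`; `CallerSideTimeout` and `runWithTimeout` are hypothetical names.

```java
/** Hypothetical helper: bounds how long the caller waits for a blocking call. */
final class CallerSideTimeout {
    private CallerSideTimeout() {}

    /**
     * Runs blockingCall (for example, a producer's flush() or close()) on a
     * helper thread and waits at most timeoutMs for it to finish.
     *
     * @return true if the call finished in time, false if the caller gave up
     */
    static boolean runWithTimeout(Runnable blockingCall, long timeoutMs) {
        if (timeoutMs <= 0) {
            // Thread.join(0) would mean "wait forever", defeating the purpose.
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        Thread worker = new Thread(blockingCall, "bounded-flush");
        worker.setDaemon(true);  // a hung call must not keep the JVM alive
        worker.start();
        try {
            worker.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

Giving up only frees the caller. The helper thread stays blocked, and the unsent records are still inside the producer, which is why Guozhang asks what flushing guarantee such a call could offer.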
>
> Thanks,
>
> Bhavesh
>
> On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> Well, actually, in the case of linger.ms = 0 the send is still
> asynchronous, so calling flush() blocks until all the previously sent
> records have completed. It doesn't speed anything up in that case, though,
> since they are already available to send.
>
> -Jay
>
> On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
>
> Looks good to me.
>
> I like the idea of not blocking additional sends but not guaranteeing that
> flush() will deliver them.
>
> I assume that with linger.ms = 0, flush will just be a no-op (since the
> queue will be empty). Is that correct?
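Jay's answer to Gwen is that `linger.ms = 0` removes the batching delay but not the asynchrony. `send()` still returns before the broker acknowledges, so `flush()` still has something to wait for.

The hypothetical model below (Java 9+) shows this directly. `ZeroLingerProducer` and its fixed `ackDelayMs` are assumptions for illustration. Gwen's other point, that sends issued during a flush are neither blocked nor covered by it, is encoded in how `flush()` takes a snapshot of the requests in flight when it is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical producer with linger.ms = 0: every send is dispatched at once. */
class ZeroLingerProducer {
    private final long ackDelayMs;
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    ZeroLingerProducer(long ackDelayMs) {
        this.ackDelayMs = ackDelayMs;
    }

    /** Dispatched immediately (no linger), but acknowledged only ackDelayMs later. */
    synchronized CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> request = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(ackDelayMs, TimeUnit.MILLISECONDS)
                .execute(() -> request.complete(null));
        inFlight.add(request);
        return request;
    }

    /**
     * Waits for the requests in flight at call time. The lock is released before
     * waiting, so concurrent sends are not blocked, and they are not awaited either.
     */
    void flush() {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = inFlight.toArray(new CompletableFuture<?>[0]);
            inFlight.clear();
        }
        CompletableFuture.allOf(snapshot).exceptionally(t -> null).join();
    }
}
```

So in this model, flush with `linger.ms = 0` is not a no-op. It simply has no batching delay to cut short.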
>
> Gwen
>
> On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> Following up on our previous thread on making batch send a little easier,
> here is a concrete proposal to add a flush() method to the producer:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
>
> A proposed implementation is here:
> https://issues.apache.org/jira/browse/KAFKA-1865
>
> Thoughts?
>
> -Jay
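Jay's Feb 10 reply clarifies the contract behind this proposal: flush() returns once every previously sent record has "completed", successfully or not. Afterwards, `get()` on each record's future returns immediately with either the result or the error.

Below is a small Java sketch of how a caller might inspect those outcomes. `FlushOutcomes` and `failures` are hypothetical helpers, and plain `CompletableFuture<String>` values stand in for the producer's per-record futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical helper that inspects per-record futures after a flush() has returned. */
final class FlushOutcomes {
    private FlushOutcomes() {}

    /** Returns the error messages of failed sends; requires every future to be done already. */
    static List<String> failures(List<CompletableFuture<String>> sends) {
        List<String> errors = new ArrayList<>();
        for (CompletableFuture<String> f : sends) {
            if (!f.isDone()) {
                // Would violate the flush() postcondition: nothing may still be pending.
                throw new IllegalStateException("flush() returned with a send still pending");
            }
            try {
                f.join();  // cannot block here, because the future is already complete
            } catch (CompletionException e) {
                errors.add(e.getCause().getMessage());
            }
        }
        return errors;
    }
}
```

An oversized record therefore shows up as an error after `flush()` returns rather than as a hang, which matches Jay's "too big" example.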