Hey Bhavesh,

If a broker is not available, a new leader should be elected to take over, so although the flush might take longer, it should still be quick. Even if not, this should result in an error, not a hang.
The cases you enumerated are all covered already. If the user wants to retry, that is covered by the retry setting in the client; all of those errors are considered completion of the request. The postcondition of flush() isn't that all sends complete successfully, just that they complete. So if you try to send a message that is too big, then once flush() returns, calling .get() on the future should not block, and should produce the error.

Basically the argument I am making is that the only reason you want to call flush() is to guarantee that all the sends complete, so if it doesn't guarantee that, it will be somewhat confusing. This does mean blocking, but if you don't want to block on the send then you wouldn't call flush().

This has no impact on the block.on.buffer.full setting. That setting governs what happens when send() can't append to the buffer because it is full. flush() means any message previously sent (i.e. any message for which the send() call has returned) needs to have its request completed.

Hope that makes sense.

-Jay

On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Jay,
>
> Imagine you have a flaky network connection to the brokers, so flush()
> will block if one of the brokers is not available. How would we address
> that failure mode, where the io thread is not able to drain records or is
> busy due to pending requests? Does the flush() method only flush to the
> in-memory queue, or does it flush to the brokers over the network?
>
> A timeout helps with this, and pushes the caller to decide what to do,
> e.g. re-enqueue the records, drop the entire batch, or handle one of the
> messages being too big and crossing the max.message.size limit, etc.
>
> Also, the javadoc for the API says "The method will block until all
> previously sent records have completed sending (either successfully or
> with an error)". Does this bypass the rules set by block.on.buffer.full
> or batch.size when under load?
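The contract Jay describes can be made concrete with a small toy model. This is a sketch using only java.util.concurrent, not the real Kafka producer: the ToyProducer class, the 10-byte size limit, and the 50 ms delay are all invented for illustration. It shows send() returning a future that may complete with an error, flush() blocking until every previously sent record completes either way, and get()/join() never blocking after flush() returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Toy model of the flush() postcondition (NOT the real Kafka client).
class ToyProducer {
    private static final int MAX_MESSAGE_SIZE = 10; // invented limit
    private final ScheduledExecutorService io =
        Executors.newSingleThreadScheduledExecutor();
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();

    // Asynchronous send: the "io thread" completes the request a little
    // later, either successfully or with an error (e.g. record too large).
    synchronized CompletableFuture<String> send(String record) {
        CompletableFuture<String> f = new CompletableFuture<>();
        io.schedule(() -> {
            if (record.length() > MAX_MESSAGE_SIZE)
                f.completeExceptionally(
                    new IllegalArgumentException("record too large"));
            else
                f.complete("ack:" + record);
        }, 50, TimeUnit.MILLISECONDS);
        inFlight.add(f);
        return f;
    }

    // Block until all previously sent records have completed, successfully
    // or with an error. Errors count as completion, so flush() swallows
    // them here; the caller sees them via get() on the individual futures.
    synchronized void flush() {
        for (CompletableFuture<String> f : inFlight) {
            try {
                f.join();
            } catch (CompletionException e) {
                // completed with an error: still "complete" for flush()
            }
        }
        inFlight.clear();
    }

    void close() {
        io.shutdown();
    }
}
```

In the real producer the completion happens on the sender (io) thread; the scheduled executor here just stands in for that.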
>
> That was my intention, and I am sorry I mixed up close() here without
> knowing that this is only about bulk send.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Yeah, I second the problem Guozhang flags with giving flush a timeout.
> > In general failover in Kafka is a bounded thing unless you have brought
> > your Kafka cluster down entirely, so I think depending on that bound
> > implicitly is okay.
> >
> > It is possible to make flush() be instead
> >
> >     boolean tryFlush(long timeout, TimeUnit unit);
> >
> > But I am somewhat skeptical that people will use this correctly. I.e.,
> > consider the mirror maker code snippet I gave above: how would one
> > actually recover in this case other than retrying (which the client
> > already does automatically)? After all, if you are okay losing data then
> > you don't need to bother calling flush at all; you can just let the
> > messages be sent asynchronously.
> >
> > I think close() is actually different, because you may well want to
> > shut down immediately and just throw away unsent events.
> >
> > -Jay
> >
> > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > The proposal looks good to me; I will need some time to review the
> > > implementation RB later.
> > >
> > > Bhavesh, I am wondering how you would use a flush() with a timeout,
> > > since such a call does not actually provide any flushing guarantees?
> > >
> > > As for close(), there is a separate JIRA for this:
> > >
> > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > >
> > > Guozhang
> > >
> > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > How about adding a timeout to each of these method calls:
> > > > flush(timeout, TimeUnit) and close(timeout, TimeUnit)?
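The tryFlush(long timeout, TimeUnit unit) signature Jay floats above is hypothetical and not part of the producer API. A sketch of its semantics, modeled over plain stdlib futures rather than the real client, might look like this: wait up to the timeout for every outstanding request to complete (either successfully or with an error, since errors count as completion), and report whether they all did instead of blocking indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of a hypothetical tryFlush(timeout, unit), NOT the real API.
class TryFlushSketch {
    // Returns true if every in-flight request completed within the timeout
    // (successfully or with an error), false if the timeout expired first.
    static boolean tryFlush(long timeout, TimeUnit unit,
                            CompletableFuture<?>... inFlight) {
        try {
            // allOf completes only once ALL the given futures have
            // completed; it completes exceptionally if any of them did.
            CompletableFuture.allOf(inFlight).get(timeout, unit);
            return true;              // all completed successfully
        } catch (ExecutionException e) {
            return true;              // all completed, some with errors
        } catch (TimeoutException e) {
            return false;             // some requests still outstanding
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

This also illustrates Jay's objection: when tryFlush returns false, the caller has no obvious recovery beyond retrying, which the client already does internally.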
> > > > We had a runaway io thread issue, and the caller thread should not
> > > > be blocked forever in these methods.
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Well, actually, in the case of linger.ms=0 the send is still
> > > > > asynchronous, so calling flush() blocks until all the previously
> > > > > sent records have completed. It doesn't speed anything up in that
> > > > > case, though, since they are already available to send.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > >
> > > > > > Looks good to me.
> > > > > >
> > > > > > I like the idea of not blocking additional sends but not
> > > > > > guaranteeing that flush() will deliver them.
> > > > > >
> > > > > > I assume that with linger.ms=0, flush will just be a noop (since
> > > > > > the queue will be empty). Is that correct?
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Following up on our previous thread on making batch send a
> > > > > > > little easier, here is a concrete proposal to add a flush()
> > > > > > > method to the producer:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > > > >
> > > > > > > A proposed implementation is here:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > -Jay
> > >
> > > --
> > > -- Guozhang
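Gwen's noop question and Jay's answer in the thread above can be modeled the same way. Again a toy sketch with stdlib futures, not the actual client: with nothing in flight, flush() has nothing to wait on and returns immediately, while a record that has been sent but not yet acknowledged still forces flush() to wait, even when linger.ms=0 made it immediately eligible to send.

```java
import java.util.concurrent.CompletableFuture;

// Toy model of "is flush() a noop with linger.ms=0?" (NOT the real client).
class FlushNoopDemo {
    static void demo() {
        // Nothing outstanding: the "wait for everything" future is already
        // complete, so flush() returns immediately -- effectively a noop.
        CompletableFuture<Void> emptyFlush = CompletableFuture.allOf();
        assert emptyFlush.isDone();

        // One outstanding send: even if the record was immediately eligible
        // to send (the linger.ms=0 case), flush() must still wait for the
        // request to complete.
        CompletableFuture<String> inFlight = new CompletableFuture<>();
        CompletableFuture<Void> flush = CompletableFuture.allOf(inFlight);
        assert !flush.isDone();    // flush() would block here
        inFlight.complete("ack");  // the io thread finishes the request...
        assert flush.isDone();     // ...and flush() returns
    }
}
```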