That makes sense. Thanks for clarifying.

On Mon, Feb 09, 2015 at 08:11:46PM -0800, Jay Kreps wrote:
> Hey Joel,
>
> The use case would be for something like mirror maker. You want to do
> something like the following:
>
> while(true) {
>   val recs = consumer.poll(time);
>   for(rec <- recs)
>     producer.send(rec);
>   producer.flush();
>   consumer.commit();
> }
>
> If you replace flush() with just calling get() on the records, the problem
> is that the get() call will block for linger.ms plus the time to send. But
> at the time you call flush() you are actually done sending new stuff and
> you want that stuff to get sent; lingering around in case of new writes is
> silly. But in the absence of flush() there is no way to say that. As you
> say, you only pay that penalty on one of the get() calls, but if linger.ms
> is high (say 60 seconds) that will be a huge penalty.
>
> -Jay
>
> On Mon, Feb 9, 2015 at 6:23 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> > - WRT the motivation: "if you set linger.ms > 0 to encourage batching
> >   of messages, which is likely a good idea for this kind of use case,
> >   then the second for loop will block for a ms" -> however, in
> >   practice this will really only be for the first couple of calls,
> >   right? The subsequent calls would return immediately since, in all
> >   likelihood, those subsequent messages would have gone out in the
> >   previous message's batch.
> > - I think Bhavesh's suggestion on the timeout makes sense for
> >   consistency (with other blocking-style calls) if nothing else.
> > - Does it make sense to fold in the API changes for KAFKA-1660 and
> >   KAFKA-1669 and do all at once?
> >
> > Thanks,
> >
> > Joel
> >
> >
> > On Mon, Feb 09, 2015 at 02:44:06PM -0800, Guozhang Wang wrote:
> > > The proposal looks good to me; I will need some time to review the
> > > implementation RB later.
> > >
> > > Bhavesh, I am wondering how you would use a flush() with a timeout, since
> > > such a call does not actually provide any flushing guarantees?
> > >
> > > As for close(), there is a separate JIRA for this:
> > >
> > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > >
> > > > Hi Jay,
> > > >
> > > > How about adding a timeout to each of these method calls, i.e.
> > > > flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway
> > > > I/O thread issue, and the caller thread should not be blocked forever
> > > > by these methods.
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >
> > > > > Well, actually, in the case of linger.ms = 0 the send is still
> > > > > asynchronous, so calling flush() blocks until all the previously sent
> > > > > records have completed. It doesn't speed anything up in that case,
> > > > > though, since they are already available to send.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > > >
> > > > > > Looks good to me.
> > > > > >
> > > > > > I like the idea of not blocking additional sends but not guaranteeing
> > > > > > that flush() will deliver them.
> > > > > >
> > > > > > I assume that with linger.ms = 0, flush will just be a no-op (since the
> > > > > > queue will be empty). Is that correct?
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > >
> > > > > > > Following up on our previous thread on making batch send a little
> > > > > > > easier, here is a concrete proposal to add a flush() method to the
> > > > > > > producer:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > > > >
> > > > > > > A proposed implementation is here:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > -Jay
> > >
> > > --
> > > -- Guozhang
> > >
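Jay's and Joel's exchange about get() versus flush() in the mirror-maker loop can be given rough numbers with a toy latency model. It is written in Scala to match the snippet in Jay's reply. This is a sketch under stated assumptions, not Kafka code, and `FlushLatencyModel` and its methods are hypothetical. It assumes the following:

- all records from one poll() are appended at once;
- batch size is counted in records (the real `batch.size` setting is in bytes);
- a batch goes out when it is full, when linger.ms expires, or on flush();
- batches are sent in parallel with a fixed send time.

```scala
// Hypothetical toy model of one mirror-maker iteration; not Kafka code.
object FlushLatencyModel {
  // Assumptions (simplifications, not detailed Kafka behaviour):
  //  - all records returned by one poll() are appended at t = 0;
  //  - batchSize counts records (the real batch.size setting is in bytes);
  //  - a batch is sent when it is full, when lingerMs expires, or on flush();
  //  - each send takes sendMs and batches are sent in parallel.

  /** Completion time (ms) of each batch if the caller only calls get(). */
  def completionsWithoutFlush(numRecords: Int, batchSize: Int,
                              lingerMs: Long, sendMs: Long): Seq[Long] = {
    val fullBatches = numRecords / batchSize
    val trailing =
      if (numRecords % batchSize != 0) Seq(lingerMs + sendMs) // partial batch lingers
      else Seq.empty[Long]
    Seq.fill(fullBatches)(sendMs) ++ trailing // full batches go out immediately
  }

  /** Completion time (ms) of each batch if flush() is called after the sends. */
  def completionsWithFlush(numRecords: Int, batchSize: Int, sendMs: Long): Seq[Long] = {
    val numBatches = (numRecords + batchSize - 1) / batchSize // ceiling division
    Seq.fill(numBatches)(sendMs) // flush() makes every batch sendable at once
  }

  /** How long the loop blocks before commit(): until the last batch completes. */
  def blockedMs(completions: Seq[Long]): Long =
    if (completions.isEmpty) 0L else completions.max
}
```

Take 250 records, batches of 100, `lingerMs = 60000` and `sendMs = 5`. The get()-based loop blocks for about 60,005 ms per iteration, all of it waiting on the trailing partial batch. The flush-based loop blocks for about 5 ms. When the records fill whole batches exactly, the two loops behave the same. This is consistent with Joel's point that the linger penalty lands on only some of the get() calls, while still showing that the penalty can be large.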
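The flush() semantics discussed in the thread can be illustrated with a toy producer. The thread says flush() waits only for records sent before the call and does not block further sends. It is also not a no-op at linger.ms = 0, because sends stay asynchronous.

This is a hypothetical sketch, not the KAFKA-1865 implementation. `ToyProducer`, `flushAsync`, and the `flush(timeout, unit)` overload are assumptions. Record completion is simulated by completing futures by hand, and the toy has no lingering at all, which corresponds to the linger.ms = 0 case. The bounded overload returns a `Boolean`, which makes Guozhang's point concrete: a timed-out flush guarantees nothing, so the caller has to check the result.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy producer (not Kafka code). A record "completes" only when
// its future is completed by hand, standing in for a broker acknowledgement.
final class ToyProducer {
  private val inFlight = ArrayBuffer.empty[CompletableFuture[Unit]]

  /** Asynchronous send: returns immediately, with no lingering (linger.ms = 0). */
  def send(): CompletableFuture[Unit] = inFlight.synchronized {
    val f = new CompletableFuture[Unit]()
    inFlight += f
    f
  }

  /** Snapshots the records sent so far and returns a future that completes
    * once all of them have completed. Later sends are neither blocked nor covered. */
  def flushAsync(): CompletableFuture[Void] = {
    val pending = inFlight.synchronized {
      val stillPending = inFlight.filterNot(_.isDone) // drop finished records
      inFlight.clear()
      inFlight ++= stillPending
      stillPending.toList
    }
    CompletableFuture.allOf(pending: _*)
  }

  /** Blocks until every record sent before this call has completed. */
  def flush(): Unit = flushAsync().join()

  /** Hypothetical bounded variant in the spirit of flush(timeout, TimeUnit).
    * Returns false if the timeout expired first: no delivery guarantee. */
  def flush(timeout: Long, unit: TimeUnit): Boolean =
    try { flushAsync().get(timeout, unit); true }
    catch { case _: TimeoutException => false }
}
```

The snapshot is the design choice that matches Gwen's comment. Sends that arrive while a flush is in progress proceed normally and are simply not waited for. As a result, a producer under continuous load cannot make flush() wait indefinitely.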