I implemented the flush() call I hypothesized earlier in this thread as a patch on KAFKA-1865.
So now producer.flush() will block until all buffered requests complete. The postcondition is that all previous send futures are satisfied and have error/offset information. This is a little easier to use than keeping a list of all the futures and also probably a bit more efficient. It also immediately unblocks all requests regardless of the linger.ms setting, which is in keeping with the name I think and important for the use case we described.

Code here: https://issues.apache.org/jira/browse/KAFKA-1865

-Jay

On Sat, Feb 7, 2015 at 1:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Otis,
>
> Yeah, Gwen is correct. The future from the send will be satisfied when the
> response is received so it will be exactly the same as the performance of
> the sync producer previously.
>
> -Jay
>
> On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
>
>> If I understood the code and Jay correctly - if you wait for the
>> future it will be a similar delay to that of the old sync producer.
>>
>> Put another way, if you test it out and see longer delays than the
>> sync producer had, we need to find out why and fix it.
>>
>> Gwen
>>
>> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
>> <otis.gospodne...@gmail.com> wrote:
>> > Hi,
>> >
>> > Nope, unfortunately it can't do that. X is a remote app, doesn't listen to
>> > anything external, calls Y via HTTPS. So X has to decide what to do with
>> > its data based on Y's synchronous response. It has to block until Y
>> > responds. And it wouldn't be pretty, I think, because nobody wants to run
>> > apps that talk to remote servers and hang on to connections more than they
>> > have to. But perhaps that is the only way? Or maybe the answer to "I'm
>> > guessing the delay would be more or less the same as if the Producer was
>> > using SYNC mode?" is YES, in which case the connection from X to Y would be
>> > open for just as long as with a SYNC producer running in Y?
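A minimal sketch of the approach Gwen and Jay describe for Y: issue every send, then block on each returned future before answering X. To keep it self-contained, a plain `Function<String, Future<Long>>` stands in for `KafkaProducer.send()` (whose real return type is `Future<RecordMetadata>`); the class name `AckingForwarder`, the `Reply` values, and the overall timeout are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch of component Y from Otis's use case (not Kafka code).
final class AckingForwarder {
    enum Reply { OK, RETRY_LATER }

    // Stand-in for producer.send(); the real KafkaProducer.send(ProducerRecord)
    // returns Future<RecordMetadata>, modeled here as a Future of the offset.
    private final Function<String, Future<Long>> send;

    AckingForwarder(Function<String, Future<Long>> send) {
        this.send = send;
    }

    // Sends every line, then blocks until all sends are acknowledged or the
    // overall timeout expires, and only then decides what to tell X.
    Reply forward(List<String> lines, long timeoutMs) {
        List<Future<Long>> pending = new ArrayList<>();
        for (String line : lines) {
            pending.add(send.apply(line)); // issue all sends first so a real producer can batch them
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (Future<Long> f : pending) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                f.get(remaining, TimeUnit.NANOSECONDS); // a failed send surfaces as ExecutionException
            }
            return Reply.OK;
        } catch (ExecutionException | TimeoutException e) {
            return Reply.RETRY_LATER; // X should buffer locally and resend later
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Reply.RETRY_LATER;
        }
    }
}
```

As Jay notes, the wait here is bounded by the broker's response, so the connection from X stays open about as long as it would with the old sync producer. With the flush() patch, Y could call flush() after the send loop instead of waiting future by future, but it would still inspect the futures to learn which sends failed.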
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>> >
>> >> Can Y have a callback that will handle the notification to X?
>> >> In this case, perhaps Y can be async and X can buffer the data until
>> >> the callback triggers and says "all good" (or resend if the callback
>> >> indicates an error).
>> >>
>> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>> >> <otis.gospodne...@gmail.com> wrote:
>> >> > Hi,
>> >> >
>> >> > Thanks for the info. Here's the use case. We have something upstream
>> >> > sending data, say a log shipper called X. It sends it to some remote
>> >> > component Y. Y is the Kafka Producer and it puts data into Kafka. But Y
>> >> > needs to send a reply to X and tell it whether it successfully put all its
>> >> > data into Kafka. If it did not, Y wants to tell X to buffer data locally
>> >> > and resend it later.
>> >> >
>> >> > If producer is ONLY async, Y can't easily do that. Or maybe Y would just
>> >> > need to wait for the Future to come back and only then send the response
>> >> > back to X? If so, I'm guessing the delay would be more or less the same as
>> >> > if the Producer was using SYNC mode?
>> >> >
>> >> > Thanks,
>> >> > Otis
>> >> > --
>> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >> > Solr & Elasticsearch Support * http://sematext.com/
>> >> >
>> >> >
>> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >> >
>> >> >> Yeah as Gwen says there is no sync/async mode anymore.
>> >> >> There is a new configuration which does a lot of what async did in terms
>> >> >> of allowing batching:
>> >> >>
>> >> >> batch.size - This is the target amount of data per partition the producer
>> >> >> will attempt to batch together.
>> >> >> linger.ms - This is the time the producer will wait for more data to be
>> >> >> sent to better batch up writes. The default is 0 (send immediately). So if
>> >> >> you set this to 50 ms the client will send immediately if it has already
>> >> >> filled up its batch, otherwise it will wait up to 50 ms to accumulate the
>> >> >> number of bytes given by batch.size.
>> >> >>
>> >> >> To send asynchronously you do
>> >> >> producer.send(record);
>> >> >> whereas to block on a response you do
>> >> >> producer.send(record).get();
>> >> >> which will wait for acknowledgement from the server.
>> >> >>
>> >> >> One advantage of this model is that the client will do its best to batch
>> >> >> under the covers even if linger.ms=0. It will do this by batching any data
>> >> >> that arrives while another send is in progress into a single request,
>> >> >> giving a kind of "group commit" effect.
>> >> >>
>> >> >> The hope is that this will be both simpler to understand (a single API that
>> >> >> always works the same) and more powerful (you always get a response with
>> >> >> error and offset information whether or not you choose to use it).
>> >> >>
>> >> >> -Jay
>> >> >>
>> >> >>
>> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gshap...@cloudera.com>
>> >> >> wrote:
>> >> >>
>> >> >> > If you want to emulate the old sync producer behavior, you need to set
>> >> >> > the batch size to 1 (in producer config) and wait on the future you
>> >> >> > get from send() (i.e. future.get()).
>> >> >> >
>> >> >> > I can't think of good reasons to do so, though.
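The batch.size / linger.ms behavior Jay describes can be approximated by a single readiness check per partition batch. This is a simplified, hypothetical model (`LingerPolicy` is not a Kafka class): it ignores the in-flight "group commit" batching Jay mentions and the real client's other cases, and the `flushRequested` flag reflects the flush() patch at the top of the thread, which sends buffered data regardless of linger.ms.

```java
// Hypothetical, simplified model of when a per-partition batch is sent (not Kafka code).
final class LingerPolicy {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms (0 = send immediately)

    LingerPolicy(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // bufferedBytes: data accumulated for one partition so far
    // waitedMs: how long the oldest record in that batch has been waiting
    // flushRequested: true while a flush() call is in progress
    boolean readyToSend(int bufferedBytes, long waitedMs, boolean flushRequested) {
        if (flushRequested) {
            return true; // flush() sends buffered data regardless of linger.ms
        }
        if (bufferedBytes >= batchSizeBytes) {
            return true; // batch already full: send without lingering
        }
        return waitedMs >= lingerMs; // otherwise wait until linger.ms has elapsed
    }
}
```

Under this model linger.ms=0 makes every check succeed, matching "send immediately", and Gwen's batch size of 1 makes any non-empty batch count as full.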
>> >> >> >
>> >> >> > Gwen
>> >> >> >
>> >> >> >
>> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
>> >> >> > <otis.gospodne...@gmail.com> wrote:
>> >> >> > > Hi,
>> >> >> > >
>> >> >> > > Is the plan for New Producer to have ONLY async mode? I'm asking because
>> >> >> > > of this info from the Wiki:
>> >> >> > >
>> >> >> > >
>> >> >> > >    - The producer will always attempt to batch data and will always
>> >> >> > >    immediately return a SendResponse which acts as a Future to allow the
>> >> >> > >    client to await the completion of the request.
>> >> >> > >
>> >> >> > >
>> >> >> > > The word "always" makes me think there will be no sync mode.
>> >> >> > >
>> >> >> > > Thanks,
>> >> >> > > Otis
>> >> >> > > --
>> >> >> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >> >> > > Solr & Elasticsearch Support * http://sematext.com/
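Otis's original question, whether the new producer is async-only, comes down to the contract the thread converges on: send() always returns a future immediately, a callback (Gwen's suggestion) can react to completion, and flush() guarantees every earlier future is complete. The toy class below models that contract in memory under stated assumptions: `ToyProducer` is hypothetical, a `BiConsumer<Long, Exception>` stands in for Kafka's `Callback` (whose `onCompletion` receives `RecordMetadata` and an `Exception`), and "sending" happens inline inside flush() rather than on a network thread, so only the postcondition is meaningful here, not the blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of the new producer's contract (not Kafka code).
final class ToyProducer {
    private final List<String> log = new ArrayList<>();        // pretend partition log
    private final List<Runnable> buffered = new ArrayList<>(); // sends not yet completed

    // Always asynchronous: buffers the record and returns a future at once.
    // The optional callback mirrors Callback.onCompletion(metadata, exception).
    synchronized CompletableFuture<Long> send(String value, BiConsumer<Long, Exception> callback) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        buffered.add(() -> {
            log.add(value);
            long offset = log.size() - 1; // offset assigned when the write "completes"
            if (callback != null) {
                callback.accept(offset, null); // null exception means success
            }
            future.complete(offset);
        });
        return future;
    }

    // Postcondition: every future returned by a send() made before this call is complete.
    // Here the "requests" run inline; a real producer waits on network I/O.
    synchronized void flush() {
        List<Runnable> toRun = new ArrayList<>(buffered);
        buffered.clear();
        for (Runnable r : toRun) {
            r.run();
        }
    }
}
```

Y from Otis's use case could either register a callback per send and reply to X when the last one fires, or call flush() and then read the futures, which the postcondition guarantees are already done.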