Jay,

Thanks, I'll look at that more closely.

On Sat, Feb 7, 2015 at 1:23 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
Steve,

In terms of mimicking the sync behavior, I think that is what .get() does, no?

We are always returning the offset and error information. The example I gave didn't make use of it, but you definitely can make use of it if you want to.

-Jay

On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin <st...@stevemorin.com> wrote:

Looking at this thread, I would ideally want something, or at least the right recipe, to mimic the sync behavior Otis is talking about.

In the second case, I would like to be able to know individually whether messages have failed, even if they are in separate batches, sort of like what Kinesis does, as Pradeep mentioned.

-Steve

On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, totally. Using a callback is, of course, the Right Thing for this kind of stuff. But I have found that kind of asynchronous thinking can be hard for people. Even if you get out of the pre-Java 8 syntactic pain that anonymous inner classes inflict, just dealing with multiple threads of control without creating async spaghetti can be a challenge for complex stuff. That is really the only reason for the futures in the API: they are strictly less powerful than the callbacks, but at least using them you can just call .get() and pretend it is blocking.

-Jay

On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein <joe.st...@stealth.ly> wrote:

Now that 0.8.2.0 is in the wild, I look forward to working with it more and seeing what folks start to do with this function

https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback)

and keeping it fully non-blocking.
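A minimal, Kafka-free sketch of the two styles Jay contrasts: blocking on the returned Future (what producer.send(record).get() does) versus handling the result in a callback. FakeProducer, sendAndWait, and sendAsync are hypothetical stand-ins that only mimic the shape of the new producer's send(ProducerRecord, Callback), which returns a Future<RecordMetadata> and reports either the metadata or an Exception to Callback.onCompletion.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class SyncVsCallback {
    // Hypothetical stand-in for an async producer (NOT the Kafka API): send() returns
    // at once with a Future of the assigned offset and completes it on an I/O thread.
    public static class FakeProducer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private final AtomicLong nextOffset = new AtomicLong();

        public Future<Long> send(String value, BiConsumer<Long, Exception> callback) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            io.execute(() -> {
                Long offset = null;
                Exception error = null;
                if (value.isEmpty()) {
                    error = new IllegalArgumentException("empty record"); // simulated failure
                } else {
                    offset = nextOffset.getAndIncrement();
                }
                // This sketch runs the callback first, then completes the future, so a
                // caller blocked in get() observes the callback's side effects.
                if (callback != null) callback.accept(offset, error);
                if (error != null) future.completeExceptionally(error);
                else future.complete(offset);
            });
            return future;
        }

        @Override public void close() { io.shutdown(); }
    }

    // Blocking style: same call, then wait on the Future (like send(record).get()).
    public static long sendAndWait(FakeProducer producer, String value)
            throws InterruptedException, ExecutionException {
        return producer.send(value, null).get();
    }

    // Non-blocking style: the result is delivered to the callback on the I/O thread.
    public static void sendAsync(FakeProducer producer, String value, BlockingQueue<String> events) {
        producer.send(value, (offset, error) ->
            events.add(error == null ? "ok@" + offset : "failed: " + error.getMessage()));
    }

    public static void main(String[] args) throws Exception {
        try (FakeProducer producer = new FakeProducer()) {
            System.out.println("blocking send -> offset " + sendAndWait(producer, "a")); // offset 0
            BlockingQueue<String> events = new LinkedBlockingQueue<>();
            sendAsync(producer, "b", events);
            sendAsync(producer, "", events);
            System.out.println(events.take()); // ok@1
            System.out.println(events.take()); // failed: empty record
        }
    }
}
```

Both paths see the same offset and error information; the Future just lets a caller pretend the send is blocking. Running the callback before completing the Future is a choice of this stand-in, not a documented guarantee of the real client.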
One sprint I know of coming up is going to have the new producer as a component in their reactive calls and handle bookkeeping and retries through that type of callback approach. It should work well (haven't tried, but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc., etc., etc. in functional languages and frameworks.

I think as JDK 8 starts to get out in the wild more (maybe after JDK 7 EOL), the use of .get will be reduced (IMHO) and folks will be thinking more about non-blocking vs. blocking and not so much sync vs. async, but my crystal ball is just back from the shop, so we'll see =8^)

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey guys,

I guess the question is whether it really matters how many underlying network requests occur? It is very hard for an application to depend on this even in the old producer, since it depends on the partition placement (a send to two partitions may go to either one machine or two, and so it will send either one or two requests). So when you send a batch in one call you may feel that is "all at once", but that is only actually guaranteed if all messages have the same partition.

The challenge is allowing even this in the presence of bounded request sizes, which we have in the new producer.
The user sends a list of objects, and the serialized size that will result is not very apparent to them. If you break it up into multiple requests, then that is kind of further ruining the illusion of a single send. If you don't, then you have to just error out, which is equally annoying to have to handle.

But I'm not sure if, from your description, you are saying you actually care how many physical requests are issued. I think it is more like it is just syntactically annoying to send a batch of data now because it needs a for loop.

Currently to do this you would do:

    List<Future<RecordMetadata>> responses = new ArrayList<>();
    for (ProducerRecord<K, V> input : recordBatch)
        responses.add(producer.send(input));
    for (Future<RecordMetadata> response : responses)
        response.get();

If you don't depend on the offset/error info, we could add a flush call so you could instead do:

    for (ProducerRecord<K, V> input : recordBatch)
        producer.send(input);
    producer.flush();

But if you do want the error/offset, then you are going to be back to the original case.

Thoughts?

-Jay

On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

I've been thinking about that too, since both Flume and Sqoop rely on the send(List) API of the old producer.

I'd like to see this API come back, but I'm debating how we'd handle errors. IIRC, the old API would fail an entire batch on a single error, which can lead to duplicates. Having N callbacks lets me retry / save / whatever just the messages that had issues.
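Jay's loop already yields a result per record, which is what Gwen needs to retry just the messages that had issues. A minimal sketch of that pattern, runnable without Kafka, assuming a hypothetical FakeProducer whose send() returns a Future of the offset (the same shape as the real send(), nothing more) and whose listed values fail once to simulate a transient error; sendBatch is also a hypothetical name.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class BatchWithRetry {
    // Hypothetical stand-in producer (NOT the Kafka API). Values in failOnce fail
    // on their first attempt only, to simulate a transient per-record error.
    public static class FakeProducer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private final AtomicLong nextOffset = new AtomicLong();
        private final Set<String> failOnce = ConcurrentHashMap.newKeySet();

        public FakeProducer(Collection<String> transientFailures) {
            failOnce.addAll(transientFailures);
        }

        public Future<Long> send(String value) {
            return CompletableFuture.supplyAsync(() -> {
                if (failOnce.remove(value)) throw new IllegalStateException("transient error: " + value);
                return nextOffset.getAndIncrement();
            }, io);
        }

        @Override public void close() { io.shutdown(); }
    }

    // Send every record first (so the client can batch them), then wait on each
    // Future; only the records whose Future failed are sent again.
    public static List<String> sendBatch(FakeProducer producer, List<String> batch, int maxAttempts)
            throws InterruptedException {
        List<String> pending = new ArrayList<>(batch);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<Future<Long>> responses = new ArrayList<>();
            for (String record : pending) responses.add(producer.send(record));
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < responses.size(); i++) {
                try {
                    responses.get(i).get();
                } catch (ExecutionException e) {
                    failed.add(pending.get(i)); // per-record error: keep it for the next attempt
                }
            }
            pending = failed;
        }
        return pending; // records that still failed after maxAttempts
    }

    public static void main(String[] args) throws Exception {
        try (FakeProducer producer = new FakeProducer(Arrays.asList("b"))) {
            System.out.println(sendBatch(producer, Arrays.asList("a", "b", "c"), 1)); // [b]
        }
    }
}
```

Retrying only the failures avoids resending the whole batch, though a retry can still duplicate a record that was actually written but whose acknowledgement was lost.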
If messages had identifiers from the producer side, we could have the API call the callback with a list of message IDs and their status. But they don't :)

Any thoughts on how you'd like it to work?

Gwen

On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:

This is a great question, Otis. Like Gwen said, you can accomplish sync mode by setting the batch size to 1. But this does highlight a shortcoming of the new producer API.

I really like the design of the new API; it has really great properties and I'm enjoying working with it. However, one API that I think we're lacking is a "batch" API. Currently, I have to iterate over a batch and call .send() on each record, which returns n callbacks instead of 1 callback for the whole batch. This significantly complicates recovery logic where we need to commit a batch as opposed to 1 record at a time.

Do you guys have any plans to add better semantics around batches?

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

If I understood the code and Jay correctly, if you wait for the future it will be a similar delay to that of the old sync producer.

Put another way, if you test it out and see longer delays than the sync producer had, we need to find out why and fix it.
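Pradeep's "one callback for the whole batch" can be layered on top of per-record callbacks in client code. A sketch, assuming a hypothetical callback-style FakeProducer (values starting with "bad" are rejected) and a hypothetical sendBatch helper: it counts outstanding records with an AtomicInteger, records failures keyed by position in the batch, and fires the batch-level callback exactly once, from whichever per-record callback completes last. The batch index stands in for the producer-side message identifiers Gwen notes are missing.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BatchCallback {
    // Hypothetical callback-style stand-in producer (NOT the Kafka API). Each send
    // completes on an I/O thread; values starting with "bad" are rejected.
    public static class FakeProducer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private final AtomicLong nextOffset = new AtomicLong();

        public void send(String value, BiConsumer<Long, Exception> callback) {
            io.execute(() -> {
                if (value.startsWith("bad")) {
                    callback.accept(null, new IllegalStateException("rejected: " + value));
                } else {
                    callback.accept(nextOffset.getAndIncrement(), null);
                }
            });
        }

        @Override public void close() { io.shutdown(); }
    }

    // Sends every record and invokes onBatchDone exactly once, after all per-record
    // callbacks have run, with batch index -> error for the records that failed.
    public static void sendBatch(FakeProducer producer, List<String> batch,
                                 Consumer<SortedMap<Integer, Exception>> onBatchDone) {
        if (batch.isEmpty()) {
            onBatchDone.accept(new TreeMap<>());
            return;
        }
        AtomicInteger remaining = new AtomicInteger(batch.size());
        SortedMap<Integer, Exception> failures = new ConcurrentSkipListMap<>();
        for (int i = 0; i < batch.size(); i++) {
            final int index = i;
            producer.send(batch.get(i), (offset, error) -> {
                if (error != null) failures.put(index, error);
                // Whichever record completes last triggers the single batch-level callback.
                if (remaining.decrementAndGet() == 0) onBatchDone.accept(failures);
            });
        }
    }

    public static void main(String[] args) throws Exception {
        try (FakeProducer producer = new FakeProducer()) {
            CountDownLatch latch = new CountDownLatch(1);
            sendBatch(producer, Arrays.asList("a", "bad-1", "b"), failures -> {
                System.out.println("failed indexes: " + failures.keySet()); // failed indexes: [1]
                latch.countDown();
            });
            latch.await();
        }
    }
}
```

Recovery logic can then commit or retry the batch as a unit, while still knowing exactly which positions failed.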
Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic <otis.gospodne...@gmail.com> wrote:

Hi,

Nope, unfortunately it can't do that. X is a remote app, doesn't listen to anything external, and calls Y via HTTPS. So X has to decide what to do with its data based on Y's synchronous response. It has to block until Y responds. And it wouldn't be pretty, I think, because nobody wants to run apps that talk to remote servers and hang on to connections longer than they have to. But perhaps that is the only way? Or maybe the answer to "I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode?" is YES, in which case the connection from X to Y would be open for just as long as with a SYNC producer running in Y?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

Can Y have a callback that will handle the notification to X?
In this case, perhaps Y can be async and X can buffer the data until the callback triggers and says "all good" (or resend if the callback indicates an error).

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic <otis.gospodne...@gmail.com> wrote:

Hi,

Thanks for the info. Here's the use case. We have something upstream sending data, say a log shipper called X. It sends it to some remote component Y. Y is the Kafka Producer and it puts data into Kafka. But Y needs to send a reply to X and tell it whether it successfully put all its data into Kafka. If it did not, Y wants to tell X to buffer data locally and resend it later.

If the producer is ONLY async, Y can't easily do that. Or maybe Y would just need to wait for the Future to come back and only then send the response back to X? If so, I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode?
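Otis's second option, where Y waits on the futures before answering X, might look like the following sketch. Assumptions: FakeProducer (a stand-in whose acks arrive after a fixed simulated delay, and which rejects values starting with "bad"), handle, and Reply are hypothetical names; only Future.get(long, TimeUnit) is real JDK API. Y hands the whole request to the producer first so it can be batched, then blocks until every record is acknowledged or a deadline passes, so X's connection is held for roughly that long at most.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class AckBeforeReply {
    public enum Reply { OK, RETRY_LATER }

    // Hypothetical stand-in producer (NOT the Kafka API): each send is acknowledged
    // after a fixed simulated delay; values starting with "bad" fail instead.
    public static class FakeProducer implements AutoCloseable {
        private final ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        private final AtomicLong nextOffset = new AtomicLong();
        private final long ackDelayMillis;

        public FakeProducer(long ackDelayMillis) { this.ackDelayMillis = ackDelayMillis; }

        public Future<Long> send(String value) {
            Callable<Long> ack = () -> {
                if (value.startsWith("bad")) throw new IllegalStateException("rejected: " + value);
                return nextOffset.getAndIncrement();
            };
            return io.schedule(ack, ackDelayMillis, TimeUnit.MILLISECONDS);
        }

        // shutdownNow() discards acks that are still scheduled instead of waiting for them.
        @Override public void close() { io.shutdownNow(); }
    }

    // Y's handler for one request from X: hand every record to the producer first,
    // then wait until all are acknowledged or the deadline passes, and only then reply.
    public static Reply handle(FakeProducer producer, List<String> records, long timeoutMillis)
            throws InterruptedException {
        List<Future<Long>> acks = new ArrayList<>();
        for (String record : records) acks.add(producer.send(record));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<Long> ack : acks) {
            try {
                ack.get(Math.max(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
            } catch (ExecutionException | TimeoutException e) {
                return Reply.RETRY_LATER; // a record failed or is still unacknowledged: X should buffer and resend
            }
        }
        return Reply.OK;
    }

    public static void main(String[] args) throws Exception {
        try (FakeProducer producer = new FakeProducer(20)) {
            System.out.println(handle(producer, Arrays.asList("a", "b"), 1000));   // OK
            System.out.println(handle(producer, Arrays.asList("c", "bad"), 1000)); // RETRY_LATER
        }
    }
}
```

A RETRY_LATER reply does not mean nothing was written: records still in flight at the deadline may land anyway, so X's resend can produce duplicates, the same trade-off Gwen raises for whole-batch failures.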
Thanks,
Otis

On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, as Gwen says, there is no sync/async mode anymore. There is a new configuration which does a lot of what async did in terms of allowing batching:

batch.size - This is the target amount of data (in bytes) per partition that the producer will attempt to batch together.
linger.ms - This is the time the producer will wait for more data to be sent to better batch up writes. The default is 0 (send immediately). So if you set this to 50 ms, the client will send immediately if it has already filled up its batch; otherwise it will wait up to 50 ms to accumulate the number of bytes given by batch.size.

To send asynchronously you do
    producer.send(record)
whereas to block on a response you do
    producer.send(record).get();
which will wait for acknowledgement from the server.

One advantage of this model is that the client will do its best to batch under the covers even if linger.ms=0.
It will do this > by > > > > > batching > > > > > > >> any > > > > > > >> >> data > > > > > > >> >> >> that arrives while another send is in progress into a > > single > > > > > > >> >> >> request--giving a kind of "group commit" effect. > > > > > > >> >> >> > > > > > > >> >> >> The hope is that this will be both simpler to understand > > (a > > > > > single > > > > > > >> api > > > > > > >> >> that > > > > > > >> >> >> always works the same) and more powerful (you always > get a > > > > > > response > > > > > > >> with > > > > > > >> >> >> error and offset information whether or not you choose > to > > > use > > > > > it). > > > > > > >> >> >> > > > > > > >> >> >> -Jay > > > > > > >> >> >> > > > > > > >> >> >> > > > > > > >> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira < > > > > > > gshap...@cloudera.com > > > > > > >> > > > > > > > >> >> >> wrote: > > > > > > >> >> >> > > > > > > >> >> >> > If you want to emulate the old sync producer behavior, > > you > > > > > need > > > > > > to > > > > > > >> set > > > > > > >> >> >> > the batch size to 1 (in producer config) and wait on > > the > > > > > future > > > > > > >> you > > > > > > >> >> >> > get from Send (i.e. future.get) > > > > > > >> >> >> > > > > > > > >> >> >> > I can't think of good reasons to do so, though. > > > > > > >> >> >> > > > > > > > >> >> >> > Gwen > > > > > > >> >> >> > > > > > > > >> >> >> > > > > > > > >> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic > > > > > > >> >> >> > <otis.gospodne...@gmail.com> wrote: > > > > > > >> >> >> > > Hi, > > > > > > >> >> >> > > > > > > > > >> >> >> > > Is the plan for New Producer to have ONLY async > mode? 
I'm asking because of this info from the Wiki:

  - The producer will always attempt to batch data and will always immediately return a SendResponse which acts as a Future to allow the client to await the completion of the request.

The word "always" makes me think there will be no sync mode.

Thanks,
Otis
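The settings Jay describes, together with Gwen's batch-size-1 recipe for emulating the old sync producer, can be written out as java.util.Properties. This is a sketch: the key names are new-producer configuration keys, but the values and the localhost:9092 address are placeholders, and the class and method names are hypothetical. Properties like these would be passed to the KafkaProducer(Properties) constructor.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Throughput-oriented settings: let the client fill per-partition batches.
    public static Properties batchingConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384"); // target bytes per partition batch (placeholder value)
        props.put("linger.ms", "50");     // send when a batch is full, or after at most 50 ms
        return props;
    }

    // Closest to the old sync producer, per Gwen: no batching. The caller must
    // still block on each Future returned by send() to get sync-like behavior.
    public static Properties syncLikeConfig(String bootstrapServers) {
        Properties props = batchingConfig(bootstrapServers);
        props.put("batch.size", "1");
        props.put("linger.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(syncLikeConfig("localhost:9092").getProperty("batch.size")); // 1
    }
}
```

Because batch.size is measured in bytes, a value of 1 effectively puts each record in its own batch; the blocking part still comes from calling get() on each send, as Gwen notes.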