I implemented the flush() call I hypothesized earlier in this thread as a
patch on KAFKA-1865.

So now
  producer.flush()
will block until all buffered requests complete. The post condition is that
all previous send futures are satisfied and have error/offset information.
This is a little easier to use than keeping a list of all the futures and
also probably a bit more efficient. It also immediately unblocks all
requests regardless of the linger.ms setting, which is in keeping with the
name I think and important for the use case we described.
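
To make the post condition concrete, here is a toy sketch in plain Java with
no Kafka dependency. ToyProducer and sendAll are made-up names for
illustration only; this is not the client API and not the patch itself. The
toy simply holds records in a buffer until flush() is called, which stands in
for a long linger.ms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FlushSketch {
    // Hypothetical stand-in for a buffering producer; NOT the Kafka client.
    static class ToyProducer {
        private final List<CompletableFuture<Long>> buffered = new ArrayList<>();
        private long nextOffset = 0;

        // Buffers the record; the future is satisfied once the record is "sent".
        synchronized CompletableFuture<Long> send(String record) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            buffered.add(future);
            return future;
        }

        // Sends everything buffered right away and returns only after every
        // earlier send future has been satisfied with an offset.
        synchronized void flush() {
            for (CompletableFuture<Long> future : buffered) {
                future.complete(nextOffset++);
            }
            buffered.clear();
        }
    }

    // Sends all records, flushes once, and returns the assigned offsets.
    static List<Long> sendAll(ToyProducer producer, List<String> records) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (String record : records) {
            futures.add(producer.send(record));
        }
        producer.flush(); // one call instead of waiting on each future in turn
        List<Long> offsets = new ArrayList<>();
        for (CompletableFuture<Long> future : futures) {
            offsets.add(future.join()); // already satisfied, so this does not block
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(new ToyProducer(), Arrays.asList("a", "b", "c")));
    }
}
```

In the toy the futures are only kept so the offsets can be read back; a
caller that just needs "everything so far has completed" can drop them and
rely on flush() alone.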

Code here:
https://issues.apache.org/jira/browse/KAFKA-1865

-Jay

On Sat, Feb 7, 2015 at 1:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Otis,
>
> Yeah, Gwen is correct. The future from the send will be satisfied when the
> response is received so it will be exactly the same as the performance of
> the sync producer previously.
>
> -Jay
>
> On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira <gshap...@cloudera.com>
> wrote:
>
>> If I understood the code and Jay correctly - if you wait for the
>> future it will be a similar delay to that of the old sync producer.
>>
>> Put another way, if you test it out and see longer delays than the
>> sync producer had, we need to find out why and fix it.
>>
>> Gwen
>>
>> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
>> <otis.gospodne...@gmail.com> wrote:
>> > Hi,
>> >
>> > Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
>> > to anything external, calls Y via HTTPS.  So X has to decide what to do
>> > with its data based on Y's synchronous response.  It has to block until Y
>> > responds.  And it wouldn't be pretty, I think, because nobody wants to run
>> > apps that talk to remote servers and hang on to connections more than they
>> > have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
>> > guessing the delay would be more or less the same as if the Producer was
>> > using SYNC mode?" is YES, in which case the connection from X to Y would
>> > be open for just as long as with a SYNC producer running in Y?
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira <gshap...@cloudera.com>
>> > wrote:
>> >
>> >> Can Y have a callback that will handle the notification to X?
>> >> In this case, perhaps Y can be async and X can buffer the data until
>> >> the callback triggers and says "all good" (or resend if the callback
>> >> indicates an error)
>> >>
>> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>> >> <otis.gospodne...@gmail.com> wrote:
>> >> > Hi,
>> >> >
>> >> > Thanks for the info.  Here's the use case.  We have something upstream
>> >> > sending data, say a log shipper called X.  It sends it to some remote
>> >> > component Y.  Y is the Kafka Producer and it puts data into Kafka.  But
>> >> > Y needs to send a reply to X and tell it whether it successfully put
>> >> > all its data into Kafka.  If it did not, Y wants to tell X to buffer
>> >> > data locally and resend it later.
>> >> >
>> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would
>> >> > just need to wait for the Future to come back and only then send the
>> >> > response back to X?  If so, I'm guessing the delay would be more or
>> >> > less the same as if the Producer was using SYNC mode?
>> >> >
>> >> > Thanks,
>> >> > Otis
>> >> > --
>> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >> > Solr & Elasticsearch Support * http://sematext.com/
>> >> >
>> >> >
>> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps <jay.kr...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> >> >> configuration which does a lot of what async did in terms of allowing
>> >> >> batching:
>> >> >>
>> >> >> batch.size - This is the target amount of data per partition the
>> >> >> producer will attempt to batch together.
>> >> >> linger.ms - This is the time the producer will wait for more data to
>> >> >> be sent to better batch up writes. The default is 0 (send
>> >> >> immediately). So if you set this to 50 ms the client will send
>> >> >> immediately if it has already filled up its batch, otherwise it will
>> >> >> wait up to 50 ms to accumulate the number of bytes given by
>> >> >> batch.size.
>> >> >>
>> >> >> To send asynchronously you do
>> >> >>    producer.send(record)
>> >> >> whereas to block on a response you do
>> >> >>    producer.send(record).get();
>> >> >> which will wait for acknowledgement from the server.
>> >> >>
>> >> >> One advantage of this model is that the client will do its best to
>> >> >> batch under the covers even if linger.ms=0. It will do this by
>> >> >> batching any data that arrives while another send is in progress into
>> >> >> a single request--giving a kind of "group commit" effect.
>> >> >>
>> >> >> The hope is that this will be both simpler to understand (a single api
>> >> >> that always works the same) and more powerful (you always get a
>> >> >> response with error and offset information whether or not you choose
>> >> >> to use it).
>> >> >>
>> >> >> -Jay
>> >> >>
>> >> >>
>> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira <gshap...@cloudera.com>
>> >> >> wrote:
>> >> >>
>> >> >> > If you want to emulate the old sync producer behavior, you need to
>> >> >> > set the batch size to 1 (in producer config) and wait on the future
>> >> >> > you get from Send (i.e. future.get)
>> >> >> >
>> >> >> > I can't think of good reasons to do so, though.
>> >> >> >
>> >> >> > Gwen
>> >> >> >
>> >> >> >
>> >> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
>> >> >> > <otis.gospodne...@gmail.com> wrote:
>> >> >> > > Hi,
>> >> >> > >
>> >> >> > > Is the plan for New Producer to have ONLY async mode?  I'm asking
>> >> >> > > because of this info from the Wiki:
>> >> >> > >
>> >> >> > >
>> >> >> > >    - The producer will always attempt to batch data and will always
>> >> >> > >    immediately return a SendResponse which acts as a Future to allow
>> >> >> > >    the client to await the completion of the request.
>> >> >> > >
>> >> >> > >
>> >> >> > > The word "always" makes me think there will be no sync mode.
>> >> >> > >
>> >> >> > > Thanks,
>> >> >> > > Otis
>> >> >> > > --
>> >> >> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >> >> > > Solr & Elasticsearch Support * http://sematext.com/
>> >> >> >
>> >> >>
>> >>
>>
>
>
