If flush() works for this use case, it may be an acceptable starting point
(although not as clean as a native batched sync). I am not yet clear
about some aspects of flush's batch semantics and its suitability for this
mode of operation. Allow me to explore it with you folks.

 1) flush() guarantees: What are the guarantees that one can expect when a
flush() call returns? Is it successful delivery of all events in the
buffer to the broker, as per the configured ack setting?

 2) flush() error handling: It does not throw any exception. What is the
mechanism for indicating failure in delivery of one or more events in the
batch? Is future.get() the way to detect it? If so, will future.get() be
applicable to all types of delivery failures (could be a network glitch or
something simpler like Kafka responding that it was not able to
accept some events)?

 3) Multithreaded clients: The situation here is that each client thread is
trying to push out a batch. flush() will pump out data not just from
the calling thread but also from other threads. I need to think
through this a bit more to see if that's OK for multithreaded clients.

 4) Extra synchronization and object creation: Like Ewen pointed out, this
method definitely creates too many (Future) objects and also too much
locking/synchronization due to repeated calls to Producer.send() and
future.get(). However, if the measured perf impact of this is not too much,
then I guess it's OK.
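
To make questions 1) and 2) concrete, here is a minimal sketch of the
send/flush/get pattern as I understand it. SketchProducer and FakeProducer
are hypothetical stand-ins I made up so the snippet runs on its own; they are
not the Kafka client API. The assumption that flush() blocks until every
buffered send has completed (successfully or not) is exactly the guarantee I
am asking about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for the two producer calls under discussion; NOT the Kafka client API.
interface SketchProducer<R> {
    Future<Long> send(R record); // asynchronous; the future completes when the send succeeds or fails
    void flush();                // ASSUMPTION: blocks until every buffered send has completed
}

// In-memory fake used only to exercise the pattern: records equal to "bad" are rejected.
class FakeProducer implements SketchProducer<String> {
    private final List<Runnable> pending = new ArrayList<>();
    private long nextOffset = 0;

    @Override
    public synchronized Future<Long> send(String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        // Nothing is "delivered" until flush() runs the buffered work.
        pending.add(() -> {
            if (record.equals("bad")) {
                result.completeExceptionally(new IllegalStateException("rejected: " + record));
            } else {
                result.complete(nextOffset++);
            }
        });
        return result;
    }

    @Override
    public synchronized void flush() {
        pending.forEach(Runnable::run);
        pending.clear();
    }
}

class BatchSync {
    // Sends the whole batch, blocks in flush(), then inspects each future.
    // Returns the records whose futures reported a failure so the caller can retry them.
    static <R> List<R> sendAndCollectFailures(SketchProducer<R> producer, List<R> batch) {
        List<Future<Long>> futures = new ArrayList<>();
        for (R record : batch) {
            futures.add(producer.send(record));
        }
        producer.flush();
        List<R> failed = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            try {
                futures.get(i).get(); // should return at once if flush() waited for completion
            } catch (ExecutionException e) {
                failed.add(batch.get(i)); // the underlying error is e.getCause()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a send", e);
            }
        }
        return failed;
    }
}
```

If that assumption holds, the per-record futures are the only place a failure
surfaces, and the caller retries whatever comes back in the failed list.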

  I am unable to find the email where I mentioned a 10-byte event size. To
me, 500-byte to 1 kB event sizes are more interesting. I had run measurements
with event sizes of 500 bytes, 1k, 4k, 8k and 16k. In the 0.8.1
producer_perf_test tool, the 8k and 16k event sizes showed much better
throughput in --sync mode (throughput almost doubled with doubling of event
size). The perf tool was not using the Producer.send(list<>) API, though. I
saw a great improvement when I changed it to use Producer.send(list<>).
Sometimes it easily exceeded the throughput of async mode.
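
As a sanity check when comparing numbers across event sizes, records/sec and
MB/s are related through the event size alone. The sketch below recomputes
the MB/s column from the rec/sec figures in Ewen's benchmark output quoted
further down. The class and method names are mine, and treating 1 MB as 2^20
bytes is an assumption that happens to make the arithmetic line up with the
reported values to within rounding.

```java
class ThroughputCheck {
    // ASSUMPTION: the benchmark reports MB as 2^20 bytes.
    static final double BYTES_PER_MB = 1024.0 * 1024.0;

    // Converts a record rate at a fixed event size into MB/s.
    static double mbPerSec(double recordsPerSec, int eventSizeBytes) {
        return recordsPerSec * eventSizeBytes / BYTES_PER_MB;
    }

    public static void main(String[] args) {
        // Event sizes (bytes) and rec/sec copied from the quoted benchmark output.
        int[] sizes = {10, 100, 1000, 10000, 100000};
        double[] recPerSec = {1637825.195625, 605504.877911, 90351.817570, 8306.180862, 978.403499};
        for (int i = 0; i < sizes.length; i++) {
            System.out.printf("%6d bytes: %.2f MB/s%n", sizes[i], mbPerSec(recPerSec[i], sizes[i]));
        }
    }
}
```

Seen this way, the record rate drops by well over 10x between the 10-byte and
1000-byte runs while the byte rate climbs, which is why I find the larger
event sizes the more interesting ones to measure.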


On 4/27/15 10:32 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:

>A couple of thoughts:
>1. @Joel I agree it's not hard to use the new API but it definitely is
>verbose. If that snippet of code is being written across hundreds of
>projects, that probably means we're missing an important API. Right now
>I've only seen the one complaint, but it's worth finding out how many
>people feel like it's missing. And given that internally each of the
>returned Futures just uses the future for the entire batch, I think it's
>probably worth investigating if getting rid of millions of allocs per
>second is worth it, even if they should be in the nursery and fast to
>2. For lots of small messages, there's definitely the potential for a
>performance benefit by avoiding a lot of lock acquire/release in send(). If
>you make a first pass to organize by topic partition and then process each
>group, you lock # of partitions times rather than # of messages times. One
>major drawback I see is that it seems to make a mess of error
>handling/blocking when the RecordAccumulator runs out of space.
>3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
>realistic payload size for you? I can imagine applications where it is (and
>we should support those well), it just sounds unusually small.
>4. I reproduced Jay's benchmark blog post awhile ago in an automated test
>Here's a snippet from the output on m3.2xlarge instances that might help
>shed some light on the situation:
>INFO:_.KafkaBenchmark:Message size:
>INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.620000 MB/s)
>INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.750000 MB/s)
>INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.170000 MB/s)
>INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.210000 MB/s)
>INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.310000 MB/s)
>That's using the single-threaded new ProducerPerformance class, so the
>m3.2xlarge's # of cores probably has little influence. There's clearly a
>sharp increase in throughput from 10 -> 100 byte messages. I recall double
>checking that the CPU was fully utilized. Note that this is with the
>setting that doesn't actually exist anymore, so take with a grain of salt.
>5. I'd suggest that there may be other APIs that give the implementation
>more flexibility but still provide batching. For example:
>* Require batched inputs to be prepartitioned so each call specifies the
>TopicPartition. Main benefit here is that the producer avoids having to do
>all the sorting, which the application may already be doing anyway.
>* How about an API similar to fwrite() where you provide a set of messages
>but it may only write some of them and tells you how many it wrote? This
>could be a clean way to expose the underlying batching that is performed
>without being a completely leaky abstraction. We could then return just a
>single future for the entire batch, we'd do minimal locking, etc. Not sure
>how to handle different TopicPartitions in the same set. I think this could
>be a good pattern for people who want maximally efficient ordered writes
>where errors are properly handled too.
>6. If I recall correctly, doesn't compression occur in a synchronized
>block, I think in the RecordAccumulator? Or maybe it was in the network
>thread? In any case, I seem to recall compression also possibly playing an
>important role in performance because it operates over a set of records
>which limits where you can run it. @Roshan, are you using compression,
>in your microbenchmarks and your application?
>I think there's almost definitely a good case to be made for a batch API,
>but probably needs some very clear motivating use cases and perf
>measurements showing why it's not going to be feasible to accomplish with
>the current API + a few helpers to wrap it in a batch API.
>On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> >   Fine grained tracking of status of individual events is quite painful in
>> > contrast to simply blocking on every batch. Old style Batched-sync
>> > has great advantages in terms of simplicity and performance.
>> I may be missing something, but I'm not so convinced that it is that
>> painful/very different from the old-style.
>> In the old approach, you would compose a batch (in a list of messages)
>> and do a synchronous send:
>> try {
>>   producer.send(recordsToSend)
>> }
>> catch (...) {
>>   // handle (e.g., retry sending recordsToSend)
>> }
>> In the new approach, you would do (something like) this:
>> for (record: recordsToSend) {
>>   futureList.add(producer.send(record));
>> }
>> producer.flush();
>> for (result: futureList) {
>>   try { result.get(); }
>>   catch (...) { /* handle (e.g., retry sending recordsToSend) */ }
>> }

