@Joel, if flush() works for this use case it may be an acceptable starting point (although not as clean as a native batched sync). I am not yet clear about some aspects of flush()'s batch semantics and its suitability for this mode of operation. Allow me to explore it with you folks.
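
For concreteness, the pattern under discussion (essentially what Joel sketches at the bottom of this mail) would look roughly like the following with the new java producer. This is only a minimal sketch; it assumes flush() blocks until every record handed to send() so far has completed, and the error handling is purely illustrative:

    // Batched "sync" send built on the new producer: send each record,
    // flush the accumulator, then check the per-record futures for failures.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BatchedSyncSend {
        /** Returns the records whose delivery failed, so the caller can retry them. */
        public static List<ProducerRecord<byte[], byte[]>> sendBatch(
                KafkaProducer<byte[], byte[]> producer,
                List<ProducerRecord<byte[], byte[]>> batch) {
            List<Future<RecordMetadata>> futures =
                new ArrayList<Future<RecordMetadata>>(batch.size());
            for (ProducerRecord<byte[], byte[]> record : batch) {
                futures.add(producer.send(record));   // enqueue into the accumulator
            }
            producer.flush();                         // block until buffered records complete
            List<ProducerRecord<byte[], byte[]>> failed =
                new ArrayList<ProducerRecord<byte[], byte[]>>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();             // surfaces the per-record delivery error, if any
                } catch (Exception e) {
                    failed.add(batch.get(i));
                }
            }
            return failed;
        }
    }

That is the shape my questions below are about:
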
1) flush() guarantees: What guarantees can one expect when a flush() call returns? Is it successful delivery of all events in the buffer to the broker, as per the configured ack setting?

2) flush() error handling: flush() does not throw any exception. What is the mechanism for indicating failure in delivery of one or more events in the batch? Is future.get() the way to detect it? If so, will future.get() be applicable to all types of delivery failures (it could be a network glitch, or something simpler like Kafka responding that it was not able to accept some events)?

3) Multithreaded clients: The situation being that each client thread is trying to push out a batch, flush() will pump out data not just from the calling thread but also from other threads. I need to think through a bit more whether that is OK for multithreaded clients (see the sketch in the P.S. below).

4) Extra synchronization and object creation: Like Ewen pointed out, this method definitely creates too many (Future) objects and also incurs a lot of locking/synchronization due to the repeated calls to Producer.send() and future.get(). However, if the measured perf impact of this is not too great, then I guess it is OK.

@Ewen, I am unable to find the email where I mentioned a 10-byte event size. To me, 500-byte to 1 KB event sizes are more interesting. I had run measurements with event sizes of 500 bytes, 1 KB, 4 KB, 8 KB and 16 KB. In the 0.8.1 producer_perf_test tool, the 8 KB and 16 KB event sizes showed much better throughput in --sync mode (throughput almost doubled with each doubling of event size). The perf tool was not using the Producer.send(List<>) API though; I saw a great improvement when I changed it to use Producer.send(List<>), and sometimes it easily exceeded the throughput of async mode.

-roshan
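
P.S. Coming back to (3) above, here is a minimal sketch of the multithreaded situation I have in mind. It assumes a single producer instance shared across client threads, and that flush() blocks on completion of everything buffered so far regardless of which thread called send(); topic names and batch sizes are just placeholders:

    // Two client threads share one producer. When thread A calls flush() to
    // close out its own batch, it also ends up waiting on whatever thread B
    // has buffered so far.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SharedProducerFlush {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            final KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);

            Thread threadB = new Thread(new Runnable() {
                public void run() {
                    // Thread B pushing its own batch into the shared producer.
                    for (int i = 0; i < 1000; i++) {
                        producer.send(new ProducerRecord<byte[], byte[]>(
                            "topicB", ("b" + i).getBytes()));
                    }
                }
            });
            threadB.start();

            // Thread A's batch.
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<byte[], byte[]>(
                    "topicA", ("a" + i).getBytes()));
            }
            // Meant as thread A's sync point, but it also blocks on thread B's
            // buffered records, so the two threads' batch boundaries get coupled.
            producer.flush();

            threadB.join();
            producer.close();
        }
    }

If that reading of flush() is right, one thread's batch boundary ends up depending on the other threads' traffic, which is the part I want to think through.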

On 4/27/15 10:32 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:

>A couple of thoughts:
>
>1. @Joel I agree it's not hard to use the new API but it definitely is
>more verbose. If that snippet of code is being written across hundreds
>of projects, that probably means we're missing an important API. Right
>now I've only seen the one complaint, but it's worth finding out how
>many people feel like it's missing. And given that internally each of
>the returned Futures just uses the future for the entire batch, I think
>it's probably worth investigating if getting rid of millions of allocs
>per second is worth it, even if they should be in the nursery and fast
>to collect.
>
>2. For lots of small messages, there's definitely the potential for a
>performance benefit by avoiding a lot of lock acquire/release in send().
>If you make a first pass to organize by topic partition and then process
>each group, you lock # of partitions times rather than # of messages
>times. One major drawback I see is that it seems to make a mess of error
>handling/blocking when the RecordAccumulator runs out of space.
>
>3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
>realistic payload size for you? I can imagine applications where it is
>(and we should support those well), it just sounds unusually small.
>
>4. I reproduced Jay's benchmark blog post awhile ago in an automated test
>(see
>https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py).
>Here's a snippet from the output on m3.2xlarge instances that might help
>shed some light on the situation:
>
>INFO:_.KafkaBenchmark:Message size:
>INFO:_.KafkaBenchmark:     10: 1637825.195625 rec/sec (15.620000 MB/s)
>INFO:_.KafkaBenchmark:    100: 605504.877911 rec/sec (57.750000 MB/s)
>INFO:_.KafkaBenchmark:   1000: 90351.817570 rec/sec (86.170000 MB/s)
>INFO:_.KafkaBenchmark:  10000: 8306.180862 rec/sec (79.210000 MB/s)
>INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.310000 MB/s)
>
>That's using the single-threaded new ProducerPerformance class, so the
>m3.2xlarge's # of cores probably has little influence. There's clearly a
>sharp increase in throughput from 10 -> 100 byte messages. I recall
>double checking that the CPU was fully utilized. Note that this is with
>the acks=1 setting that doesn't actually exist anymore, so take with a
>grain of salt.
>
>5. I'd suggest that there may be other APIs that give the implementation
>more flexibility but still provide batching. For example:
>* Require batched inputs to be prepartitioned so each call specifies the
>TopicPartition. Main benefit here is that the producer avoids having to
>do all the sorting, which the application may already be doing anyway.
>* How about an API similar to fwrite() where you provide a set of
>messages but it may only write some of them and tells you how many it
>wrote? This could be a clean way to expose the underlying batching that
>is performed without being a completely leaky abstraction. We could then
>return just a single future for the entire batch, we'd do minimal
>locking, etc. Not sure how to handle different TopicPartitions in the
>same set. I think this could be a good pattern for people who want
>maximally efficient ordered writes where errors are properly handled too.
>
>6. If I recall correctly, doesn't compression occur in a synchronized
>block, I think in the RecordAccumulator? Or maybe it was in the network
>thread? In any case, I seem to recall compression also possibly playing
>an important role in performance because it operates over a set of
>records which limits where you can run it. @Roshan, are you using
>compression, both in your microbenchmarks and your application?
>
>I think there's almost definitely a good case to be made for a batch
>API, but probably needs some very clear motivating use cases and perf
>measurements showing why it's not going to be feasible to accomplish
>with the current API + a few helpers to wrap it in a batch API.
>
>-Ewen
>
>
>On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
>> > Fine grained tracking of status of individual events is quite painful
>> > in contrast to simply blocking on every batch. Old style Batched-sync
>> > mode has great advantages in terms of simplicity and performance.
>>
>> I may be missing something, but I'm not so convinced that it is that
>> painful/very different from the old-style.
>>
>> In the old approach, you would compose a batch (in a list of messages)
>> and do a synchronous send:
>>
>>   try {
>>     producer.send(recordsToSend)
>>   }
>>   catch (...) {
>>     // handle (e.g., retry sending recordsToSend)
>>   }
>>
>> In the new approach, you would do (something like) this:
>>
>>   for (record: recordsToSend) {
>>     futureList.add(producer.send(record));
>>   }
>>   producer.flush();
>>   for (result: futureList) {
>>     try { result.get(); }
>>     catch (...) { // handle (e.g., retry sending recordsToSend) }
>>   }
>>
>
>-- 
>Thanks,
>Ewen
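
P.P.S. To make the fwrite()-style idea in Ewen's point 5 a bit more concrete, here is one purely hypothetical shape such a call could take. Neither trySend() nor BatchAppendResult exists in the producer API; this is only a sketch of the semantics described above (accept some prefix of the batch, report how many records were taken, and hand back a single future for that prefix):

    // Hypothetical fwrite()-style batch append: the producer may accept only a
    // prefix of the records, reports how many it took, and returns one future
    // per call instead of one per record.
    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public interface BatchingProducer<K, V> {

        /** Result of one append attempt. */
        final class BatchAppendResult {
            private final int accepted;
            private final Future<List<RecordMetadata>> completion;

            public BatchAppendResult(int accepted,
                                     Future<List<RecordMetadata>> completion) {
                this.accepted = accepted;
                this.completion = completion;
            }

            /** Number of records taken from the head of the list in this call. */
            public int accepted() { return accepted; }

            /** Completes when the accepted prefix has been acked. */
            public Future<List<RecordMetadata>> completion() { return completion; }
        }

        /**
         * Try to append pre-partitioned records to the accumulator. Like fwrite(),
         * it may take only a prefix (e.g., if the accumulator is low on space);
         * the caller is expected to retry the remainder.
         */
        BatchAppendResult trySend(List<ProducerRecord<K, V>> records);
    }

A single future per call is also what would avoid the per-record Future allocations Ewen mentions in point 1.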