I have 1 topic with 8 partitions. Do you mean hacking the new producer to add
more threads? The current implementation manages a single send thread.

Thanks,
Manu
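
For reference, the per-partition locking described in the quoted reply below
can be sketched without Kafka at all. This is only an illustration under
stated assumptions, not the producer's actual code: the class name
PartitionedChecksumSketch, the Partition holder and the run() helper are
hypothetical, and java.util.zip.CRC32 stands in for the Hadoop-derived CRC the
producer uses. Each writer thread computes a checksum while holding only the
lock of the partition it appends to, so threads hitting different partitions
do not block each other.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.CRC32;

// Hypothetical stand-alone sketch; none of these names come from Kafka.
class PartitionedChecksumSketch {

    /** One simulated partition: its own lock plus state guarded by that lock. */
    static final class Partition {
        final ReentrantLock lock = new ReentrantLock();
        long appended = 0; // number of messages appended so far
        long lastCrc = 0;  // checksum of the most recent message
    }

    /**
     * Starts numThreads writer threads. Each one "appends" messagesPerThread
     * messages of messageSize bytes, cycling over the partitions.
     * Returns the number of messages appended to each partition.
     */
    static long[] run(int numPartitions, int numThreads,
                      int messagesPerThread, int messageSize)
            throws InterruptedException {
        Partition[] partitions = new Partition[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            partitions[i] = new Partition();
        }

        Thread[] writers = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++) {
            final int id = t;
            writers[t] = new Thread(() -> {
                byte[] payload = new byte[messageSize];
                CRC32 crc = new CRC32();
                for (int m = 0; m < messagesPerThread; m++) {
                    // Simple round-robin partition choice, offset per thread.
                    Partition p = partitions[(id + m) % numPartitions];
                    p.lock.lock(); // serializes writers of this partition only
                    try {
                        crc.reset();
                        crc.update(payload, 0, payload.length);
                        p.lastCrc = crc.getValue();
                        p.appended++;
                    } finally {
                        p.lock.unlock();
                    }
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join(); // join() makes the writers' updates visible here
        }

        long[] counts = new long[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            counts[i] = partitions[i].appended;
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] counts = run(8, 4, 100_000, 1000);
        long total = 0;
        for (long c : counts) {
            total += c;
        }
        System.out.println("total appended = " + total);
    }
}
```

With 8 partitions and a handful of writer threads most lock acquisitions
should be uncontended; calling run(1, 4, ...) instead forces every thread
through the same lock, which is the single-partition case.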

On Wed, Nov 26, 2014 at 1:28 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Yeah, neither of those is simple to optimize. The CRC is already the
> optimized java crc we stole from Hadoop. It may be possible to make that
> faster still but probably not easy. It might be possible to optimize out
> some of the interrupt calls, though I'm not exactly sure.
>
> One thing, though, is that the crc stuff should be highly parallelizable.
> How many partitions/topics do you have? I suspect more writer threads (and
> ensuring there are multiple partitions) would also improve throughput.
> Basically we need to serialize writes per-partition which also involves
> waiting on the crc, but this lock is per partition so if you have N
> partitions N threads can all be working at once.
>
> On Tue, Nov 25, 2014 at 3:03 AM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> > Thanks for the explanation.
> >
> > Here are some stats for the I/O thread.
> >
> > *io-ratio 0.155*
> > *io-time-ns-avg 16418.5*
> > *io-wait-ratio 0.59*
> > *io-wait-time-ns-avg  62881*
> >
> > It seems to confirm that the I/O thread spent much more time waiting than
> > doing real work.
> >
> > Given the above stats, how could I track down and pinpoint the bottleneck?
> > I guess computing crc32s cannot be avoided.
> >
> > On Fri, Nov 21, 2014 at 12:34 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > So I suspect that the bottleneck is actually in the writer thread (the one
> > > calling send()), not the I/O thread. You could verify this by checking the
> > > JMX stats which will give the amount of time the I/O thread spends waiting.
> > > But since epollWait shows up first that is the I/O thread waiting for work.
> > >
> > > It looks like the big bottleneck is computing the crc32s for the messages.
> > > The next big hit after that is signaling the I/O thread to wake-up and do
> > > work.
> > >
> > > Here is an annotated version of those traces:
> > >
> > > These two are bogus and are just background JMX things I think:
> > >    1 39.30% 39.30%   79585 300923 java.net.SocketInputStream.socketRead0
> > >    2 20.62% 59.92%   41750 300450 java.net.PlainSocketImpl.socketAccept
> > >
> > > This is the I/O thread waiting for work to do
> > >    3  9.52% 69.45%   19287 300660 sun.nio.ch.EPollArrayWrapper.epollWait
> > >
> > > These are the real problems:
> > >    4  9.50% 78.94%   19234 300728 org.apache.kafka.common.record.Record.computeChecksum
> > >    5  4.14% 83.08%    8377 300777 sun.nio.ch.EPollArrayWrapper.interrupt
> > >
> > > I/O thread doing a write
> > >    6  2.30% 85.38%    4662 300708 sun.nio.ch.FileDispatcherImpl.writev0
> > >
> > > This is a one time thing when fetching metadata on startup
> > >    7  1.61% 86.99%    3260 300752 org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
> > >
> > > These are all in the I/O thread so not relevant:
> > >    8  1.24% 88.23%    2501 300804 sun.nio.ch.EPollArrayWrapper.epollWait
> > >    9  1.08% 89.31%    2187 300734 org.apache.kafka.clients.producer.internals.RecordBatch.done
> > >   10  0.98% 90.29%    1991 300870 org.apache.kafka.common.protocol.types.Type$6.write
> > >   11  0.97% 91.26%    1961 300789 org.apache.kafka.clients.producer.internals.RecordAccumulator.ready
> > >   12  0.96% 92.22%    1951 300726
> > >
> > > -Jay
> > >
> > > On Thu, Nov 20, 2014 at 5:42 PM, Manu Zhang <owenzhang1...@gmail.com>
> > > wrote:
> > >
> > > > Ok, here is the hprof output
> > > >
> > > > CPU SAMPLES BEGIN (total = 202493) Fri Nov 21 08:07:51 2014
> > > > rank   self  accum   count trace method
> > > >    1 39.30% 39.30%   79585 300923 java.net.SocketInputStream.socketRead0
> > > >    2 20.62% 59.92%   41750 300450 java.net.PlainSocketImpl.socketAccept
> > > >    3  9.52% 69.45%   19287 300660 sun.nio.ch.EPollArrayWrapper.epollWait
> > > >    4  9.50% 78.94%   19234 300728 org.apache.kafka.common.record.Record.computeChecksum
> > > >    5  4.14% 83.08%    8377 300777 sun.nio.ch.EPollArrayWrapper.interrupt
> > > >    6  2.30% 85.38%    4662 300708 sun.nio.ch.FileDispatcherImpl.writev0
> > > >    7  1.61% 86.99%    3260 300752 org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
> > > >    8  1.24% 88.23%    2501 300804 sun.nio.ch.EPollArrayWrapper.epollWait
> > > >    9  1.08% 89.31%    2187 300734 org.apache.kafka.clients.producer.internals.RecordBatch.done
> > > >   10  0.98% 90.29%    1991 300870 org.apache.kafka.common.protocol.types.Type$6.write
> > > >   11  0.97% 91.26%    1961 300789 org.apache.kafka.clients.producer.internals.RecordAccumulator.ready
> > > >   12  0.96% 92.22%    1951 300726 org.apache.kafka.common.record.MemoryRecords.append
> > > >   13  0.89% 93.12%    1809 300829 java.nio.Bits.copyFromArray
> > > >   14  0.75% 93.86%    1510 300722 java.nio.HeapByteBuffer.<init>
> > > >   15  0.54% 94.41%    1100 300730 org.apache.kafka.common.record.Compressor.put
> > > >   16  0.54% 94.95%    1094 300749 org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> > > >   17  0.38% 95.33%     771 300755 org.apache.kafka.clients.producer.KafkaProducer.send
> > > >   18  0.36% 95.69%     736 300830 org.apache.kafka.common.metrics.Sensor.record
> > > >   19  0.35% 96.04%     709 300848 sun.nio.ch.IOUtil.drain
> > > >   20  0.33% 96.37%     665 300814 sun.nio.ch.IOUtil.drain
> > > >   21  0.32% 96.69%     644 300812 org.apache.kafka.common.metrics.Sensor.record
> > > >   22  0.31% 97.00%     626 300725 org.apache.kafka.clients.producer.internals.Partitioner.partition
> > > >   23  0.28% 97.28%     571 300729 org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> > > >   24  0.26% 97.54%     535 300764 org.apache.log4j.Category.getEffectiveLevel
> > > >   25  0.25% 97.79%     501 300924 org.apache.kafka.common.protocol.types.Schema.write
> > > >   26  0.19% 97.98%     392 300802 org.apache.kafka.common.metrics.Sensor.record
> > > >   27  0.19% 98.17%     386 300797 org.apache.kafka.common.metrics.Sensor.record
> > > >   28  0.17% 98.34%     342 300739 org.apache.kafka.common.record.Record.write
> > > >   29  0.16% 98.50%     315 300792 org.apache.kafka.common.record.Record.write
> > > >   30  0.15% 98.64%     294 300757 org.apache.kafka.common.record.Record.write
> > > >   31  0.12% 98.76%     238 300731 org.apache.kafka.common.record.Record.write
> > > >   32  0.09% 98.85%     180 300747 org.apache.kafka.clients.producer.KafkaProducer.send
> > > >   33  0.09% 98.94%     177 300750 org.apache.kafka.common.record.Record.write
> > > >   34  0.06% 98.99%     112 300851 sun.nio.ch.NativeThread.current
> > > >   35  0.05% 99.05%     110 300753 org.apache.kafka.clients.producer.KafkaProducer.send
> > > >   36  0.05% 99.09%      93 300723 java.lang.System.arraycopy
> > > >   37  0.04% 99.13%      80 300872 org.apache.kafka.clients.tools.ProducerPerformance.main
> > > >   38  0.04% 99.17%      78 300770 java.util.HashMap.getEntry
> > > >   39  0.04% 99.21%      78 300859 org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> > > >   40  0.04% 99.25%      73 300861 sun.nio.ch.EPollArrayWrapper.epollCtl
> > > >   41  0.03% 99.28%      67 300718 sun.misc.Unsafe.copyMemory
> > > >   42  0.03% 99.31%      59 300737 org.apache.kafka.clients.producer.internals.Metadata.timeToNextUpdate
> > > >   43  0.03% 99.33%      52 300816 org.apache.kafka.clients.producer.internals.Metadata.fetch
> > > >   44  0.02% 99.36%      48 300715 sun.nio.ch.FileDispatcherImpl.read0
> > > >   45  0.02% 99.38%      42 300794 org.apache.log4j.Category.getEffectiveLevel
> > > >   46  0.02% 99.40%      41 300740 org.apache.kafka.clients.producer.internals.Metadata.timeToNextUpdate
> > > >   47  0.02% 99.42%      40 300795 sun.nio.ch.NativeThread.current
> > > >   48  0.01% 99.43%      28 300785 sun.nio.ch.EPollArrayWrapper.epollCtl
> > > >   49  0.01% 99.44%      25 301055 sun.nio.ch.EPollSelectorImpl.wakeup
> > > >   50  0.01% 99.45%      22 300806 java.lang.Thread.currentThread
> > > > CPU SAMPLES END
> > > >
> > > >
> > > > On Fri, Nov 21, 2014 at 5:05 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Great. There is a single I/O thread per producer client that does the
> > > > > sending so it could be that either the sender thread or that thread is
> > > > > just pegged. One way to dive in and see what is happening is to add the
> > > > > command line option
> > > > > *  -agentlib:hprof=cpu=samples,depth=10*
> > > > > This will tell us where the time is going. If you send that around it may
> > > > > be informative.
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Thu, Nov 20, 2014 at 12:41 AM, Manu Zhang <owenzhang1...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks Jay. The producer metrics from jconsole are quite helpful.
> > > > > >
> > > > > > I've switched to the new producer and run producer benchmark with
> > > > > >
> > > > > > */usr/lib/kafka/bin/kafka-run-class.sh
> > > > > > org.apache.kafka.clients.tools.ProducerPerformance topic1 500000000 1000 -1
> > > > > > acks=1 bootstrap.servers=node1:9092,node2:9092,node3:9092,node4:9092
> > > > > > buffer.memory=2097152000 batch.size=1000000 linger.ms=100*
> > > > > >
> > > > > > so my message size is 1000 bytes (gave up on 100 bytes after fruitless
> > > > > > experiments) and I've deliberately batched outgoing messages with the
> > > > > > "linger.ms" conf. A single producer could send 300 MB/s on average and 3
> > > > > > producers almost saturated the network bandwidth. CPU is fully utilized
> > > > > > for each producer thread. It seems that I can't go further in a single
> > > > > > producer. Any thoughts?
> > > > > >
> > > > > > Also, I've noticed this kafka-fast
> > > > > > <https://github.com/gerritjvv/kafka-fast> project,
> > > > > > which claims producer throughput could reach 191975 K messages/s for 1KB
> > > > > > messages on a 10GbE network. The difference is that a producer is created
> > > > > > per topic partition.
> > > > > >
> > > > > >
> > > > > > On Wed, Nov 19, 2014 at 12:34 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Yeah this will involve some experimentation.
> > > > > > >
> > > > > > > The metrics are visible with jconsole or another jmx viewer.
> > > > > > >
> > > > > > > It may also be worth looking at the cpu usage per-thread (e.g. start top
> > > > > > > and press 't' I think).
> > > > > > >
> > > > > > > Another simple test for broker vs client as the bottleneck is just to
> > > > > > > start another producer or consumer and see if that improves throughput
> > > > > > > (if so it is probably a client bottleneck).
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Tue, Nov 18, 2014 at 4:44 PM, Manu Zhang <owenzhang1...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Jay for the quick response.
> > > > > > > >
> > > > > > > > Yes, it's a single producer and consumer both configured with multiple
> > > > > > > > threads but I'm not using the new producer.
> > > > > > > > CPU is typically 50% utilized on the client and barely used on the
> > > > > > > > broker. Disks aren't busy either as a lot of data is cached in memory.
> > > > > > > > Would you please give a link for the producer metrics you are referring
> > > > > > > > to?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Manu
> > > > > > > >
> > > > > > > > On Wed, Nov 19, 2014 at 2:39 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hey Manu,
> > > > > > > > >
> > > > > > > > > I'm not aware of a benchmark on 10GbE. I'd love to see that though.
> > > > > > > > > Diving into the results may help us find bottlenecks hidden by the
> > > > > > > > > slower network.
> > > > > > > > >
> > > > > > > > > Can you figure out where the bottleneck is in your test? I assume this
> > > > > > > > > is a single producer and consumer instance and you are using the new
> > > > > > > > > producer as in those benchmarks?
> > > > > > > > >
> > > > > > > > > This can be slightly tricky as it can be cpu or I/O on either the
> > > > > > > > > clients or the brokers. You basically have to look at top, iostat, and
> > > > > > > > > the jmx metrics for clues. The producer has good metrics that explain
> > > > > > > > > whether it is spending most of its time waiting or sending data. Not
> > > > > > > > > sure if there is a similar diagnostic for the consumer.
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Tue, Nov 18, 2014 at 5:10 AM, Manu Zhang <owenzhang1...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > I have been trying out kafka benchmarks described in Jay's
> > > > > > > > > > benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> > > > > > > > > > <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machine>.
> > > > > > > > > > I'm able to get similar results on a 4-node GbE network whose in-bytes
> > > > > > > > > > could be saturated at 120MB/s. However, on a 4-node, 10GbE network, I
> > > > > > > > > > cannot get in-bytes higher than 150MB/s. *Has anyone benchmarked kafka
> > > > > > > > > > on a 10GbE network? Any rule of thumb on 10GbE network for
> > > > > > > > > > configurations of broker, producer and consumer?*
> > > > > > > > > >
> > > > > > > > > > My kafka version is 0.8.1.1 and I've created a topic with 8 partitions
> > > > > > > > > > with 1 replica distributed evenly among the 4 nodes. Message size is
> > > > > > > > > > 100 bytes. I use all the default kafka settings.
> > > > > > > > > > My cluster has 4 nodes, where each node has 32 cores, 128MB RAM and 3
> > > > > > > > > > disks for kafka.
> > > > > > > > > >
> > > > > > > > > > I've tried increasing message size to 1000 bytes which improved the
> > > > > > > > > > producer's throughput but not the consumer's.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Manu
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
