Maysam Yabandeh created KAFKA-3428:
--------------------------------------
Summary: Remove metadata sync bottleneck from mirrormaker's producer
Key: KAFKA-3428
URL: https://issues.apache.org/jira/browse/KAFKA-3428
Project: Kafka
Issue Type: Improvement
Reporter: Maysam Yabandeh
Due to synchronization inside the single shared producer, MirrorMaker (MM) in a
setup with 32 consumer threads could not send more than 358k msg/sec and
therefore could not saturate the NIC. Profiling showed that producer.send takes
0.080 ms on average, which explains the 358k msg/sec ceiling. The following
explains the bottleneck in producer.send and suggests how to remove it.
The current implementation of MM relies on a single producer. For EACH message,
producer.send() calls waitOnMetadata, which runs the following check against
the shared Metadata object:
{code}
// add topic to metadata topic list if it is not there already.
if (!this.metadata.containsTopic(topic))
    this.metadata.add(topic);
{code}
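Although this code is almost always a no-op (after the first message the topic
is already in the list), containsTopic is a synchronized method, so every send
from every MM thread contends for the same Metadata lock, and this becomes the
bottleneck in MM.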
Profiling highlights this bottleneck:
{code}
100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
  18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
    13.8% - 9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
      12.1% - 7,933 ms org.apache.kafka.clients.Metadata.containsTopic
      1.7% - 1,088 ms org.apache.kafka.clients.Metadata.fetch
    2.6% - 1,729 ms org.apache.kafka.clients.Metadata.fetch
    2.2% - 1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
{code}
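A minimal sketch of what the patch below proposes for this hot path (a concurrent topic set plus an unsynchronized containsTopic), assuming a simplified stand-in for `Metadata`. The class `TopicRegistry` and its `ensureTopic` and `updateRequested` methods are hypothetical, not Kafka's API. The topic set comes from `ConcurrentHashMap.newKeySet()`, which is one way to get a concurrent hash set in Java, so `containsTopic` no longer takes the object lock; `add` stays synchronized but is only reached when a topic has not been seen yet.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the producer's Metadata class.
class TopicRegistry {
    // Concurrent set: safe to read from many threads without holding a lock.
    private final Set<String> topics = ConcurrentHashMap.newKeySet();
    private boolean needUpdate = false; // guarded by "this"

    // Hot path: not synchronized, so sending threads do not contend here.
    public boolean containsTopic(String topic) {
        return topics.contains(topic);
    }

    // Slow path: only reached for a topic that looked new, so locking is rare.
    public synchronized void add(String topic) {
        // Set.add returns true only for the thread that actually inserted it.
        if (topics.add(topic)) {
            needUpdate = true; // stands in for requesting a metadata refresh
        }
    }

    public synchronized boolean updateRequested() {
        return needUpdate;
    }

    // Mirrors the per-message check done in waitOnMetadata.
    public void ensureTopic(String topic) {
        if (!containsTopic(topic)) {
            add(topic);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TopicRegistry metadata = new TopicRegistry();
        Thread[] senders = new Thread[32];
        for (int i = 0; i < senders.length; i++) {
            senders[i] = new Thread(() -> {
                for (int n = 0; n < 100_000; n++) {
                    metadata.ensureTopic("mirrored-topic");
                }
            });
            senders[i].start();
        }
        for (Thread t : senders) {
            t.join();
        }
        System.out.println(metadata.containsTopic("mirrored-topic") + " " + metadata.updateRequested());
    }
}
```

Several threads may race past the unsynchronized check for a brand-new topic, which is why `add` relies on the return value of `Set.add` rather than on the earlier `containsTopic` result.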
After replacing this check with an effective no-op, another profiler run shows
that Metadata.fetch is the next bottleneck:
{code}
org.xerial.snappy.SnappyNative.arrayCopy  132 s (54 %)      n/a  n/a
java.lang.Thread.run                      50,776 ms (21 %)  n/a  n/a
org.apache.kafka.clients.Metadata.fetch   20,881 ms (8 %)   n/a  n/a
  6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
    6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
      6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
{code}
However, the fetch method does not need to be synchronized, since it only
returns the current cluster reference:
{code}
public synchronized Cluster fetch() {
    return this.cluster;
}
{code}
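A sketch of an unsynchronized `fetch`, under two assumptions that the issue does not state: the cluster object is immutable, and the field holding it is `volatile` (or otherwise safely published), so that under the Java memory model a reader that skips the lock still sees the latest update. `MetadataHolder` and `ClusterSnapshot` are hypothetical simplifications of `Metadata` and `Cluster`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the producer's Metadata; only fetch/update are modeled.
class MetadataHolder {
    // volatile: readers that skip the lock still see the most recently published snapshot.
    private volatile ClusterSnapshot cluster = new ClusterSnapshot(Collections.emptyList());

    // Unsynchronized: a single volatile read of an immutable object needs no lock.
    public ClusterSnapshot fetch() {
        return this.cluster;
    }

    // Writers remain synchronized with each other and publish a whole new snapshot.
    public synchronized void update(ClusterSnapshot newCluster) {
        this.cluster = newCluster;
    }

    public static void main(String[] args) {
        MetadataHolder metadata = new MetadataHolder();
        metadata.update(new ClusterSnapshot(List.of("broker-1:9092", "broker-2:9092")));
        System.out.println(metadata.fetch().brokers.size()); // prints 2
    }
}

// Hypothetical immutable snapshot standing in for Kafka's Cluster.
final class ClusterSnapshot {
    final List<String> brokers;

    ClusterSnapshot(List<String> brokers) {
        // Defensive copy so later changes to the caller's list cannot leak in.
        this.brokers = Collections.unmodifiableList(new ArrayList<>(brokers));
    }
}
```

Without `volatile` (or some other happens-before edge), dropping `synchronized` means a sending thread is not guaranteed to ever observe a newer cluster reference.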
After removing synchronization from fetch, the profiler shows that this
bottleneck has disappeared:
{code}
org.xerial.snappy.SnappyNative.arrayCopy                                249 s (78 %)     n/a  n/a
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel  24,489 ms (7 %)  n/a  n/a
org.xerial.snappy.SnappyNative.rawUncompress                            17,024 ms (5 %)  n/a  n/a
org.apache.kafka.clients.producer.internals.RecordAccumulator.append    13,817 ms (4 %)  n/a  n/a
  4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
{code}
Internally we have applied a patch to remove this bottleneck. The patch does
the following:
1. Replace the HashSet that holds the topics with a concurrent hash set.
2. Remove synchronization from containsTopic and fetch.
3. Pass a copy of topics to getClusterForCurrentTopics: this synchronized
method accesses topics in two places, and if topics changes between those
accesses, the two reads could see different sets and break its semantics.
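A sketch of step 3, assuming a simplified model in which the cluster is just a map from topic to partition count. `SnapshotMetadata`, its `update` signature, and the two-argument `getClusterForCurrentTopics` are hypothetical, not Kafka's actual code. The synchronized caller copies the concurrent topic set once and passes the copy down, so the two reads of topics inside the helper cannot disagree if another thread adds a topic between them.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Metadata; "cluster" is simplified to topic -> partition count.
class SnapshotMetadata {
    // Concurrent set (step 1): other threads may add topics at any time.
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    public void add(String topic) {
        topics.add(topic);
    }

    // Takes ONE copy of topics, so both accesses inside
    // getClusterForCurrentTopics see exactly the same set (step 3).
    public synchronized Map<String, Integer> update(Map<String, Integer> partitionsByTopic) {
        Set<String> snapshot = new HashSet<>(topics);
        return getClusterForCurrentTopics(snapshot, partitionsByTopic);
    }

    // Hypothetical helper: keeps only tracked topics from the broker response and
    // records tracked topics the broker did not report with 0 known partitions.
    private Map<String, Integer> getClusterForCurrentTopics(Set<String> topicsSnapshot,
                                                            Map<String, Integer> partitionsByTopic) {
        Map<String, Integer> result = new TreeMap<>();
        // First access to the topic set: filter the broker response.
        for (Map.Entry<String, Integer> e : partitionsByTopic.entrySet()) {
            if (topicsSnapshot.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        // Second access to the topic set: add tracked topics missing from the response.
        for (String topic : topicsSnapshot) {
            result.putIfAbsent(topic, 0);
        }
        return result;
    }

    public static void main(String[] args) {
        SnapshotMetadata metadata = new SnapshotMetadata();
        metadata.add("a");
        metadata.add("b");
        Map<String, Integer> fromBroker = new TreeMap<>();
        fromBroker.put("a", 3);
        fromBroker.put("c", 5);
        System.out.println(metadata.update(fromBroker)); // prints {a=3, b=0}
    }
}
```

Copying a set built with `ConcurrentHashMap.newKeySet()` iterates in a weakly consistent way and never throws `ConcurrentModificationException`, so the copy is safe to take while other threads are adding topics.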
Any interest in applying this patch? Any alternative suggestions?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)