Hi Jun,

I think bumping up socket.receive.buffer.bytes will help but might not be
sufficient. There are actually two related factors here:

1. Pipelining TCP packets when sending a single request/response.
2. Pipelining multiple requests/responses.

Bumping up socket.receive.buffer.bytes helps with 1) but does not help with 2).
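Just to make 1) concrete, it is a plain consumer setting. A minimal sketch
against the current high-level consumer API (the ZooKeeper address, group id
and the 2 MB value below are placeholders, not recommendations):

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MirrorConsumerExample {
        public static ConsumerConnector create() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "remote-zk:2181"); // placeholder host
            props.put("group.id", "mm-example-group");        // placeholder group
            // A larger TCP receive buffer lets more of a single large fetch
            // response stay in flight on the long-RTT link (factor 1 above).
            props.put("socket.receive.buffer.bytes", "2097152");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
    }

One thing to check when doing this is the OS limit (net.core.rmem_max on
Linux); if it is lower, the socket buffer gets silently capped.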
For 2), consider the following scenario:

RTT = 100 ms
Bandwidth = 1 Gbps (~128 MB/s)
Request size = 10 KB
Response size = 1 MB

If we only have a single fetcher working in a blocking way, the most we can
achieve is about 10 requests/sec, because each fetch pays a full RTT. At 1 MB
per response that is only ~10 MB/s, a small fraction of what the line can
carry, and bumping up the socket buffer size will not help. I think this is
the situation Vlad mentioned.

One option might be to increase num.consumer.fetchers, so we get more fetcher
threads for a single consumer instance (due to the implementation,
num.consumer.fetchers actually means "at most num.consumer.fetchers").

One thing that might be worth considering is whether we can enforce pipelining
in the new consumer like we do in the new producer. Since we have a
correlation ID on every request, reordering the responses should be easy to
handle. I haven't had a chance to read the new consumer code, but I think it
is worth doing if we haven't done so already.
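What I have in mind is roughly the sketch below. The class and method names
are made up and the socket layer is elided; this is only to illustrate the
idea, not actual client code:

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Pipelined requests over one connection: keep sending without waiting
    // for responses, and match each response back to its request by
    // correlation id, so out-of-order completion is handled naturally.
    public class PipelinedChannel {
        private final AtomicInteger nextCorrelationId = new AtomicInteger();
        private final Map<Integer, CompletableFuture<ByteBuffer>> inFlight =
                new ConcurrentHashMap<>();

        // Fetcher side: fire the request and get a future immediately, so
        // several fetches can be outstanding on the same socket.
        public CompletableFuture<ByteBuffer> send(ByteBuffer request) {
            int correlationId = nextCorrelationId.getAndIncrement();
            CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
            inFlight.put(correlationId, future);
            writeToSocket(correlationId, request); // non-blocking write (not shown)
            return future;
        }

        // Network side: called whenever a complete response arrives, in
        // whatever order the broker sends them.
        public void onResponse(int correlationId, ByteBuffer response) {
            CompletableFuture<ByteBuffer> future = inFlight.remove(correlationId);
            if (future != null) {
                future.complete(response);
            }
        }

        private void writeToSocket(int correlationId, ByteBuffer request) {
            // Placeholder: frame the request with its correlation id and hand
            // it to the selector/socket layer.
        }
    }

With something like this, a fetcher could keep a handful of fetch requests in
flight per broker connection and fill the long pipe instead of paying one RTT
per fetch.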
Jiangjie (Becket) Qin

On 3/25/15, 9:50 AM, "Jun Rao" <j...@confluent.io> wrote:

>To amortize the long RTT across data centers, you can tune the TCP window
>size by configuring a larger socket.receive.buffer.bytes in the consumer.
>
>For the last one, it seems that you want identical mirroring. The tricky
>thing is to figure out how to avoid duplicates when there is a failure. We
>had some related discussion in the context of transactional messaging (
>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>).
>
>Thanks,
>
>Jun
>
>On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com <vlad...@gmail.com>
>wrote:
>
>> Dear all,
>>
>> I had a short discussion with Jay yesterday at the ACM meetup and he
>> suggested writing an email regarding a few possible MirrorMaker
>> improvements.
>>
>> At Turn, we have been using MirrorMaker for a few months now to
>> asynchronously replicate our key/value store data between our datacenters.
>> In a way, our system is similar to LinkedIn's Databus, but it uses Kafka
>> clusters and MirrorMaker as its building blocks. Our overall message rate
>> peaks at about 650K/sec and, when pushing data over high bandwidth-delay
>> product links, we have found some minor bottlenecks.
>>
>> The MirrorMaker process uses a standard consumer to pull data from a remote
>> datacenter. This implies that it opens a single TCP connection to each of
>> the remote brokers and muxes requests for different topics and partitions
>> over this connection. While this is a good thing in terms of keeping the
>> congestion window open, over long-RTT lines with a rather high loss rate
>> the congestion window will cap, in our case at just a few Mbps. While the
>> overall line bandwidth is much higher, this means that we have to start
>> multiple MirrorMaker processes (somewhere in the hundreds) in order to
>> completely use the line capacity. Being able to pool multiple TCP
>> connections from a single consumer to a broker would solve this
>> complication.
>>
>> The standard consumer also uses the remote ZooKeeper in order to manage the
>> consumer group. While consumer group management is moving closer to the
>> brokers, it might make sense to move the group management to the local
>> datacenter, since that would avoid using the long-distance connection for
>> this purpose.
>>
>> Another possible improvement assumes a further constraint, namely that the
>> number of partitions for a topic in both datacenters is the same. In my
>> opinion, this is a sane constraint, since it preserves the Kafka ordering
>> guarantees (per partition) instead of a simple guarantee per key. This
>> kind of guarantee can, for example, be useful in a system that compares
>> partition contents to reach eventual consistency using Merkle trees. If the
>> number of partitions is equal, then offsets have the same meaning for the
>> same partition in both clusters, since the data for both partitions is
>> identical before the offset. This allows a simple consumer to just query
>> the local broker and the remote broker for their current offsets and, in
>> case the remote broker is ahead, copy the extra data to the local cluster.
>> Since the consumer offsets are no longer bound to the specific partitioning
>> of a single remote cluster, the consumer could pull from any number of
>> remote clusters, BitTorrent-style, if their offsets are ahead of the
>> local offset. The group management problem would reduce to assigning local
>> partitions to different MirrorMaker processes, so the group management
>> could also be done locally in this situation.
>>
>> Regards,
>> Vlad
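P.S. Regarding the identical-mirroring idea above: with equal partition
counts, the per-partition copy step could look roughly like the sketch below.
It is written against the newer Java consumer/producer API purely for
illustration, assumes byte-array serializers on both sides, and does not
address the duplicate-on-failure problem Jun mentioned:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetDeltaCopier {
        // Copy the tail of one partition from the remote cluster to the
        // local cluster, relying on equal partition counts so that offsets
        // line up one-to-one.
        public static void copyDelta(String topic, int partition,
                                     Properties remoteProps, Properties localProps,
                                     KafkaProducer<byte[], byte[]> localProducer) {
            TopicPartition tp = new TopicPartition(topic, partition);
            try (KafkaConsumer<byte[], byte[]> remote = new KafkaConsumer<>(remoteProps);
                 KafkaConsumer<byte[], byte[]> local = new KafkaConsumer<>(localProps)) {
                long remoteEnd = remote.endOffsets(Collections.singletonList(tp)).get(tp);
                long localEnd = local.endOffsets(Collections.singletonList(tp)).get(tp);
                if (remoteEnd <= localEnd) {
                    return; // the local partition is already caught up
                }
                // Start reading the remote partition exactly where the local one ends.
                remote.assign(Collections.singletonList(tp));
                remote.seek(tp, localEnd);
                long copied = localEnd;
                while (copied < remoteEnd) {
                    for (ConsumerRecord<byte[], byte[]> rec : remote.poll(Duration.ofMillis(500))) {
                        if (rec.offset() >= remoteEnd) {
                            copied = remoteEnd; // ignore data appended after we sampled the end offset
                            break;
                        }
                        // Produce to the same partition so offsets stay aligned
                        // (assumes the local producer does not re-partition or reorder).
                        localProducer.send(new ProducerRecord<>(topic, partition, rec.key(), rec.value()));
                        copied = rec.offset() + 1;
                    }
                }
                localProducer.flush();
            }
        }
    }

The hard part, as Jun pointed out, is making that produce step idempotent
across MirrorMaker restarts, which is where the transactional messaging work
would come in.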