You can set num.consumer.fetchers to a larger number (e.g. 3) and more fetchers will be created to fetch from different partitions on the same broker. Each fetcher will have its own TCP connection.
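For reference, here is a minimal sketch of the consumer settings mentioned in this thread, assuming they end up in the properties file handed to MirrorMaker via --consumer.config; the property names are the old high-level consumer configs, while the values and the ZooKeeper host are purely illustrative:

import java.util.Properties;

// Sketch of the consumer settings discussed in this thread. The values and
// the ZooKeeper host below are illustrative assumptions, not recommendations.
public class MirrorMakerConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "source-zk:2181");     // ZooKeeper of the remote (source) cluster
        props.put("group.id", "mirrormaker-group");           // hypothetical consumer group name
        props.put("num.consumer.fetchers", "3");              // more fetcher threads (and TCP connections) per broker
        props.put("socket.receive.buffer.bytes", "1048576");  // larger TCP receive buffer for high-RTT links
        return props;
    }
}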
WRT the congestion window, yes, if the link has a high packet drop rate then congestion avoidance will kick in before the bandwidth is fully used. I thought that in normal cases packets only get dropped occasionally, even on a long link (that's why congestion avoidance only kicks in when there are three duplicate ACKs or a timeout). So if the packet drop rate is high, it sounds more like a link quality issue, or the network really is congested - like in the paper you mentioned, where one of the Internet routers enforces a drop-tail policy because its buffer is full due to bursty traffic.

Jiangjie (Becket) Qin

On 3/25/15, 4:08 PM, "vlad...@gmail.com" <vlad...@gmail.com> wrote:

>Hi Jiangjie,
>
>I only noticed a single TCP connection from a MM process to a single broker. Is there something I could have done to open up more connections?
>
>TCP can actually cap before saturating the network, which is the reason it is hard to utilize a high bandwidth-delay-product link with a single TCP connection. There is an equation that links the MSS, RTT and loss rate of the link to the achievable TCP throughput. Notice that the link bandwidth does not come into play, since the only way it can affect throughput is by increasing the loss rate due to drops when the link is congested. On WAN links, however, a single connection will usually cap (due to random losses and high RTT) long before reaching the capacity of the link. Here is a reference for this:
>http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf
>
>Regards,
>Vlad
>
>On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
>> Hi Vlad,
>>
>> I am not sure I understand the congestion window part. TCP congestion control only kicks in when you are saturating the network. If that is the case, bandwidth has already become the bottleneck. But we are talking about network under-utilization, no?
>>
>> Another thing is that each fetcher thread has its own BlockingChannel to the broker, so each has a dedicated TCP connection. Could you explain more about the mux?
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/25/15, 2:59 PM, "vlad...@gmail.com" <vlad...@gmail.com> wrote:
>>
>> >@Guozhang
>> >We actually have separate topics depending on the source of the message and the multicast distribution group (the set of destinations). Our topics are named source_multicast-group. We do not aggregate data, but we do static routing based on the destination and the destination set (that is, we set up a tree of MirrorMakers to copy the topic from the original datacenter to the others). This gives us a static topology (no path-failure resilience) and limits the number of multicast groups (since each multicast group needs a different topic for every source), but it is a good match for our data replication pattern. It also helps that the order of writes in our system is not important, so we do not need a single point of aggregation :)
>> >
>> >@Jun
>> >The actual problem is the congestion window. I do not think we are suffering due to the transmit/receive socket buffers (we are using the same buffers over different links with similar RTT but different loss rates, and the TCP connection throughput varies a lot; this would not be the case if the amount of in-flight data were limited by buffer size).
>> >The socket-level cwnd metrics also support our hypothesis, and we have also measured with iperf what a single connection can transport across a lossy inter-DC link. Jiangjie seems to be suggesting a different blocking scenario, similar to head-of-line blocking because of other requests; however, increasing the number of fetchers will not necessarily help, since all fetchers will mux their requests over a single TCP connection when talking to a single broker. The TCP connection's congestion window will continue to be the limiting factor. I would say that the only way out of this is to pool multiple TCP connections from a single consumer to a broker.
>> >
>> >For identical mirroring, I thought that when asking for data between a pair of offsets the result should always be the same. Would it be possible to produce while also indicating the offsets where the data should go?
>> >
>> >Regards,
>> >Vlad
>> >
>> >On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> >
>> >> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might not be sufficient.
>> >> There are actually two related factors here:
>> >> 1. Pipelining TCP packets when sending a single request/response.
>> >> 2. Pipelining multiple requests/responses.
>> >> Bumping up socket.receive.buffer.bytes helps with 1) but does not help with 2).
>> >>
>> >> For example, consider the following scenario:
>> >> RTT = 100 ms
>> >> Bandwidth = 1 Gbps (128 MBps)
>> >> Request size = 10 KB
>> >> Response size = 1 MB
>> >> If we only have a single fetcher working in a blocking way, the max number of requests we can achieve is 10 requests/sec, because it is restricted by the RTT. In this case, bumping up the socket buffer size will not help. I think this is the situation Vlad mentioned.
>> >>
>> >> One option might be to increase num.consumer.fetchers, so we have more fetcher threads for a single consumer instance (due to the implementation, num.consumer.fetchers actually means "at most num.consumer.fetchers").
>> >>
>> >> One thing that might be worth considering is whether we can enforce pipelining in the new consumer like we do in the new producer. Since we have correlation IDs, reordering should be easy to handle. I haven't had a chance to read the new consumer code, but I think it is worth doing if we haven't done so already.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 3/25/15, 9:50 AM, "Jun Rao" <j...@confluent.io> wrote:
>> >>
>> >> >To amortize the long RTT across data centers, you can tune the TCP window size by configuring a larger socket.receive.buffer.bytes in the consumer.
>> >> >
>> >> >For the last one, it seems that you want identical mirroring. The tricky thing is to figure out how to avoid duplicates when there is a failure. We had some related discussion in the context of transactional messaging (
>> >> >https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>> >> >).
>> >> >
>> >> >Thanks,
>> >> >
>> >> >Jun
>> >> >
>> >> >On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com <vlad...@gmail.com> wrote:
>> >> >
>> >> >> Dear all,
>> >> >>
>> >> >> I had a short discussion with Jay yesterday at the ACM meetup and he suggested writing an email regarding a few possible MirrorMaker improvements.
>> >> >> At Turn, we have been using MirrorMaker for a few months now to asynchronously replicate our key/value store data between our datacenters. In a way, our system is similar to LinkedIn's Databus, but it uses Kafka clusters and MirrorMaker as its building blocks. Our overall message rate peaks at about 650K/sec and, when pushing data over high bandwidth-delay product links, we have found some minor bottlenecks.
>> >> >>
>> >> >> The MirrorMaker process uses a standard consumer to pull data from a remote datacenter. This implies that it opens a single TCP connection to each of the remote brokers and muxes requests for different topics and partitions over this connection. While this is a good thing in terms of keeping the congestion window open, over long-RTT lines with a rather high loss rate the congestion window will cap, in our case at just a few Mbps. While the overall line bandwidth is much higher, this means that we have to start multiple MirrorMaker processes (somewhere in the hundreds) in order to completely use the line capacity. Being able to pool multiple TCP connections from a single consumer to a broker would solve this complication.
>> >> >>
>> >> >> The standard consumer also uses the remote ZooKeeper in order to manage the consumer group. While consumer group management is moving closer to the brokers, it might make sense to move the group management to the local datacenter, since that would avoid using the long-distance connection for this purpose.
>> >> >>
>> >> >> Another possible improvement assumes a further constraint, namely that the number of partitions for a topic in both datacenters is the same. In my opinion, this is a sane constraint, since it preserves the Kafka ordering guarantees (per partition) instead of a simple guarantee per key. This kind of guarantee can, for example, be useful in a system that compares partition contents to reach eventual consistency using Merkle trees. If the number of partitions is equal, then offsets have the same meaning for the same partition in both clusters, since the data in both partitions is identical before the offset. This allows a simple consumer to just ask the local broker and the remote broker for their current offsets and, in case the remote broker is ahead, copy the extra data to the local cluster. Since the consumer offsets are no longer bound to the specific partitioning of a single remote cluster, the consumer could pull from any number of remote clusters, BitTorrent-style, whenever their offsets are ahead of the local offset. The group management problem would reduce to assigning local partitions to different MirrorMaker processes, so the group management could also be done locally in this situation.
>> >> >>
>> >> >> Regards,
>> >> >> Vlad
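For concreteness, here is a back-of-the-envelope sketch of the two limits discussed above: the loss-bounded throughput of a single TCP connection (using the simple Mathis et al. approximation rather than the full Padhye model Vlad referenced) and the RTT-bounded rate of a single blocking fetcher from Becket's example. The MSS and loss rate are assumed values for illustration only:

// Back-of-the-envelope sketch of the two limits discussed in this thread.
// The MSS, loss rate, RTT and response size are illustrative assumptions,
// not measurements from the setup described above.
public class WanThroughputSketch {

    // Mathis et al. approximation (a simplified form of the Padhye model
    // referenced earlier): throughput <= (MSS / RTT) * (C / sqrt(p)), C ~ 1.22.
    static double lossBoundedBytesPerSec(int mssBytes, double rttSec, double lossRate) {
        return (mssBytes / rttSec) * (1.22 / Math.sqrt(lossRate));
    }

    // A single blocking fetcher issues at most one fetch per round trip,
    // each returning at most responseBytes, regardless of link bandwidth.
    static double blockingFetcherBytesPerSec(double rttSec, int responseBytes) {
        return responseBytes / rttSec;
    }

    public static void main(String[] args) {
        double rtt = 0.100;          // 100 ms inter-DC round trip (Becket's example)
        int mss = 1460;              // typical Ethernet MSS
        double loss = 1e-3;          // assumed 0.1% packet loss on a lossy WAN link

        // ~0.56 MB/s (~4.5 Mbps): "just a few Mbps" even though the link is 1 Gbps.
        System.out.printf("Loss-bounded TCP throughput: %.2f MB/s%n",
                lossBoundedBytesPerSec(mss, rtt, loss) / 1e6);

        // ~10.5 MB/s (10 fetches/sec of 1 MB each) vs. ~128 MB/s of raw link bandwidth.
        int responseBytes = 1 << 20; // 1 MB fetch response (Becket's example)
        System.out.printf("Single blocking fetcher:     %.1f MB/s%n",
                blockingFetcherBytesPerSec(rtt, responseBytes) / 1e6);
    }
}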