Fwd: MirrorMaker improvements
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he suggested writing an email regarding a few possible MirrorMaker improvements.

At Turn, we have been using MirrorMaker for a few months now to asynchronously replicate our key/value store data between our datacenters. In a way, our system is similar to LinkedIn's Databus, but it uses Kafka clusters and MirrorMaker as its building blocks. Our overall message rate peaks at about 650K messages/sec and, when pushing data over high bandwidth-delay product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote datacenter. This implies that it opens a single TCP connection to each of the remote brokers and muxes requests for different topics and partitions over this connection. While this is a good thing in terms of keeping the congestion window open, over long-RTT lines with a rather high loss rate the congestion window will cap, in our case at just a few Mbps. Since the overall line bandwidth is much higher, this means that we have to start multiple MirrorMaker processes (somewhere in the hundreds) in order to fully use the line capacity. Being able to pool multiple TCP connections from a single consumer to a broker would solve this complication.

The standard consumer also uses the remote ZooKeeper in order to manage the consumer group. While consumer group management is moving closer to the brokers, it might make sense to move the group management to the local datacenter, since that would avoid using the long-distance connection for this purpose.

Another possible improvement assumes a further constraint, namely that the number of partitions for a topic is the same in both datacenters. In my opinion, this is a sane constraint, since it preserves the Kafka ordering guarantees (per partition) instead of a simple guarantee per key. This kind of guarantee can be useful, for example, in a system that compares partition contents to reach eventual consistency using Merkle trees. If the number of partitions is equal, then offsets have the same meaning for the same partition in both clusters, since the data in both partitions is identical before that offset. This allows a simple consumer to just query the local broker and the remote broker for their current offsets and, in case the remote broker is ahead, copy the extra data to the local cluster (as sketched below). Since the consumer offsets are no longer bound to the specific partitioning of a single remote cluster, the consumer could pull from any one of a number of remote clusters, BitTorrent-style, whenever their offsets are ahead of the local offset. The group management problem would reduce to assigning local partitions to different MirrorMaker processes, so group management could also be done locally in this situation.

Regards,
Vlad

PS: Sorry if this is a double posting! The original posting did not appear in the archives for a while.
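To make the last proposal concrete, here is a minimal sketch of the offset-comparison idea under the equal-partition-count constraint. It is not MirrorMaker code: it uses the current Java clients (KafkaConsumer/KafkaProducer) rather than the consumer discussed in this thread, the topic name and broker addresses are placeholders, and it ignores failure handling and the duplicate-avoidance question raised later in the thread.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class OffsetMirrorSketch {

    public static void main(String[] args) {
        // Hypothetical broker addresses and topic name, for illustration only.
        String topic = "source_multicast-group";
        Properties remoteProps = clientProps("remote-dc-broker:9092");
        Properties localProps  = clientProps("local-dc-broker:9092");

        try (KafkaConsumer<byte[], byte[]> remote = new KafkaConsumer<>(remoteProps);
             KafkaConsumer<byte[], byte[]> local  = new KafkaConsumer<>(localProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(localProps)) {

            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : remote.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, p.partition()));
            }

            // With identical partition counts (and identical data up to the log end),
            // log-end offsets are directly comparable between the two clusters.
            Map<TopicPartition, Long> remoteEnd = remote.endOffsets(partitions);
            Map<TopicPartition, Long> localEnd  = local.endOffsets(partitions);

            for (TopicPartition tp : partitions) {
                long from = localEnd.get(tp);
                long to   = remoteEnd.get(tp);
                if (to <= from) {
                    continue; // local partition is already caught up
                }
                remote.assign(Collections.singletonList(tp));
                remote.seek(tp, from); // start exactly where the local log ends
                long next = from;
                while (next < to) {
                    for (ConsumerRecord<byte[], byte[]> r : remote.poll(Duration.ofSeconds(1))) {
                        if (r.offset() >= to) {
                            next = to; // copied everything the remote had when we started
                            break;
                        }
                        // Produce to the *same* partition to preserve per-partition order.
                        producer.send(new ProducerRecord<>(topic, tp.partition(), r.key(), r.value()));
                        next = r.offset() + 1;
                    }
                }
            }
            producer.flush();
        }
    }

    private static Properties clientProps(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", "offset-mirror-sketch");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return p;
    }
}

Because each local partition's end offset fully describes what has already been copied, any remote cluster whose end offset is ahead could serve the missing range, which is what would enable the BitTorrent-style pulling mentioned above.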
Re: MirrorMaker improvements
@Guozhang
We actually have separate topics depending on the source of the message and the multicast distribution group (the set of destinations). Our topics are named: source_multicast-group. We do not aggregate data, but we do static routing based on the destination and the destination set (that is, we set up a tree of MirrorMakers to copy the topic from the original datacenter to the others). This gives us a static topology (no path-failure resilience) and limits the number of multicast groups (since each multicast group needs a different topic for every source), but for our data replication pattern it is a good match. It also helps that the order of writes in our system is not important, so we do not need a single point of aggregation :)

@Jun
The actual problem is the congestion window; I do not think that we are suffering due to the transmit/receive socket buffers (we are using the same buffers over different links with similar RTT but different loss rates, and the TCP connection throughput varies a lot, which would not be the case if the amount of in-flight data were limited by buffer size). The socket-level cwnd metrics also support our hypothesis, and we have also measured with iperf what a single connection can transport across a lossy inter-DC link. Jiangjie seems to be suggesting a different blocking scenario, similar to head-of-line blocking because of other requests; however, increasing the number of fetchers will not necessarily help, since all fetchers will mux their requests over a single TCP connection when sending requests to a single broker. The TCP connection's congestion window will continue to be the limiting factor. I would say that the only way out of this is to pool multiple TCP connections from a single consumer to a broker. (Jiangjie's example numbers are worked through after the quoted message below.)

For identical mirroring, I thought that when asking for data between a pair of offsets the result should always be the same. Would it be possible to produce while also indicating the offsets where the data should go?

Regards,
Vlad

On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin wrote:
> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
> not be sufficient.
> There are actually two related factors here:
> 1. Pipelining TCP packets when sending a single request/response.
> 2. Pipelining multiple requests/responses.
> Bumping up socket.receive.buffer.bytes helps with 1) but does not help
> with 2).
>
> For example, consider the following scenario.
> RTT = 100 ms
> Bandwidth = 1 Gbps (128 MBps)
> Request size = 10KB
> Response size = 1MB
> If we only have a single fetcher which is working in a blocking way, the
> max number of requests we can achieve is 10 requests/sec because it's
> restricted by the RTT. In this case, bumping up socket buffer size will
> not help. I think this is the situation Vlad mentioned.
>
> One option might be to increase num.consumer.fetchers, so we might have more
> fetcher threads for a single consumer instance (due to the implementation,
> num.consumer.fetchers actually means "at most num.consumer.fetchers").
>
> One thing that might be worth considering is whether we can enforce pipelining in the new
> consumer like we do for the new producer. Since we have correlation IDs,
> reordering should be easy to handle. I haven't got a chance to read the new
> consumer code, but I think it is worth doing if we haven't done so.
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 9:50 AM, "Jun Rao" wrote:
>
> >To amortize the long RTT across data centers, you can tune the TCP window
> >size by configuring a larger socket.receive.buffer.bytes in the consumer.
> >
> >For the last one, it seems that you want identical mirroring. The tricky
> >thing is to figure out how to avoid duplicates when there is a failure. We
> >had some related discussion in the context of transactional messaging (
> >https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >).
> >
> >Thanks,
> >
> >Jun
> >
> >On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com wrote:
> >
> >> Dear all,
> >>
> >> I had a short discussion with Jay yesterday at the ACM meetup and he
> >> suggested writing an email regarding a few possible MirrorMaker
> >> improvements.
> >>
> >> At Turn, we have been using MirrorMaker for a few months now to
> >> asynchronously replicate our key/value store data between our datacenters.
> >> In a way, our system is similar to LinkedIn's Databus, but it uses Kafka
> >> clusters and MirrorMaker as its building blocks. Our overall message rate
> >> peaks at about 650K/sec and, when pushing data over high bandwidth-delay
> >> product links, we have found some minor bottlenecks.
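To put rough numbers on the blocking-fetch scenario quoted above (the figures are Jiangjie's; the arithmetic is only illustrative):

    max fetch rate  = 1 / RTT = 1 / 0.1 s    = 10 fetches/sec
    max throughput  = 10 fetches/sec * 1 MB  = 10 MB/s

With a single blocking fetcher, only one 1 MB response can complete per 100 ms round trip, so the consumer tops out around 10 MB/s, well under what a 1 Gbps link can carry (on the order of 125 MB/s), no matter how large the socket buffers are. Pipelining n outstanding fetches, or spreading fetchers across brokers, raises that ceiling to roughly n MB per RTT, until the congestion window of each underlying TCP connection becomes the next limit, which is the problem discussed in the rest of this thread.

For reference, the consumer-side knobs mentioned above live in the properties file passed to MirrorMaker with --consumer.config. A sketch, with arbitrary example values and a hypothetical ZooKeeper hostname, not recommendations:

    # consumer.properties (old high-level consumer, as used by MirrorMaker at the time)
    # remote-datacenter ZooKeeper (hypothetical host)
    zookeeper.connect=remote-zk:2181
    group.id=mm-example
    # larger TCP receive buffer for long-RTT links
    socket.receive.buffer.bytes=4194304
    # "at most" this many fetcher threads per consumer
    num.consumer.fetchers=4
    fetch.message.max.bytes=10485760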
Re: MirrorMaker improvements
Hi Jiangjie,

I only noticed a single TCP connection from an MM process to a single broker. Is there something I could have done to open up more connections?

TCP can actually cap before saturating the network, which is the reason it is hard to utilize a high bandwidth-delay product link with a single TCP connection. There is an equation that links the MSS, RTT and loss rate of the link to the achievable TCP throughput (worked through after the quoted messages below). Notice that the link bandwidth does not come into play, since the only way it can affect throughput is by increasing the loss rate due to drops when the link is congested. On WAN links, however, a single connection will usually cap (due to random losses and high RTT) long before reaching the capacity of the link. Here is a reference for this:
http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf

Regards,
Vlad

On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin wrote:
> Hi Vlad,
>
> I am not sure I understand the congestion window part. So TCP congestion
> control will only occur when you are saturating the network. If that is
> the case, bandwidth has already become the bottleneck. But we are talking
> about network under-utilization, no?
>
> Another thing is that each fetcher thread has their own BlockingChannel to
> the broker, so they have dedicated TCP connections. Could you explain more
> on the mux?
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 2:59 PM, "vlad...@gmail.com" wrote:
>
> >@Guozhang
> >We actually have separate topics depending on the source of the message and
> >the multicast distribution group (the set of destinations). Our topics are
> >named: source_multicast-group. We do not aggregate data, but we do static
> >routing based on the destination and the destination set (that is, we set
> >up a tree of MirrorMakers to copy the topic from the original datacenter to
> >the others). This gives us a static topology (no path-failure resilience)
> >and limits the number of multicast groups (since each multicast group needs
> >a different topic for every source), but for our data replication pattern
> >it is a good match. It also helps that the order of writes in our system is
> >not important, so we do not need a single point of aggregation :)
> >
> >@Jun
> >The actual problem is the congestion window; I do not think that we are
> >suffering due to the transmit/receive socket buffers (we are using the same
> >buffers over different links with similar RTT but different loss rates, and
> >the TCP connection throughput varies a lot, which would not be the case if
> >the amount of in-flight data were limited by buffer size). The
> >socket-level cwnd metrics also support our hypothesis, and we have also
> >measured with iperf what a single connection can transport across a lossy
> >inter-DC link. Jiangjie seems to be suggesting a different blocking
> >scenario, similar to head-of-line blocking because of other requests;
> >however, increasing the number of fetchers will not necessarily help, since
> >all fetchers will mux their requests over a single TCP connection when
> >sending requests to a single broker. The TCP connection's congestion window
> >will continue to be the limiting factor. I would say that the only way out
> >of this is to pool multiple TCP connections from a single consumer to a
> >broker.
> >
> >For identical mirroring, I thought that when asking for data between a pair
> >of offsets the result should always be the same. Would it be possible to
> >produce while also indicating the offsets where the data should go?
> >
> >Regards,
> >Vlad
> >
> >On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin wrote:
> >
> >> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
> >> not be sufficient.
> >> There are actually two related factors here:
> >> 1. Pipelining TCP packets when sending a single request/response.
> >> 2. Pipelining multiple requests/responses.
> >> Bumping up socket.receive.buffer.bytes helps with 1) but does not help
> >> with 2).
> >>
> >> For example, consider the following scenario.
> >> RTT = 100 ms
> >> Bandwidth = 1 Gbps (128 MBps)
> >> Request size = 10KB
> >> Response size = 1MB
> >> If we only have a single fetcher which is working in a blocking way, the
> >> max number of requests we can achieve is 10 requests/sec because it's
> >> restricted by the RTT. In this case, bumping up socket buffer size will
> >> not help. I think this is the situation Vlad mentioned.
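To make the congestion-window argument concrete, the simplified steady-state loss model (the Mathis et al. approximation; the Padhye paper linked above refines it with timeout behaviour) bounds a single TCP connection at roughly

    throughput <= (MSS / RTT) * (C / sqrt(p)),  with C = sqrt(3/2) ≈ 1.22

where p is the packet loss rate. Plugging in illustrative WAN numbers (not measurements from Turn's links), MSS = 1460 bytes, RTT = 100 ms and p = 1% give

    (1460 B / 0.1 s) * (1.22 / 0.1) ≈ 178 kB/s ≈ 1.4 Mbps

per connection, independent of the raw link bandwidth. This is consistent with the "caps at a few Mbps" observation at the start of the thread, and it is why pooling many TCP connections per consumer (or running hundreds of MirrorMaker processes) is what recovers the line capacity.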