Fwd: MirrorMaker improvements

2015-03-24 Thread vlad...@gmail.com
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to LinkedIn's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K messages/sec and, when pushing data over high
bandwidth-delay-product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of keeping the
congestion window open, over long-RTT lines with a rather high loss rate
the congestion window will cap, in our case at just a few Mbps. Since the
overall line bandwidth is much higher, we have to start multiple
MirrorMaker processes (somewhere in the hundreds) in order to fully use the
line capacity. Being able to pool multiple TCP connections from a single
consumer to a broker would solve this complication.
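
To make the "few Mbps" figure concrete, here is a rough sketch based on the
steady-state TCP throughput approximation (Mathis et al.); the MSS, RTT and
loss numbers below are illustrative assumptions, not measurements from our
links:

  throughput ~ (MSS / RTT) * (C / sqrt(p)),  with C ~ 1.22

  MSS = 1460 bytes, RTT = 100 ms, p = 0.1% loss
  => throughput ~ (1460 * 8 bits / 0.1 s) * (1.22 / 0.0316) ~ 4.5 Mbps per connection
  => filling a 1 Gbps link would take on the order of 200+ parallel connections,
     i.e. hundreds of MirrorMaker processes with one connection each.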

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka
per-partition ordering guarantees instead of a weaker per-key guarantee.
This kind of guarantee can be useful, for example, in a system that
compares partition contents to reach eventual consistency using Merkle
trees. If the number of partitions is equal, then offsets have the same
meaning for the same partition in both clusters, since the data in both
partitions is identical up to that offset. This allows a simple consumer to
just query the local broker and the remote broker for their current offsets
and, in case the remote broker is ahead, copy the extra data to the local
cluster. Since the consumer offsets are no longer bound to the specific
partitioning of a single remote cluster, the consumer could pull from any
one of a number of remote clusters, BitTorrent-style, whenever their
offsets are ahead of the local offset. The group management problem would
reduce to assigning local partitions to different MirrorMaker processes, so
group management could also be done locally in this situation.
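
As an illustration, here is a minimal sketch of that offset-delta copy for a
single partition. It uses the current Java consumer/producer API (which
postdates this thread) purely for readability; the broker addresses and
topic name are hypothetical. Note also that the standard producer cannot
tell the local broker which offsets to assign, so offsets only stay aligned
if this copier is the sole, gapless writer of the local partition:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class OffsetDeltaMirror {

    public static void main(String[] args) {
        // Hypothetical topic/partition; assumes identical partition counts in both DCs.
        TopicPartition tp = new TopicPartition("kv-updates", 0);

        try (KafkaConsumer<byte[], byte[]> remote = consumer("remote-dc:9092");
             KafkaConsumer<byte[], byte[]> local = consumer("local-dc:9092");
             KafkaProducer<byte[], byte[]> producer = producer("local-dc:9092")) {

            remote.assign(Collections.singletonList(tp));
            local.assign(Collections.singletonList(tp));

            // Because partition counts match, offsets mean the same thing in both
            // clusters: compare log end offsets and copy only the missing suffix.
            long remoteEnd = remote.endOffsets(Collections.singletonList(tp)).get(tp);
            long localEnd = local.endOffsets(Collections.singletonList(tp)).get(tp);

            if (remoteEnd > localEnd) {
                remote.seek(tp, localEnd);
                while (remote.position(tp) < remoteEnd) {
                    for (ConsumerRecord<byte[], byte[]> r : remote.poll(Duration.ofSeconds(1))) {
                        if (r.offset() >= remoteEnd) {
                            break; // stop at the snapshot boundary
                        }
                        producer.send(new ProducerRecord<>(tp.topic(), tp.partition(),
                                                           r.key(), r.value()));
                    }
                }
                producer.flush();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> consumer(String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        p.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<>(p);
    }

    private static KafkaProducer<byte[], byte[]> producer(String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(p);
    }
}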

Regards,
Vlad

PS: Sorry if this is a double posting! The original posting did not appear
in the archives for a while.


Re: MirrorMaker improvements

2015-03-25 Thread vlad...@gmail.com
@Guozhang
We actually have separate topics depending on the source of the message and
the multicast distribution group (the set of destinations). Our topics are
named: source_multicast-group. We do not aggregate data but we do static
routing based on the destination and the destination set (that is, we set
up a tree of mirrormakers to copy the topic from the original datacenter to
the others). This gives us a static topology (no path failure resilience)
and limits the number of multicast groups (since each multicast group needs
a different topic for every source), but it is a good match for our data
replication pattern. It also helps that the order of writes in our system is
not important, so we do not need a single point of aggregation :)

@Jun
The actual problem is the congestion window; I do not think that we are
suffering due to the transmit/receive socket buffers (we are using the same
buffers over different links with similar RTT but different loss rates, and
the TCP connection throughput varies a lot, which would not be the case if
the amount of in-flight data were limited by the buffer size). The
socket-level cwnd metrics also support our hypothesis, and we have also
measured with iperf what a single connection can transport across a lossy
inter-DC link. Jiangjie seems to be suggesting a different blocking
scenario, similar to head-of-line blocking because of other requests;
however, increasing the number of fetchers will not necessarily help, since
all fetchers will mux their requests over a single TCP connection when
sending requests to a single broker. The TCP connection's congestion window
will continue to be the limiting factor. I would say that the only way out
of this is to pool multiple TCP connections from a single consumer to a
broker.
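
To illustrate what such pooling could look like, here is a rough sketch
(again using the current Java consumer API, which postdates this thread,
and with hypothetical names and sizes): each consumer instance maintains
its own TCP connection per broker, so splitting the partitions across N
instances inside one mirroring process yields N parallel connections, and N
independent congestion windows, over the WAN.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PooledFetchSketch {
    public static void main(String[] args) {
        final int poolSize = 8;                // illustrative connection pool size
        final String topic = "kv-updates";     // hypothetical topic
        final int partitions = 32;             // hypothetical partition count

        List<KafkaConsumer<byte[], byte[]>> pool = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            Properties p = new Properties();
            p.put("bootstrap.servers", "remote-dc:9092");   // hypothetical address
            p.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            p.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            KafkaConsumer<byte[], byte[]> c = new KafkaConsumer<>(p);

            // Round-robin the partitions across the pool; each consumer fetches only
            // its share, over its own sockets to the remote brokers.
            List<TopicPartition> share = new ArrayList<>();
            for (int part = i; part < partitions; part += poolSize) {
                share.add(new TopicPartition(topic, part));
            }
            c.assign(share);
            pool.add(c);
        }

        // A KafkaConsumer is not thread-safe, so each instance gets its own thread.
        for (KafkaConsumer<byte[], byte[]> c : pool) {
            new Thread(() -> {
                while (true) {
                    c.poll(Duration.ofMillis(500)).forEach(r -> {
                        // hand the record to a producer targeting the local cluster
                    });
                }
            }).start();
        }
    }
}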

For identical mirroring, I thought that when asking for data between a pair
of offsets the result should always be the same. Would it be possible to
also indicate, when producing, the offsets at which the data should go?

Regards,
Vlad

On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin 
wrote:

> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
> not be sufficient.
> There are actually two related factors here:
> 1. Pipelining TCP packets when sending a single request/response.
> 2. Pipelining multiple requests/responses.
> Bumping up socket.receive.buffer.bytes helps with 1) but does not help
> with 2).
>
> For example, consider the following scenario.
> RTT = 100 ms
> Bandwidth = 1 Gbps (128 MBps).
> Request size = 10KB
> Response size = 1MB
> If we only have a single fetcher working in a blocking way, the max
> number of requests we can achieve is 10 requests/sec, because it is
> restricted by the RTT. In this case, bumping up the socket buffer size will
> not help. I think this is the situation Vlad mentioned.
>
> One option might be to increase num.consumer.fetchers, so we might have more
> fetcher threads for a single consumer instance (due to the implementation,
> num.consumer.fetchers actually means "at most num.consumer.fetchers").
>
> One thing that might be worth considering is whether we can enforce pipelining
> in the new consumer like we do for the new producer. Since we have correlation
> IDs, reordering should be easily handled. I haven't got a chance to read the new
> consumer code, but I think it is worth doing if we haven't done so.
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 9:50 AM, "Jun Rao"  wrote:
>
> >To amortize the long RTT across data centers, you can tune the TCP window
> >size by configuring a larger socket.receive.buffer.bytes in the consumer.
> >
> >For the last one, it seems that you want identical mirroring. The tricky
> >thing is to figure out how to avoid duplicates when there is a failure. We
> >had some related discussion in the context of transactional messaging (
> >https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >).
> >
> >Thanks,
> >
> >Jun
> >

Re: MirrorMaker improvements

2015-03-25 Thread vlad...@gmail.com
Hi Jiangjie,

I only noticed a single TCP connection between a MM process and a single
broker. Is there something I could have done to open up more connections?

TCP can actually cap before saturating the network, which is why it is hard
to utilize a high bandwidth-delay-product link with a single TCP
connection. There is an equation that links the MSS, RTT and loss rate of
the link to the achievable TCP throughput. Notice that the link bandwidth
does not come into play, since the only way it can affect throughput is by
increasing the loss rate due to drops when the link is congested. On WAN
links, however, a single connection will usually cap (due to random losses
and high RTT) long before achieving the capacity of the link. Here is a
reference for this:
http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf
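
For reference, the steady-state throughput approximation in that paper has
roughly the following form (p is the loss rate, b the number of packets
acknowledged per ACK, Wmax the maximum window, T0 the retransmission
timeout; multiply by the MSS to get bytes/sec):

  B(p) ~ min( Wmax / RTT,
              1 / ( RTT * sqrt(2bp/3) + T0 * min(1, 3*sqrt(3bp/8)) * p * (1 + 32p^2) ) )

The link bandwidth appears nowhere in the expression; it only enters
indirectly, through the loss rate p.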

Regards,
Vlad

On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin 
wrote:

> Hi Vlad,
>
> I am not sure I understand the congestion window part. So TCP congestion
> control will only occur when you are saturating the network. If that is
> the case, bandwidth has already become the bottleneck. But we are talking
> about network underutilization, no?
>
> Another thing is that each fetcher thread has its own BlockingChannel to
> the broker, so they have dedicated TCP connections. Could you explain more
> about the mux?
>
> Jiangjie (Becket) Qin
>