Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
not be sufficient.
There are actually two related factors here:
1. Pipelining TCP packets when sending a single request/response.
2. Pipelining multiple requests/responses.
Bumping up socket.receive.buffer.bytes helps with 1) but does not help
with 2).

For example, consider the following scenario.
RTT = 100 ms
Bandwidth = 1 Gbps (128 MB/s)
Request size = 10 KB
Response size = 1 MB
If we only have a single fetcher working in a blocking way, the max
number of requests we can achieve is 10 requests/sec, because it is
restricted by the RTT. In this case, bumping up the socket buffer size
will not help. I think this is the situation Vlad mentioned.
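
To make this concrete, here is a back-of-the-envelope sketch using the
numbers above (nothing beyond simple arithmetic):

    // A blocking fetcher completes at most one request per round trip,
    // so throughput is capped by the RTT rather than by the link.
    double rttSec = 0.1;                    // 100 ms round trip
    double responseBytes = 1024 * 1024;     // 1 MB response
    double maxRequestsPerSec = 1 / rttSec;  // 10 requests/sec
    double maxBytesPerSec = maxRequestsPerSec * responseBytes;
    // ~10 MB/s, far below the 128 MB/s the 1 Gbps link could carry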

One option might be to increase num.consumer.fetchers, so we can have
more fetcher threads for a single consumer instance (due to the
implementation, num.consumer.fetchers actually means "at most
num.consumer.fetchers").
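
For example, something like the following (the values are illustrative,
not recommendations):

    import java.util.Properties;

    // Old-consumer settings: more parallel fetchers plus a larger TCP
    // receive window for the long-RTT link.
    Properties props = new Properties();
    props.put("num.consumer.fetchers", "4");
    props.put("socket.receive.buffer.bytes", "1048576"); // 1 MB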

One thing that might be worth considering is whether we can enforce
pipelining in the new consumer like we do in the new producer. Since we
have correlation IDs, reordering should be easy to handle. I haven't got
a chance to read the new consumer code, but I think it is worth doing if
we haven't done so.
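
Roughly what I have in mind (a hypothetical sketch; none of these types
come from the actual Kafka code base):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    interface Connection { void send(int correlationId, byte[] request); }

    class PipelinedFetcher {
        private final AtomicInteger nextCorrelationId = new AtomicInteger();
        private final Map<Integer, byte[]> inFlight = new ConcurrentHashMap<>();

        // Send every request immediately instead of paying one RTT each.
        void sendAll(Iterable<byte[]> requests, Connection conn) {
            for (byte[] request : requests) {
                int id = nextCorrelationId.getAndIncrement();
                inFlight.put(id, request);
                conn.send(id, request); // no blocking wait for the response
            }
        }

        // Responses may arrive out of order; the correlation id pairs each
        // response with its request, so reordering is easy to handle.
        void onResponse(int correlationId, byte[] response) {
            byte[] request = inFlight.remove(correlationId);
            // ... process the (request, response) pair ...
        }
    }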

Jiangjie (Becket) Qin

On 3/25/15, 9:50 AM, "Jun Rao" <j...@confluent.io> wrote:

>To amortize the long RTT across data centers, you can tune the TCP window
>size by configuring a larger socket.receive.buffer.bytes in the consumer.
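>
>As a rule of thumb, the receive buffer needs to cover the
>bandwidth-delay product of the link; a rough sketch with illustrative
>numbers (not a measured recommendation):
>
>    // Size the TCP receive window to the bandwidth-delay product so a
>    // single connection can keep the long pipe full.
>    double bandwidthBps = 1e9;   // 1 Gbps link
>    double rttSec = 0.1;         // 100 ms round trip
>    long bdpBytes = (long) (bandwidthBps / 8 * rttSec); // 12.5 MB
>    // socket.receive.buffer.bytes should be at least this large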
>
>For the last one, it seems that you want identical mirroring. The tricky
>thing is to figure out how to avoid duplicates when there is a failure. We
>had some related discussion in the context of transactional messaging (
>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
>in+Kafka
>).
>
>Thanks,
>
>Jun
>
>On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com <vlad...@gmail.com>
>wrote:
>
>> Dear all,
>>
>> I had a short discussion with Jay yesterday at the ACM meetup and he
>> suggested writing an email regarding a few possible MirrorMaker
>> improvements.
>>
>> At Turn, we have been using MirrorMaker for a few months now to
>> asynchronously replicate our key/value store data between our
>> datacenters. In a way, our system is similar to LinkedIn's Databus,
>> but it uses Kafka clusters and MirrorMaker as its building blocks. Our
>> overall message rate peaks at about 650K messages/sec and, when
>> pushing data over high bandwidth-delay product links, we have found
>> some minor bottlenecks.
>>
>> The MirrorMaker process uses a standard consumer to pull data from a
>> remote datacenter. This implies that it opens a single TCP connection
>> to each of the remote brokers and muxes requests for different topics
>> and partitions over this connection. While this is a good thing in
>> terms of keeping the congestion window open, over long-RTT lines with
>> a rather high loss rate the congestion window will cap, in our case at
>> just a few Mbps. Since the overall line bandwidth is much higher, this
>> means that we have to start multiple MirrorMaker processes (somewhere
>> in the hundreds) in order to fully use the line capacity. Being able
>> to pool multiple TCP connections from a single consumer to a broker
>> would remove this complication.
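>>
>> To see why the process count ends up in the hundreds, here is a rough
>> sizing sketch (illustrative numbers, not our exact measurements):
>>
>>     // Loss caps the congestion window, so per-connection throughput
>>     // is roughly cwnd / RTT no matter how fast the line is.
>>     double rttSec = 0.1;                  // 100 ms cross-DC round trip
>>     double cwndBytes = 64 * 1024;         // window capped by loss
>>     double perConnBps = cwndBytes * 8 / rttSec;      // ~5.2 Mbps
>>     double lineBps = 1e9;                            // 1 Gbps line
>>     double connectionsNeeded = lineBps / perConnBps; // ~190 connections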
>>
>> The standard consumer also uses the remote ZooKeeper in order to
>> manage the consumer group. While consumer group management is moving
>> closer to the brokers, it might make sense to move the group
>> management to the local datacenter, since that would avoid using the
>> long-distance connection for this purpose.
>>
>> Another possible improvement assumes a further constraint, namely that
>> the number of partitions for a topic is the same in both datacenters.
>> In my opinion, this is a sane constraint, since it preserves the Kafka
>> per-partition ordering guarantee, instead of a simple guarantee per
>> key. Such a guarantee can be useful, for example, in a system that
>> compares partition contents using Merkle trees to reach eventual
>> consistency. If the number of partitions is equal, then offsets have
>> the same meaning for the same partition in both clusters, since the
>> data in both partitions is identical before the offset. This allows a
>> simple consumer to just query the local broker and the remote broker
>> for their current offsets and, in case the remote broker is ahead,
>> copy the extra data to the local cluster. Since the consumer offsets
>> are no longer bound to the specific partitioning of a single remote
>> cluster, the consumer could pull from any number of remote clusters,
>> BitTorrent-style, if their offsets are ahead of the local offset. The
>> group management problem would reduce to assigning local partitions to
>> different MirrorMaker processes, so group management could also be
>> done locally in this situation.
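>>
>> The per-partition copy loop could look something like this (a
>> hypothetical sketch; the interfaces are illustrative, not actual Kafka
>> client APIs):
>>
>>     interface Cluster {
>>         long logEndOffset(String topic, int partition);
>>         byte[][] read(String topic, int partition, long from, long to);
>>         void append(String topic, int partition, byte[][] messages);
>>     }
>>
>>     class IdenticalMirror {
>>         void syncPartition(Cluster local, Cluster remote,
>>                            String topic, int p) {
>>             long localEnd = local.logEndOffset(topic, p);
>>             long remoteEnd = remote.logEndOffset(topic, p);
>>             if (remoteEnd > localEnd) {
>>                 // Data below min(localEnd, remoteEnd) is identical by
>>                 // assumption, so copying [localEnd, remoteEnd)
>>                 // preserves offsets exactly.
>>                 byte[][] delta = remote.read(topic, p, localEnd, remoteEnd);
>>                 local.append(topic, p, delta);
>>             }
>>         }
>>     }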
>>
>> Regards,
>> Vlad
>>
