> so as to avoid creating a perverse incentive for users to not adopt MM2?
Tom, I know there are several of us chomping at the bit to enable this in MM2, so I don't think that's a problem. AFAICT there won't be a separate KIP required -- we just need to enable this new option in MM2 by default.

Ryanne

On Fri, Mar 26, 2021 at 12:53 PM Tom Bentley <tbent...@redhat.com> wrote:

Hi,

Thanks for the KIP. I've not yet read it in detail, so I have no technical points to make at this point.

I'm having trouble reconciling KIP-720's deprecation of MM1 with this proposal to add a new feature to MM1 and not to MM2. I think this would create a confusing impression for users. Given that the change to MM2 is supposed to be straightforward, is there any reason not to implement it as part of this KIP, so as to avoid creating a perverse incentive for users to not adopt MM2?

Kind regards,

Tom

On Mon, Mar 22, 2021 at 6:21 PM Henry Cai <h...@pinterest.com.invalid> wrote:

If there are no additional comments, we'd start a vote in a couple of days.

On Sat, Feb 27, 2021 at 9:26 AM A S <asharma52...@gmail.com> wrote:

+1 to adding latency metrics.

To add context on why CPU, memory, and GC have a bigger impact than the network in a mirror of compressed topics without KIP-712: *a failing / unstable mirror cluster will have lag perpetually spiking, with a much larger impact on e2e latencies*. To explain a bit more:

Less data moved:
Compressed topics "usually" should move less data over the network and are useful for reducing the network cost / footprint of replication. Therefore, network usage may naturally be lower than if this data were uncompressed. Instead, the CPU usage bottleneck hits first, due to decompression of data. Prior to KIP-712 we had never been able to operate a mirror at wire speed.

Stability:
If there is a load spike, a few scenarios can play out:
- more data in a batch, i.e. a larger uncompressed size, i.e. a larger memory footprint
- a larger number of batches, i.e. a larger memory footprint

In either case, more memory and more CPU cycles are used. If GC throughput or heap size is insufficient, this leads to OOMs.

Domino Effect:
Just like any Kafka consumer, if a consumer instance in a consumer group terminates it triggers a rebalance; in this case that rebalance happens due to an OOM. A mirror instance that fails due to an OOM triggered by traffic load (explained above) starts a domino effect: more mirror instances hit OOMs as the load increases on the ever smaller number of running instances remaining in the group, eventually leading to a total failure of the mirror cluster.

Memory Limits & Ineffective workarounds:
A question that could be asked: couldn't we configure the mirror instance in such a way that this doesn't happen? The answer is that it's expensive and difficult. Let's say we are using a 4-core host with X GB of memory and configure the mirror to use 4 streams, and this configuration leads to an OOM. We could try to reduce the number of streams to 3 or 2. That's a 25-50% loss in efficiency, i.e. we may now need 2x the number of nodes (and 2x the cost), without any guarantee that this configuration will never result in an OOM (since future traffic characteristics are unpredictable), though it may reduce the probability of one.

Summary:
Since the root cause is memory usage due to decompression of data in flight, the ideal way to resolve this was to eliminate the decompression of data, which isn't a hard requirement for the mirror to operate, since it was not performing any transformation or repartitioning in our case.

Thanks,
- Ambud
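To make the arithmetic in the last two sections concrete, here is a back-of-envelope sketch in Java; every number in it (compression ratio, batch size, in-flight batches per stream) is a hypothetical assumption, not a measurement from this thread:

import java.util.Locale;

// Hedged back-of-envelope estimate of why in-flight decompression inflates
// heap usage, and why halving the stream count roughly doubles the node count.
// All inputs are hypothetical placeholders.
public final class MirrorHeapBackOfEnvelope {
    public static void main(String[] args) {
        double compressedBatchMiB = 1.0;  // batch size on the wire (assumed)
        double compressionRatio = 4.0;    // assumed 4:1 compression
        int streams = 4;                  // mirror streams per host
        int inFlightPerStream = 250;      // assumed batches buffered per stream

        // Without shallow mirroring, every buffered batch sits in the heap
        // in its uncompressed form.
        double inFlightHeapMiB =
                compressedBatchMiB * compressionRatio * streams * inFlightPerStream;
        System.out.printf(Locale.ROOT, "uncompressed in-flight data: ~%.0f MiB%n", inFlightHeapMiB);

        // Cutting streams from 4 to 2 halves per-host throughput, so the same
        // aggregate load needs roughly twice as many hosts.
        int reducedStreams = 2;
        System.out.printf(Locale.ROOT, "host multiplier after reducing streams: ~%.1fx%n",
                (double) streams / reducedStreams);
    }
}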
On Mon, Feb 22, 2021 at 9:20 AM Vahid Hashemian <vahid.hashem...@gmail.com> wrote:

As Henry mentions in the KIP, we are seeing a great deal of improvement and efficiency by using the mirroring enhancement proposed in this KIP, and believe it would be equally beneficial to everyone that runs Kafka and Kafka Mirror at scale.

I'm bumping up this thread in case there is additional feedback or comments.

Thanks,
--Vahid

On Sat, Feb 13, 2021, 13:59 Ryanne Dolan <ryannedo...@gmail.com> wrote:

Glad to hear that latency and thruput aren't negatively affected somehow. I would love to see this KIP move forward.

Ryanne

On Sat, Feb 13, 2021, 3:00 PM Henry Cai <h...@pinterest.com> wrote:

Ryanne,

Yes, network performance is also important. In our deployment, we are bottlenecked on the CPU/memory on the mirror hosts. We are using c5.2x and m5.2x nodes in AWS; before the deployment, CPU would peak at 80% but there was enough network bandwidth left on those hosts. Having said that, we maintain the same network throughput before and after the switch.

On Fri, Feb 12, 2021 at 12:20 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Hey Henry, great KIP. The performance improvements are impressive! However, often cpu, ram, and gc are not the metrics most important to a replication pipeline -- often the network is mostly saturated anyway. Do you know how this change affects latency or thruput? I suspect less GC pressure means slightly less p99 latency, but it would be great to see that confirmed. I don't think it's necessary that this KIP improves these metrics, but I think it's important to show that they at least aren't made worse.

I suspect any improvement in MM1 would be magnified in MM2, given there is a lot more machinery between consumer and producer in MM2.

I'd like to do some performance analysis based on these changes. Looking forward to a PR!

Ryanne
On Wed, Feb 10, 2021, 3:50 PM Henry Cai <h...@pinterest.com> wrote:

On the question "whether shallow mirror is only applied on mirror maker v1": the code change is mostly on the consumer and producer code path; the change to MirrorMaker v1 is very trivial. We chose to modify the consumer/producer path (instead of creating a new mirror product) so other use cases can use that feature as well. The change to MirrorMaker v2 should be straightforward as well, but we don't have that environment in house. I think the community can easily port this change to MirrorMaker v2.

On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <vahid.hashem...@gmail.com> wrote:

Retitled the thread to conform to the common format.

Thanks!
--Vahid

On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com> wrote:

Hello Henry,

This is a very interesting proposal. https://issues.apache.org/jira/browse/KAFKA-10728 reflects a similar concern about re-compressing data in mirror maker.

Probably one thing that needs clarifying is: how is "shallow" mirroring applied only to the mirror maker use case, if the changes need to be made on the generic consumer and producer (e.g. by adding `fetch.raw.bytes` and `send.raw.bytes` to the consumer and producer configs)?
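For illustration of the option names Ning references, here is a minimal, hedged Java sketch of how a mirror's consumer/producer configs might carry them; the exact names, types, and defaults are whatever the KIP finally specifies, so treat the values below as assumptions rather than a tested configuration:

import java.util.Properties;

// Hedged configuration sketch only: the raw-bytes options below are the
// names mentioned in this thread; their final semantics and defaults are
// defined by KIP-712, not by this example.
public final class ShallowMirrorConfigSketch {

    // Source-cluster consumer: fetch batches as raw (still-compressed) bytes,
    // skipping decompression and record deserialization.
    static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "source-cluster:9092");
        p.put("group.id", "mirror-group");
        p.put("fetch.raw.bytes", "true");
        return p;
    }

    // Target-cluster producer: send the already-compressed batch bytes as-is,
    // skipping re-serialization and re-compression.
    static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "target-cluster:9092");
        p.put("send.raw.bytes", "true");
        return p;
    }
}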
On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID> wrote:

Dear Community members,

We are proposing a new feature to improve the performance of Kafka mirror maker:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring

The current Kafka MirrorMaker process (with the underlying Consumer and Producer library) uses significant CPU cycles and memory to decompress/recompress and deserialize/re-serialize messages, and to copy message bytes multiple times along the mirroring/replication stages.

The KIP proposes a *shallow mirror* feature which brings back the shallow iterator concept to the mirror process and also proposes to skip the unnecessary message decompression and recompression steps. We argue that in many cases users just want a simple replication pipeline that replicates the messages as they are from the source cluster to the destination cluster. In many cases the messages in the source cluster are already compressed and properly batched; users just need an identical copy of the message bytes through the mirroring, without any transformation or repartitioning.

We have a prototype implementation in house with MirrorMaker v1 and observed *CPU usage dropped from 50% to 15%* for some mirror pipelines.

We name this feature *shallow mirroring* since it has some resemblance to the old Kafka 0.7 namesake feature, but the implementations are not quite the same. 'Shallow' means: 1. we *shallowly* iterate RecordBatches inside the MemoryRecords structure instead of deep-iterating records inside each RecordBatch; 2. we *shallowly* copy (share) pointers inside the ByteBuffer instead of deep-copying and deserializing bytes into objects.

Please share discussions/feedback along this email thread.
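To make the "shallow iteration" described in the proposal concrete, here is a minimal, hedged sketch against the kafka-clients internal record classes (org.apache.kafka.common.record); it only illustrates batch-level iteration over shared buffers and is not the KIP's actual patch:

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;

// Illustrative sketch: iterate RecordBatches inside MemoryRecords without
// decompressing or deserializing the records they contain.
public final class ShallowIterationSketch {

    // Shallow pass over fetched partition bytes: only batch-level metadata is
    // read; record payloads stay compressed inside the shared ByteBuffer.
    static void inspectBatches(ByteBuffer fetchedBytes) {
        MemoryRecords records = MemoryRecords.readableRecords(fetchedBytes);
        for (MutableRecordBatch batch : records.batches()) {
            System.out.printf("offsets [%d..%d], compression=%s, size=%d bytes%n",
                    batch.baseOffset(), batch.lastOffset(),
                    batch.compressionType(), batch.sizeInBytes());
            // A deep iteration would instead call batch.iterator() (or
            // batch.streamingIterator(...)), which decompresses and
            // materializes each Record -- exactly the work shallow
            // mirroring avoids.
        }
    }
}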