Hi Rajini, RS14: Done. Missed that, sorry.
Thanks again.

On Tue, May 19, 2026 at 7:28 PM Rajini Sivaram wrote:
>
> Hi Federico,
>
> Thanks for the updates. Just one minor point, apart from that, looks good.
>
> RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094
> --entity-type mirrors".
> Will be good to change that to `--entity-type cluster-mirrors`.
>
> Thanks,
>
> Rajini
>
>
> On Tue, May 19, 2026 at 5:28 PM vaquar khan wrote:
>
> > Hi Federico and team,
> >
> > Thank you for your detailed response on May 11, 2026. I greatly appreciate
> > the collaborative effort over the past few months to harden the KIP-1279
> > architecture.
> >
> > *1. Resolved Architectural & Stability Vulnerabilities*
> >
> > I am pleased to confirm that the KIP has successfully integrated fixes for
> > the critical vulnerabilities I flagged, thereby protecting the cluster's
> > state machine, memory bounds, and control plane. Consider the following
> > items fully resolved on my end:
> >
> > - *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously
> >   identified that the proposed -(sourceProducerId + 2) PID rewriting
> >   formula would fundamentally break hasProducerId() in
> >   AbstractRecordBatch.java, causing the broker to bypass
> >   ProducerStateManager.update() and default the Last Stable Offset (LSO)
> >   to the High Watermark. I am glad to see the KIP authors abandoned PID
> >   mapping entirely and adopted the deterministic MIRROR_PID_RESET control
> >   record barrier. This perfectly protects Kafka's exactly-once semantics.
> >
> > - *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for officially
> >   confirming that the TransactionIndex is strictly rebuilt locally during
> >   log appends rather than copied byte-for-byte. This resolves my concern
> >   regarding PID mismatches inducing consumer read anomalies.
> >
> > - *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification that
> >   fetcher threads multiplex partitions, meaning the memory footprint is
> >   strictly bounded by num_fetcher_threads * response_max_bytes rather than
> >   a concurrent 1MB buffer per partition, fully resolves my concern
> >   regarding 50GB broker-wide OOM spikes during mass partition wake-ups.
> >
> > - *VK3 (Control Plane Hotspots):* I had flagged the severe risk of
> >   metadata saturation on a single broker during a "link flap" event. Your
> >   confirmation that the __mirror_state topic utilizes a compound hash of
> >   (mirrorName, topicId, partition number) mathematically ensures that the
> >   50,000+ state transitions will be safely distributed across the cluster,
> >   neutralizing the single-node hotspot risk.
> >
> > *2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)*
> >
> > Regarding VK12, there is a fundamental misunderstanding in your previous
> > reply. You stated: *"The scenario described can only occur if offsets are
> > force-written to an active group, which the design prevents."*
> >
> > My concern has absolutely nothing to do with overwriting active groups. My
> > concern applies strictly to the normal synchronization of inactive/dead
> > groups, and is based directly on the race condition currently documented in
> > the official KIP-1279 text:
> >
> > *"During offset synchronization, the committed offset in the destination
> > cluster may temporarily exceed the current log end offset (LEO) of the
> > mirror topic... consumers attempting to resume from offset 100 will receive
> > an OffsetOutOfRangeException. To handle this gracefully, consumers should
> > configure auto.offset.reset=latest..."*
> >
> > If a failover happens during this documented divergence window, a
> > reconnecting consumer will hit the OffsetOutOfRangeException.
> > If the downstream consumer follows the KIP's official advice and relies on
> > auto.offset.reset=latest, the consumer will jump to the absolute newest
> > offset on the partition. *This completely skips any newly produced records
> > that arrived between the failover and the consumer reconnecting, resulting
> > in silent, unrecoverable data loss for the downstream application.*
> >
> > *Proposed Resolution: Double-Clamped Offset Safety Invariant*
> >
> > Instead of requiring consumers to use auto.offset.reset=latest and
> > endorsing a known data loss vector, I propose that the
> > ClusterMirrorCoordinator enforce a pre-persist offset validation invariant
> > during offset synchronization. Before any translated offset is committed to
> > the destination cluster, the coordinator must apply a double-clamp:
> >
> > *syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*
> >
> > Where destinationLSO is the Log Start Offset (earliest readable position
> > post-retention) and destinationLEO is the Log End Offset (latest replicated
> > position) of the destination partition.
> >
> > This guarantees that every persisted offset falls within the physically
> > valid range, eliminating both the OffsetOutOfRangeException caused by
> > exceeding the LEO during replication lag, and the expired-offset rewind
> > caused by falling below the LSO due to destination retention policies.
> >
> > The worst-case trade-off under this invariant is bounded re-processing
> > proportional to the replication lag at failover time, not total partition
> > depth, which is perfectly consistent with Kafka's documented at-least-once
> > delivery guarantees. Broad enterprise production evidence from large-scale
> > cross-cluster failovers confirms that state checks alone (preventing active
> > group overwrites) are insufficient; strict offset bounds clamping is
> > required to achieve enterprise-grade data integrity.
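
[Illustration] The double-clamp proposed for VK12 can be sketched in a few lines of Python. This is only a sketch of the formula as quoted in the message, not code from the KIP or from Kafka. The function name `clamp_synced_offset` and its parameter names are hypothetical. Note that `destinationLSO` in the proposal means the log start offset, not the Last Stable Offset mentioned under VK4 / VK8.

```python
def clamp_synced_offset(source_committed: int,
                        dest_log_start: int,
                        dest_log_end: int) -> int:
    """Clamp a source committed offset into [dest_log_start, dest_log_end].

    Hypothetical helper: mirrors the formula
    max(destinationLSO, min(sourceCommitted, destinationLEO)).
    """
    if dest_log_start > dest_log_end:
        raise ValueError("log start offset must not exceed log end offset")
    # Inner min caps the offset at what has been replicated so far;
    # outer max lifts it to the earliest offset still retained.
    return max(dest_log_start, min(source_committed, dest_log_end))


if __name__ == "__main__":
    # Replication lag: source committed 100, destination replicated up to 80.
    print(clamp_synced_offset(100, 0, 80))   # 80
    # Retention: source committed 10, destination log now starts at 40.
    print(clamp_synced_offset(10, 40, 80))   # 40
    # Already in range: the offset is persisted unchanged.
    print(clamp_synced_offset(50, 40, 80))   # 50
```

In the first case a consumer resuming after failover would re-read nothing and skip nothing on the destination, but would re-process source offsets 80 to 99 once they are produced again, which is the bounded re-processing the proposal describes.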
> >
> > I look forward to your thoughts on implementing this final offset capping
> > logic. Once VK12 is patched, I believe this architecture will be
> > exceptionally robust and ready for enterprise deployment.
> >
> > Best Regards,
> >
> > Viquar Khan
> >
> >
> > On Mon, 18 May 2026 at 14:23, Rajini Sivaram wrote:
> >
> > > Hi Federico,
> > >
> > > Thanks for the updates! The KIP is looking good. A few more small
> > > comments.
> > >
> > > RS13: A couple of places still refer to `kafka-mirror.sh` like under
> > > `Failover Process`. Can we change them to `kafka-cluster-mirrors.sh`?
> > >
> > > RS14: Should we change `--entity-type mirrors` for kafka-configs to be
> > > `--entity-type cluster-mirrors` to be consistent? Also,
> > > CLUSTER_MIRROR((byte) 64, "mirror"); could be `cluster-mirror`?
> > >
> > > RS15: It may be useful to rename `mirror.topic.num.partitions` and
> > > `mirror.topic.replication.factor` since they are very similar to
> > > `mirror.topic.properties.exclude`, but the `mirror.topic` prefix refers to
> > > different topics (the internal topic for the first two and actual mirror
> > > topics for the other one).
> > >
> > > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but not
> > > in source using DeleteAcls request."
> > > What happens if someone creates an ACL on the destination to deny
> > > User:Alice access to all topics?
> > >
> > >    1. If that ACL also existed on the source cluster and then it was
> > >    removed, will it get removed from the destination?
> > >    2. If that ACL never existed on the source cluster, will it get
> > >    removed from the destination?
> > >
> > > RS17: The table in the Source ACLs section says: "DescribeClusterMirrors
> > > MC Read Cluster Log truncation"
> > > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Fri, May 15, 2026 at 8:41 AM Federico Valeri wrote:
> > >
> > > > Hi Rajini, we finally addressed the API and tool naming refactoring as
> > > > you suggested in RS6. Please take a look when you have time. Thanks.
> > > >
> > > >
> > > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri wrote:
> > > > >
> > > > > Hello all, I want to highlight a couple of new paragraphs:
> > > > >
> > > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the invariant
> > > > > that the destination leader epoch must always be greater than or
> > > > > equal to the source leader epoch (DLE>=SLE). Without this, consumers
> > > > > on the destination cluster can get stuck in an infinite metadata
> > > > > refresh loop when they encounter committed offsets carrying source
> > > > > epochs higher than the local epoch. The invariant is maintained
> > > > > through three mechanisms: reactive bumping (epoch fencing triggered
> > > > > when SLE > DLE during fetch), proactive bumping (scheduled when SLE
> > > > > approaches DLE within a threshold), and periodic bumping (checked
> > > > > during coordinator metadata sync).
> > > > >
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > > >
> > > > > 2. Group Offsets: The coordinator periodically syncs consumer and
> > > > > share group offsets from the source cluster to the destination for
> > > > > all mirrored topics. Groups are filtered by configurable
> > > > > include/exclude patterns, and offsets are only synced for groups that
> > > > > are not currently active on the destination cluster, preventing
> > > > > overwrites of local consumer progress.
> > > > > Because source and destination share the same topic offsets (no
> > > > > offset translation), synced offsets can be used directly without
> > > > > mapping.
> > > > >
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > > >
> > > > > These new paragraphs directly address some of your questions, but let
> > > > > me list them here:
> > > > >
> > > > > JR2: Yes, we removed the incorrect phrase and added more details to
> > > > > the paragraph.
> > > > >
> > > > > JR4: When the source cluster topic has tiered storage enabled, CM
> > > > > works by mirroring the remote and local log into the destination
> > > > > cluster. When the destination cluster topic has tiered storage
> > > > > enabled, CM fails in PREPARING state because the LME may be in remote
> > > > > storage, but works fine if already MIRRORING because no truncation is
> > > > > needed.
> > > > >
> > > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > > >
> > > > > JR13: We can't support stateful Streams applications because
> > > > > asynchronous replication cannot preserve the transactional boundaries
> > > > > between input offset commits, state store mutations written to
> > > > > changelog topics, and intermediate records written to repartition
> > > > > topics. The synchronous extension of this design will be able to
> > > > > support them. Existing Features Integration paragraph updated.
> > > > >
> > > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > > >
> > > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > > >
> > > > > Thanks
> > > > > Fede
> > > > >
> > > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri wrote:
> > > > > >
> > > > > > Hi Vaquar,
> > > > > >
> > > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated some
> > > > > > time ago with the new approach based on the new PID reset control
> > > > > > record.
> > > > > >
> > > > > > VK11: The transaction index is always built locally during log
> > > > > > append, never copied.
> > > > > >
> > > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the fetch
> > > > > > model. Fetcher threads don't allocate one buffer per partition.
> > > > > > Actual peak memory is roughly num_fetcher_threads *
> > > > > > response_max_bytes, not num_partitions * partition_max_bytes. With
> > > > > > 1 fetcher thread and the default response max, the memory
> > > > > > footprint is modest regardless of partition count. We are
> > > > > > leveraging the same proven pattern used by the internal
> > > > > > replication.
> > > > > >
> > > > > > VK3: The __mirror_state topic uses hash-based partitioning based on
> > > > > > mirrorName, topicId and partition number. With the production
> > > > > > default of 50 partitions, 50,000 partition transitions distribute
> > > > > > across ~50 partition leaders on different brokers, not a single
> > > > > > broker. This is the same proven pattern as __consumer_offsets,
> > > > > > which handles millions of commits.
> > > > > >
> > > > > > VK12: The scenario described can only occur if offsets are
> > > > > > force-written to an active group, which the design prevents.
> > > > > >
> > > > > > Cheers
> > > > > > Fede
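
[Illustration] The two memory estimates contrasted in the VK1 reply can be worked through numerically. The sketch below only restates the arithmetic from the thread. The function names are hypothetical, and the 10 MiB response cap is an assumed value chosen for illustration, since the thread says "default response max" without giving a number.

```python
MIB = 1024 * 1024


def per_partition_estimate(num_partitions: int, partition_max_bytes: int) -> int:
    """Estimate assuming one buffer per partition (the model VK1 rejects)."""
    return num_partitions * partition_max_bytes


def per_fetcher_estimate(num_fetcher_threads: int, response_max_bytes: int) -> int:
    """Estimate assuming one in-flight response per fetcher thread."""
    return num_fetcher_threads * response_max_bytes


if __name__ == "__main__":
    # 50,000 partitions at 1 MiB each: the "50GB" figure from the thread.
    naive = per_partition_estimate(50_000, 1 * MIB)
    # 1 fetcher thread with an assumed 10 MiB response cap (illustrative value).
    bounded = per_fetcher_estimate(1, 10 * MIB)
    print(f"{naive / 1024**3:.1f} GiB")  # 48.8 GiB
    print(f"{bounded / MIB:.0f} MiB")    # 10 MiB
```

The second estimate does not depend on the partition count at all, which is the point of the reply: adding partitions to a fetcher thread changes how a response is shared among them, not how large the response can be.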
