Hi Rajini,

RS14: Done. Missed that, sorry.

Thanks again.

On Tue, May 19, 2026 at 7:28 PM Rajini Sivaram <[email protected]> wrote:
>
> Hi Federico,
>
> Thanks for the updates. Just one minor point, apart from that, looks good.
>
> RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094
> --entity-type mirrors".
> Will be good to change that to `--entity-type cluster-mirrors`.
>
> Thanks,
>
> Rajini
>
>
> On Tue, May 19, 2026 at 5:28 PM vaquar khan <[email protected]> wrote:
>
> > Hi Federico and team,
> >
> > Thank you for your detailed response on May 11, 2026. I greatly appreciate
> > the collaborative effort over the past few months to harden the KIP-1279
> > architecture.
> >
> > * 1. Resolved Architectural & Stability Vulnerabilities*
> >
> > I am pleased to confirm that the KIP has successfully integrated fixes for
> > the critical vulnerabilities I flagged, thereby protecting the cluster's
> > state machine, memory bounds, and control plane. Consider the following
> > items fully resolved on my end:
> >
> >    -
> >
> >    *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously
> >    identified that the proposed -(sourceProducerId + 2) PID rewriting
> >    formula would fundamentally break hasProducerId() in
> >    AbstractRecordBatch.java, causing the broker to bypass
> >    ProducerStateManager.update() and default the Last Stable Offset (LSO)
> >    to the High Watermark. I am glad to see the KIP authors abandoned PID
> >    mapping entirely and adopted the deterministic MIRROR_PID_RESET control
> >    record barrier. This perfectly protects Kafka's exactly-once semantics.
> >
> >    -
> >
> >    *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for officially
> >    confirming that the TransactionIndex is strictly rebuilt locally during
> >    log appends rather than copied byte-for-byte. This resolves my concern
> >    regarding PID mismatches inducing consumer read anomalies.
> >    -
> >
> >    *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification that
> >    fetcher threads multiplex partitions meaning the memory footprint is
> >    strictly bounded by num_fetcher_threads * response_max_bytes rather than
> >    a concurrent 1MB buffer per partition fully resolves my concern
> > regarding
> >    50GB broker-wide OOM spikes during mass partition wake-ups.
> >    -
> >
> >    *VK3 (Control Plane Hotspots):* I had flagged the severe risk of
> >    metadata saturation on a single broker during a "link flap" event. Your
> >    confirmation that the __mirror_state topic utilizes a compound hash
> > of (mirrorName,
> >    topicId, partition number) mathematically ensures that the 50,000+ state
> >    transitions will be safely distributed across the cluster, neutralizing
> > the
> >    single-node hotspot risk.
> >
> > 2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)
> >
> > Regarding VK12, there is a fundamental misunderstanding in your previous
> > reply. You stated: *"The scenario described can only occur if offsets are
> > force-written to an active group, which the design prevents."*
> >
> > My concern has absolutely nothing to do with overwriting active groups. My
> > concern applies strictly to the normal synchronization of inactive/dead
> > groups, and is based directly on the race condition currently documented in
> > the official KIP-1279 text:
> >
> > *"During offset synchronization, the committed offset in the destination
> > cluster may temporarily exceed the current log end offset (LEO) of the
> > mirror topic... consumers attempting to resume from offset 100 will receive
> > an OffsetOutOfRangeException. To handle this gracefully, consumers should
> > configure auto.offset.reset=latest..."*
> >
> > If a failover happens during this documented divergence window, a
> > reconnecting consumer will hit the OffsetOutOfRangeException. If the
> > downstream consumer follows the KIP's official advice and relies on
> > auto.offset.reset=latest, the consumer will jump to the absolute newest
> > offset on the partition. *This completely skips any newly produced records
> > that arrived between the failover and the consumer reconnecting, resulting
> > in silent, unrecoverable data loss for the downstream application.*
> > *Proposed Resolution: Double-Clamped Offset Safety Invariant* Instead of
> > requiring consumers to use auto.offset.reset=latest and endorsing a known
> > data loss vector, I propose that the ClusterMirrorCoordinator enforce a
> > pre-persist offset validation invariant during offset synchronization.
> > Before any translated offset is committed to the destination cluster, the
> > coordinator must apply a double-clamp:
> >
> > *syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*
> >
> > Where destinationLSO is the Log Start Offset (earliest readable position
> > post-retention) and destinationLEO is the Log End Offset (latest replicated
> > position) of the destination partition.
> >
> > This guarantees that every persisted offset falls within the physically
> > valid range  eliminating both the OffsetOutOfRangeException caused by
> > exceeding the LEO during replication lag, and the expired-offset rewind
> > caused by falling below the LSO due to destination retention policies.
> >
> > The worst-case trade-off under this invariant is bounded re-processing
> > proportional to the replication lag at failover time not total partition
> > depth which is perfectly consistent with Kafka's documented at-least-once
> > delivery guarantees. Broad enterprise production evidence from large-scale
> > cross-cluster failovers confirms that state checks alone (preventing active
> > group overwrites) are insufficient; strict offset bounds clamping is
> > required to achieve enterprise-grade data integrity.
> >
> > I look forward to your thoughts on implementing this final offset capping
> > logic. Once VK12 is patched, I believe this architecture will be
> > exceptionally robust and ready for enterprise deployment.
> >
> > Best Regards,
> >
> > Viquar Khan
> >
> >
> > On Mon, 18 May 2026 at 14:23, Rajini Sivaram <[email protected]>
> > wrote:
> >
> > > Hi Federico,
> > >
> > > Thanks for the updates! The KIP is looking good. A few more small
> > comments.
> > >
> > >
> > > RS13: A couple of places still refer to `kafka-mirror.sh` like under
> > > `Failover Process`. Can we change them to `*kafka-cluster-mirrors.sh*`?
> > >
> > > RS14: Should we change `--entity-type mirrors` for kafka-configs to be `
> > > --entity-type *cluster-mirrors*` to be consistent? Also,
> > > CLUSTER_MIRROR((byte)
> > > 64, "mirror"); could be `*cluster-mirror*`?
> > >
> > > RS15: It may be useful to rename `mirror.topic.num.partitions` and `
> > > mirror.topic.replication.factor` since they are very similar to `
> > > mirror.topic.properties.exclude`, but the `mirror.topic` prefix refers to
> > > different topics (the internal topic for the first two and actual mirror
> > > topics for the other one).
> > >
> > > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but not
> > in
> > > source using DeleteAcls request."
> > > What happens if someone creates an ACL on the destination to deny
> > > User:Alice access to all topics?
> > >
> > >    1. If that ACL also existed on the source cluster and then it was
> > >    removed, will it get removed from the destination?
> > >    2. If that ACL never existed on the source cluster, will it get
> > removed
> > >    from the destination?
> > >
> > > RS17: The table in the Source ACLs section says: "DescribeClusterMirrors
> > >    MC      Read    Cluster     Log truncation"
> > > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > >
> > > On Fri, May 15, 2026 at 8:41 AM Federico Valeri <[email protected]>
> > > wrote:
> > >
> > > > Hi Rajini, we finally addressed the API and tool naming refactoring as
> > > > you suggested in RS6. Please take a look when you have time. Thanks.
> > > >
> > > >
> > > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri <[email protected]>
> > > > wrote:
> > > > >
> > > > > Hello all, I want to highlight a couple of new paragraphs:
> > > > >
> > > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the invariant
> > > > > that the destination leader epoch must always be greater than or
> > equal
> > > > > to the source leader epoch (DLE>=SLE). Without this, consumers on the
> > > > > destination cluster can get stuck in an infinite metadata refresh
> > loop
> > > > > when they encounter committed offsets carrying source epochs higher
> > > > > than the local epoch. The invariant is maintained through three
> > > > > mechanisms: reactive bumping (epoch fencing triggered when SLE > DLE
> > > > > during fetch), proactive bumping (scheduled when SLE approaches DLE
> > > > > within a threshold), and periodic bumping (checked during coordinator
> > > > > metadata sync).
> > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > > >
> > > > > 2. Group Offsets: The coordinator periodically syncs consumer and
> > > > > share group offsets from the source cluster to the destination for
> > all
> > > > > mirrored topics. Groups are filtered by configurable include/exclude
> > > > > patterns, and offsets are only synced for groups that are not
> > > > > currently active on the destination cluster, preventing overwrites of
> > > > > local consumer progress. Because source and destination share the
> > same
> > > > > topic offsets (no offset translation), synced offsets can be used
> > > > > directly without mapping.
> > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > > >
> > > > > These new paragraphs directly address some of your questions, but let
> > > > > me list them here:
> > > > >
> > > > > JR2: Yes, we removed the incorrect phrase and added more details to
> > > > > the paragraph.
> > > > >
> > > > > JR4: When source cluster topic has tiered storage enabled, CM works
> > by
> > > > > mirroring remote and local log into destination cluster. When
> > > > > destination cluster topic has tiered storage enabled, CM fails in
> > > > > PREPARING state because the LME may be in remote storage, but works
> > > > > fine if already MIRRORING because no truncation is needed.
> > > > >
> > > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > > >
> > > > > JR13: We can't support stateful Streams application because
> > > > > asynchronous replication cannot preserve the transactional boundaries
> > > > > between input offset commits, state store mutations written to
> > > > > changelog topics, and intermediate records written to repartition
> > > > > topics. The synchronous extension of this design will be able to
> > > > > support them. Existing Features Integration paragraph updated.
> > > > >
> > > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > > >
> > > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > > >
> > > > > Thanks
> > > > > Fede
> > > > >
> > > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri <
> > [email protected]>
> > > > wrote:
> > > > > >
> > > > > > Hi Vaquar,
> > > > > >
> > > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated some
> > > > > > time ago with the new approach based on the new PID reset control
> > > > > > record.
> > > > > >
> > > > > > VK11: The transaction index is always built locally during log
> > > append,
> > > > > > never copied.
> > > > > >
> > > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the fetch
> > > > > > model. Fetcher threads don't allocate one buffer per partition.
> > > Actual
> > > > > > peak memory is roughly num_fetcher_threads * response_max_bytes,
> > not
> > > > > > num_partitions * partition_max_bytes. With 1 fetcher thread and the
> > > > > > default response max, the memory footprint is modest regardless of
> > > > > > partition count. We are leveraging the same proven pattern used by
> > > the
> > > > > > internal replication.
> > > > > >
> > > > > > VK3: The __mirror_state topic uses hash-based partitioning based on
> > > > > > mirrorName, topicId and partition number. With the production
> > default
> > > > > > of 50 partitions, 50,000 partition transitions distribute across
> > ~50
> > > > > > partition leaders on different brokers, not a single broker. This
> > is
> > > > > > the same proven pattern as __consumer_offsets, which handles
> > millions
> > > > > > of commits.
> > > > > >
> > > > > > VK12: The scenario described can only occur if offsets are
> > > > > > force-written to an active group, which the design prevents.
> > > > > >
> > > > > > Cheers
> > > > > > Fede
> > > >
> > >
> >

Reply via email to