Hi Federico,

Thanks for the updates. Just one minor point, apart from that, looks good.

RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094
--entity-type mirrors".
Will be good to change that to `--entity-type cluster-mirrors`.

Thanks,

Rajini


On Tue, May 19, 2026 at 5:28 PM vaquar khan <[email protected]> wrote:

> Hi Federico and team,
>
> Thank you for your detailed response on May 11, 2026. I greatly appreciate
> the collaborative effort over the past few months to harden the KIP-1279
> architecture.
>
> * 1. Resolved Architectural & Stability Vulnerabilities*
>
> I am pleased to confirm that the KIP has successfully integrated fixes for
> the critical vulnerabilities I flagged, thereby protecting the cluster's
> state machine, memory bounds, and control plane. Consider the following
> items fully resolved on my end:
>
>    -
>
>    *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously
>    identified that the proposed -(sourceProducerId + 2) PID rewriting
>    formula would fundamentally break hasProducerId() in
>    AbstractRecordBatch.java, causing the broker to bypass
>    ProducerStateManager.update() and default the Last Stable Offset (LSO)
>    to the High Watermark. I am glad to see the KIP authors abandoned PID
>    mapping entirely and adopted the deterministic MIRROR_PID_RESET control
>    record barrier. This perfectly protects Kafka's exactly-once semantics.
>
>    -
>
>    *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for officially
>    confirming that the TransactionIndex is strictly rebuilt locally during
>    log appends rather than copied byte-for-byte. This resolves my concern
>    regarding PID mismatches inducing consumer read anomalies.
>    -
>
>    *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification that
>    fetcher threads multiplex partitions meaning the memory footprint is
>    strictly bounded by num_fetcher_threads * response_max_bytes rather than
>    a concurrent 1MB buffer per partition fully resolves my concern
> regarding
>    50GB broker-wide OOM spikes during mass partition wake-ups.
>    -
>
>    *VK3 (Control Plane Hotspots):* I had flagged the severe risk of
>    metadata saturation on a single broker during a "link flap" event. Your
>    confirmation that the __mirror_state topic utilizes a compound hash
> of (mirrorName,
>    topicId, partition number) mathematically ensures that the 50,000+ state
>    transitions will be safely distributed across the cluster, neutralizing
> the
>    single-node hotspot risk.
>
> 2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)
>
> Regarding VK12, there is a fundamental misunderstanding in your previous
> reply. You stated: *"The scenario described can only occur if offsets are
> force-written to an active group, which the design prevents."*
>
> My concern has absolutely nothing to do with overwriting active groups. My
> concern applies strictly to the normal synchronization of inactive/dead
> groups, and is based directly on the race condition currently documented in
> the official KIP-1279 text:
>
> *"During offset synchronization, the committed offset in the destination
> cluster may temporarily exceed the current log end offset (LEO) of the
> mirror topic... consumers attempting to resume from offset 100 will receive
> an OffsetOutOfRangeException. To handle this gracefully, consumers should
> configure auto.offset.reset=latest..."*
>
> If a failover happens during this documented divergence window, a
> reconnecting consumer will hit the OffsetOutOfRangeException. If the
> downstream consumer follows the KIP's official advice and relies on
> auto.offset.reset=latest, the consumer will jump to the absolute newest
> offset on the partition. *This completely skips any newly produced records
> that arrived between the failover and the consumer reconnecting, resulting
> in silent, unrecoverable data loss for the downstream application.*
> *Proposed Resolution: Double-Clamped Offset Safety Invariant* Instead of
> requiring consumers to use auto.offset.reset=latest and endorsing a known
> data loss vector, I propose that the ClusterMirrorCoordinator enforce a
> pre-persist offset validation invariant during offset synchronization.
> Before any translated offset is committed to the destination cluster, the
> coordinator must apply a double-clamp:
>
> *syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*
>
> Where destinationLSO is the Log Start Offset (earliest readable position
> post-retention) and destinationLEO is the Log End Offset (latest replicated
> position) of the destination partition.
>
> This guarantees that every persisted offset falls within the physically
> valid range  eliminating both the OffsetOutOfRangeException caused by
> exceeding the LEO during replication lag, and the expired-offset rewind
> caused by falling below the LSO due to destination retention policies.
>
> The worst-case trade-off under this invariant is bounded re-processing
> proportional to the replication lag at failover time not total partition
> depth which is perfectly consistent with Kafka's documented at-least-once
> delivery guarantees. Broad enterprise production evidence from large-scale
> cross-cluster failovers confirms that state checks alone (preventing active
> group overwrites) are insufficient; strict offset bounds clamping is
> required to achieve enterprise-grade data integrity.
>
> I look forward to your thoughts on implementing this final offset capping
> logic. Once VK12 is patched, I believe this architecture will be
> exceptionally robust and ready for enterprise deployment.
>
> Best Regards,
>
> Viquar Khan
>
>
> On Mon, 18 May 2026 at 14:23, Rajini Sivaram <[email protected]>
> wrote:
>
> > Hi Federico,
> >
> > Thanks for the updates! The KIP is looking good. A few more small
> comments.
> >
> >
> > RS13: A couple of places still refer to `kafka-mirror.sh` like under
> > `Failover Process`. Can we change them to `*kafka-cluster-mirrors.sh*`?
> >
> > RS14: Should we change `--entity-type mirrors` for kafka-configs to be `
> > --entity-type *cluster-mirrors*` to be consistent? Also,
> > CLUSTER_MIRROR((byte)
> > 64, "mirror"); could be `*cluster-mirror*`?
> >
> > RS15: It may be useful to rename `mirror.topic.num.partitions` and `
> > mirror.topic.replication.factor` since they are very similar to `
> > mirror.topic.properties.exclude`, but the `mirror.topic` prefix refers to
> > different topics (the internal topic for the first two and actual mirror
> > topics for the other one).
> >
> > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but not
> in
> > source using DeleteAcls request."
> > What happens if someone creates an ACL on the destination to deny
> > User:Alice access to all topics?
> >
> >    1. If that ACL also existed on the source cluster and then it was
> >    removed, will it get removed from the destination?
> >    2. If that ACL never existed on the source cluster, will it get
> removed
> >    from the destination?
> >
> > RS17: The table in the Source ACLs section says: "DescribeClusterMirrors
> >    MC      Read    Cluster     Log truncation"
> > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> >
> > Regards,
> >
> > Rajini
> >
> >
> >
> > On Fri, May 15, 2026 at 8:41 AM Federico Valeri <[email protected]>
> > wrote:
> >
> > > Hi Rajini, we finally addressed the API and tool naming refactoring as
> > > you suggested in RS6. Please take a look when you have time. Thanks.
> > >
> > >
> > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri <[email protected]>
> > > wrote:
> > > >
> > > > Hello all, I want to highlight a couple of new paragraphs:
> > > >
> > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the invariant
> > > > that the destination leader epoch must always be greater than or
> equal
> > > > to the source leader epoch (DLE>=SLE). Without this, consumers on the
> > > > destination cluster can get stuck in an infinite metadata refresh
> loop
> > > > when they encounter committed offsets carrying source epochs higher
> > > > than the local epoch. The invariant is maintained through three
> > > > mechanisms: reactive bumping (epoch fencing triggered when SLE > DLE
> > > > during fetch), proactive bumping (scheduled when SLE approaches DLE
> > > > within a threshold), and periodic bumping (checked during coordinator
> > > > metadata sync).
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > >
> > > > 2. Group Offsets: The coordinator periodically syncs consumer and
> > > > share group offsets from the source cluster to the destination for
> all
> > > > mirrored topics. Groups are filtered by configurable include/exclude
> > > > patterns, and offsets are only synced for groups that are not
> > > > currently active on the destination cluster, preventing overwrites of
> > > > local consumer progress. Because source and destination share the
> same
> > > > topic offsets (no offset translation), synced offsets can be used
> > > > directly without mapping.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > >
> > > > These new paragraphs directly address some of your questions, but let
> > > > me list them here:
> > > >
> > > > JR2: Yes, we removed the incorrect phrase and added more details to
> > > > the paragraph.
> > > >
> > > > JR4: When source cluster topic has tiered storage enabled, CM works
> by
> > > > mirroring remote and local log into destination cluster. When
> > > > destination cluster topic has tiered storage enabled, CM fails in
> > > > PREPARING state because the LME may be in remote storage, but works
> > > > fine if already MIRRORING because no truncation is needed.
> > > >
> > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > >
> > > > JR13: We can't support stateful Streams application because
> > > > asynchronous replication cannot preserve the transactional boundaries
> > > > between input offset commits, state store mutations written to
> > > > changelog topics, and intermediate records written to repartition
> > > > topics. The synchronous extension of this design will be able to
> > > > support them. Existing Features Integration paragraph updated.
> > > >
> > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > >
> > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > >
> > > > Thanks
> > > > Fede
> > > >
> > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri <
> [email protected]>
> > > wrote:
> > > > >
> > > > > Hi Vaquar,
> > > > >
> > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated some
> > > > > time ago with the new approach based on the new PID reset control
> > > > > record.
> > > > >
> > > > > VK11: The transaction index is always built locally during log
> > append,
> > > > > never copied.
> > > > >
> > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the fetch
> > > > > model. Fetcher threads don't allocate one buffer per partition.
> > Actual
> > > > > peak memory is roughly num_fetcher_threads * response_max_bytes,
> not
> > > > > num_partitions * partition_max_bytes. With 1 fetcher thread and the
> > > > > default response max, the memory footprint is modest regardless of
> > > > > partition count. We are leveraging the same proven pattern used by
> > the
> > > > > internal replication.
> > > > >
> > > > > VK3: The __mirror_state topic uses hash-based partitioning based on
> > > > > mirrorName, topicId and partition number. With the production
> default
> > > > > of 50 partitions, 50,000 partition transitions distribute across
> ~50
> > > > > partition leaders on different brokers, not a single broker. This
> is
> > > > > the same proven pattern as __consumer_offsets, which handles
> millions
> > > > > of commits.
> > > > >
> > > > > VK12: The scenario described can only occur if offsets are
> > > > > force-written to an active group, which the design prevents.
> > > > >
> > > > > Cheers
> > > > > Fede
> > >
> >
>

Reply via email to