Hi Viquar,

Thanks for the comment.

Regarding VK12, thanks for raising the issue.
We hadn't considered that scenario.
I agree that data loss is worse than data re-processing.
The proposed formula makes sense to me:
*syncedOffset = max(destinationLogStartOffset, min(sourceCommitted, destinationLEO))*
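
For illustration only, the clamp could be sketched in Java roughly as
follows. The class name, method name, and sample offsets are hypothetical
and are not taken from the KIP or the Kafka codebase; the sketch also
assumes the destination log start offset never exceeds the destination LEO.

```java
// Hypothetical helper, not part of Kafka or KIP-1279.
final class OffsetClamp {
    private OffsetClamp() {}

    // Returns an offset within [destinationLogStartOffset, destinationLEO]:
    // first cap the source committed offset at the destination LEO, then
    // raise it to the destination log start offset if it fell below it.
    static long clampSyncedOffset(long sourceCommitted,
                                  long destinationLogStartOffset,
                                  long destinationLEO) {
        if (destinationLogStartOffset > destinationLEO) {
            throw new IllegalArgumentException(
                "log start offset must not exceed log end offset");
        }
        return Math.max(destinationLogStartOffset,
                        Math.min(sourceCommitted, destinationLEO));
    }

    public static void main(String[] args) {
        // Illustrative values only.
        // Source commit is ahead of the mirror (replication lag): capped at LEO.
        System.out.println(clampSyncedOffset(100L, 0L, 80L));  // 80
        // Source commit is below the retained range: raised to log start.
        System.out.println(clampSyncedOffset(5L, 20L, 80L));   // 20
        // Source commit is already inside the valid range: unchanged.
        System.out.println(clampSyncedOffset(50L, 20L, 80L));  // 50
    }
}
```

In the first case a consumer resuming on the destination would re-read the
records between the clamped offset and its source position once they are
replicated, which is the bounded re-processing trade-off discussed in the
quoted message.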

Let us discuss this internally; we will then update the KIP and reply on
the thread.

Thanks,
Luke

On Wed, May 20, 2026 at 2:39 AM Federico Valeri <[email protected]>
wrote:

> Hi Rajini,
>
> RS14: Done. Missed that, sorry.
>
> Thanks again.
>
> On Tue, May 19, 2026 at 7:28 PM Rajini Sivaram <[email protected]>
> wrote:
> >
> > Hi Federico,
> >
> > Thanks for the updates. Just one minor point, apart from that, looks
> good.
> >
> > RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094
> > --entity-type mirrors".
> > Will be good to change that to `--entity-type cluster-mirrors`.
> >
> > Thanks,
> >
> > Rajini
> >
> >
> > On Tue, May 19, 2026 at 5:28 PM vaquar khan <[email protected]>
> wrote:
> >
> > > Hi Federico and team,
> > >
> > > Thank you for your detailed response on May 11, 2026. I greatly
> appreciate
> > > the collaborative effort over the past few months to harden the
> KIP-1279
> > > architecture.
> > >
> > > *1. Resolved Architectural & Stability Vulnerabilities*
> > >
> > > I am pleased to confirm that the KIP has successfully integrated fixes
> for
> > > the critical vulnerabilities I flagged, thereby protecting the
> cluster's
> > > state machine, memory bounds, and control plane. Consider the following
> > > items fully resolved on my end:
> > >
> > >    - *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously
> > >      identified that the proposed -(sourceProducerId + 2) PID rewriting
> > >      formula would fundamentally break hasProducerId() in
> > >      AbstractRecordBatch.java, causing the broker to bypass
> > >      ProducerStateManager.update() and default the Last Stable Offset
> > >      (LSO) to the High Watermark. I am glad to see the KIP authors
> > >      abandoned PID mapping entirely and adopted the deterministic
> > >      MIRROR_PID_RESET control record barrier. This perfectly protects
> > >      Kafka's exactly-once semantics.
> > >
> > >    - *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for
> > >      officially confirming that the TransactionIndex is strictly rebuilt
> > >      locally during log appends rather than copied byte-for-byte. This
> > >      resolves my concern regarding PID mismatches inducing consumer read
> > >      anomalies.
> > >
> > >    - *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification
> > >      that fetcher threads multiplex partitions, meaning the memory
> > >      footprint is strictly bounded by
> > >      num_fetcher_threads * response_max_bytes rather than a concurrent
> > >      1MB buffer per partition, fully resolves my concern regarding 50GB
> > >      broker-wide OOM spikes during mass partition wake-ups.
> > >
> > >    - *VK3 (Control Plane Hotspots):* I had flagged the severe risk of
> > >      metadata saturation on a single broker during a "link flap" event.
> > >      Your confirmation that the __mirror_state topic utilizes a compound
> > >      hash of (mirrorName, topicId, partition number) mathematically
> > >      ensures that the 50,000+ state transitions will be safely
> > >      distributed across the cluster, neutralizing the single-node
> > >      hotspot risk.
> > >
> > > *2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)*
> > >
> > > Regarding VK12, there is a fundamental misunderstanding in your
> previous
> > > reply. You stated: *"The scenario described can only occur if offsets
> are
> > > force-written to an active group, which the design prevents."*
> > >
> > > My concern has absolutely nothing to do with overwriting active
> groups. My
> > > concern applies strictly to the normal synchronization of inactive/dead
> > > groups, and is based directly on the race condition currently
> documented in
> > > the official KIP-1279 text:
> > >
> > > *"During offset synchronization, the committed offset in the
> destination
> > > cluster may temporarily exceed the current log end offset (LEO) of the
> > > mirror topic... consumers attempting to resume from offset 100 will
> receive
> > > an OffsetOutOfRangeException. To handle this gracefully, consumers
> should
> > > configure auto.offset.reset=latest..."*
> > >
> > > If a failover happens during this documented divergence window, a
> > > reconnecting consumer will hit the OffsetOutOfRangeException. If the
> > > downstream consumer follows the KIP's official advice and relies on
> > > auto.offset.reset=latest, the consumer will jump to the absolute newest
> > > offset on the partition. *This completely skips any newly produced
> records
> > > that arrived between the failover and the consumer reconnecting,
> resulting
> > > in silent, unrecoverable data loss for the downstream application.*
> > >
> > > *Proposed Resolution: Double-Clamped Offset Safety Invariant*
> > >
> > > Instead of requiring consumers to use auto.offset.reset=latest and
> > > endorsing a known data loss vector, I propose that the
> > > ClusterMirrorCoordinator enforce a pre-persist offset validation
> > > invariant during offset synchronization. Before any translated offset
> > > is committed to the destination cluster, the coordinator must apply a
> > > double-clamp:
> > >
> > > *syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*
> > >
> > > Where destinationLSO is the Log Start Offset (earliest readable
> position
> > > post-retention) and destinationLEO is the Log End Offset (latest
> replicated
> > > position) of the destination partition.
> > >
> > > This guarantees that every persisted offset falls within the physically
> > > valid range, eliminating both the OffsetOutOfRangeException caused by
> > > exceeding the LEO during replication lag and the expired-offset rewind
> > > caused by falling below the LSO due to destination retention policies.
> > >
> > > The worst-case trade-off under this invariant is bounded re-processing
> > > proportional to the replication lag at failover time (not total
> > > partition depth), which is perfectly consistent with Kafka's documented
> > > at-least-once delivery guarantees. Broad enterprise production evidence
> > > from large-scale cross-cluster failovers confirms that state checks
> > > alone (preventing active group overwrites) are insufficient; strict
> > > offset bounds clamping is required to achieve enterprise-grade data
> > > integrity.
> > >
> > > I look forward to your thoughts on implementing this final offset
> capping
> > > logic. Once VK12 is patched, I believe this architecture will be
> > > exceptionally robust and ready for enterprise deployment.
> > >
> > > Best Regards,
> > >
> > > Viquar Khan
> > >
> > >
> > > On Mon, 18 May 2026 at 14:23, Rajini Sivaram <[email protected]>
> > > wrote:
> > >
> > > > Hi Federico,
> > > >
> > > > Thanks for the updates! The KIP is looking good. A few more small
> > > comments.
> > > >
> > > >
> > > > RS13: A couple of places still refer to `kafka-mirror.sh` like under
> > > > `Failover Process`. Can we change them to
> `*kafka-cluster-mirrors.sh*`?
> > > >
> > > > RS14: Should we change `--entity-type mirrors` for kafka-configs to
> be `
> > > > --entity-type *cluster-mirrors*` to be consistent? Also,
> > > > CLUSTER_MIRROR((byte)
> > > > 64, "mirror"); could be `*cluster-mirror*`?
> > > >
> > > > RS15: It may be useful to rename `mirror.topic.num.partitions` and `
> > > > mirror.topic.replication.factor` since they are very similar to `
> > > > mirror.topic.properties.exclude`, but the `mirror.topic` prefix
> refers to
> > > > different topics (the internal topic for the first two and actual
> mirror
> > > > topics for the other one).
> > > >
> > > > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but
> not
> > > in
> > > > source using DeleteAcls request."
> > > > What happens if someone creates an ACL on the destination to deny
> > > > User:Alice access to all topics?
> > > >
> > > >    1. If that ACL also existed on the source cluster and then it was
> > > >    removed, will it get removed from the destination?
> > > >    2. If that ACL never existed on the source cluster, will it get
> > > removed
> > > >    from the destination?
> > > >
> > > > RS17: The table in the Source ACLs section says:
> "DescribeClusterMirrors
> > > >    MC      Read    Cluster     Log truncation"
> > > > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > >
> > > >
> > > > On Fri, May 15, 2026 at 8:41 AM Federico Valeri <
> [email protected]>
> > > > wrote:
> > > >
> > > > > Hi Rajini, we finally addressed the API and tool naming
> refactoring as
> > > > > you suggested in RS6. Please take a look when you have time.
> Thanks.
> > > > >
> > > > >
> > > > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri <
> [email protected]>
> > > > > wrote:
> > > > > >
> > > > > > Hello all, I want to highlight a couple of new paragraphs:
> > > > > >
> > > > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the
> invariant
> > > > > > that the destination leader epoch must always be greater than or
> > > equal
> > > > > > to the source leader epoch (DLE>=SLE). Without this, consumers
> on the
> > > > > > destination cluster can get stuck in an infinite metadata refresh
> > > loop
> > > > > > when they encounter committed offsets carrying source epochs
> higher
> > > > > > than the local epoch. The invariant is maintained through three
> > > > > > mechanisms: reactive bumping (epoch fencing triggered when SLE >
> DLE
> > > > > > during fetch), proactive bumping (scheduled when SLE approaches
> DLE
> > > > > > within a threshold), and periodic bumping (checked during
> coordinator
> > > > > > metadata sync).
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > > > >
> > > > > > 2. Group Offsets: The coordinator periodically syncs consumer and
> > > > > > share group offsets from the source cluster to the destination
> for
> > > all
> > > > > > mirrored topics. Groups are filtered by configurable
> include/exclude
> > > > > > patterns, and offsets are only synced for groups that are not
> > > > > > currently active on the destination cluster, preventing
> overwrites of
> > > > > > local consumer progress. Because source and destination share the
> > > same
> > > > > > topic offsets (no offset translation), synced offsets can be used
> > > > > > directly without mapping.
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > > > >
> > > > > > These new paragraphs directly address some of your questions,
> but let
> > > > > > me list them here:
> > > > > >
> > > > > > JR2: Yes, we removed the incorrect phrase and added more details
> to
> > > > > > the paragraph.
> > > > > >
> > > > > > JR4: When source cluster topic has tiered storage enabled, CM
> works
> > > by
> > > > > > mirroring remote and local log into destination cluster. When
> > > > > > destination cluster topic has tiered storage enabled, CM fails in
> > > > > > PREPARING state because the LME may be in remote storage, but
> works
> > > > > > fine if already MIRRORING because no truncation is needed.
> > > > > >
> > > > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > > > >
> > > > > > JR13: We can't support stateful Streams application because
> > > > > > asynchronous replication cannot preserve the transactional
> boundaries
> > > > > > between input offset commits, state store mutations written to
> > > > > > changelog topics, and intermediate records written to repartition
> > > > > > topics. The synchronous extension of this design will be able to
> > > > > > support them. Existing Features Integration paragraph updated.
> > > > > >
> > > > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > > > >
> > > > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > > > >
> > > > > > Thanks
> > > > > > Fede
> > > > > >
> > > > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri <
> > > [email protected]>
> > > > > wrote:
> > > > > > >
> > > > > > > Hi Vaquar,
> > > > > > >
> > > > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated
> some
> > > > > > > time ago with the new approach based on the new PID reset
> control
> > > > > > > record.
> > > > > > >
> > > > > > > VK11: The transaction index is always built locally during log
> > > > append,
> > > > > > > never copied.
> > > > > > >
> > > > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the
> fetch
> > > > > > > model. Fetcher threads don't allocate one buffer per partition.
> > > > Actual
> > > > > > > peak memory is roughly num_fetcher_threads *
> response_max_bytes,
> > > not
> > > > > > > num_partitions * partition_max_bytes. With 1 fetcher thread
> and the
> > > > > > > default response max, the memory footprint is modest
> regardless of
> > > > > > > partition count. We are leveraging the same proven pattern
> used by
> > > > the
> > > > > > > internal replication.
> > > > > > >
> > > > > > > VK3: The __mirror_state topic uses hash-based partitioning
> based on
> > > > > > > mirrorName, topicId and partition number. With the production
> > > default
> > > > > > > of 50 partitions, 50,000 partition transitions distribute
> across
> > > ~50
> > > > > > > partition leaders on different brokers, not a single broker.
> This
> > > is
> > > > > > > the same proven pattern as __consumer_offsets, which handles
> > > millions
> > > > > > > of commits.
> > > > > > >
> > > > > > > VK12: The scenario described can only occur if offsets are
> > > > > > > force-written to an active group, which the design prevents.
> > > > > > >
> > > > > > > Cheers
> > > > > > > Fede
> > > > >
> > > >
> > >
>
