Hello, additional updates and replies:

VK12: Consumer data loss risk is a good point. We updated the "Group
Offsets" paragraph.
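
For reference, the clamp Luke quoted below can be sketched in a few lines of Java. This is only an illustration of the formula, not the code in the KIP or in any branch: the class and method names (OffsetClamp, clamp) are made up for this email, and it uses Luke's destinationLogStartOffset name to avoid confusion with the Last Stable Offset.

```java
// Illustrative sketch only; OffsetClamp and clamp are hypothetical names.
final class OffsetClamp {
    private OffsetClamp() {}

    /**
     * Returns max(destinationLogStartOffset, min(sourceCommitted, destinationLogEndOffset)),
     * so the result always lies within the destination partition's valid range.
     */
    static long clamp(long sourceCommitted,
                      long destinationLogStartOffset,
                      long destinationLogEndOffset) {
        // Upper clamp: never commit past what has been replicated so far.
        long capped = Math.min(sourceCommitted, destinationLogEndOffset);
        // Lower clamp: never commit below what retention has already removed.
        return Math.max(destinationLogStartOffset, capped);
    }
}
```

With a source committed offset of 100 and a destination LEO of 80, the synced offset becomes 80, so a consumer that fails over re-reads at most the replication lag instead of hitting OffsetOutOfRangeException.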

Some of you pointed out that using mirror.name suffixes for pause and
stop operations is inelegant, and we agree. We now propose a better
approach:

We replaced the topic-config-based approach (mirror.name and
.stopped/.paused suffix conventions) with a first-class metadata
record for tracking mirror topic state changes. A new
MirrorTopicStateChangeRecord is added to the metadata log with three
fields: TopicId (uuid), MirrorName (string), and DesiredState (int8,
where 0 = start mirroring, 1 = stop, 2 = pause). When a user calls
startMirrorTopics, stopMirrorTopics, or pauseMirrorTopics, the
controller writes this record to the metadata log instead of
manipulating config suffixes. Brokers receive the record via metadata
updates and react accordingly, triggering partition-level state
transitions through the existing MirrorPartitionState state machine.
More details are in the updated KIP.
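
To make the record shape concrete, here is a minimal Java sketch of the three fields and the DesiredState encoding. It is an assumption-laden illustration rather than the real class: the actual record is defined by the KIP, and the names MirrorTopicStateChange and fromId, as well as the use of java.util.UUID for the topic id, are hypothetical.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical mapping of the int8 DesiredState values listed above.
enum DesiredState {
    START((byte) 0), STOP((byte) 1), PAUSE((byte) 2);

    private final byte id;

    DesiredState(byte id) { this.id = id; }

    byte id() { return id; }

    // Decodes the int8 value carried by the record; rejects unknown values.
    static DesiredState fromId(byte id) {
        for (DesiredState s : values()) {
            if (s.id == id) return s;
        }
        throw new IllegalArgumentException("Unknown DesiredState: " + id);
    }
}

// Illustrative holder for the three record fields (not the real record class).
final class MirrorTopicStateChange {
    final UUID topicId;
    final String mirrorName;
    final DesiredState desiredState;

    MirrorTopicStateChange(UUID topicId, String mirrorName, DesiredState desiredState) {
        this.topicId = Objects.requireNonNull(topicId);
        this.mirrorName = Objects.requireNonNull(mirrorName);
        this.desiredState = Objects.requireNonNull(desiredState);
    }
}
```

In this sketch a pauseMirrorTopics call would map to a record carrying DesiredState.PAUSE (2), and a broker would decode the value with fromId before driving the partition-level transitions.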

Let us know what you think.

Thanks
Fede


On Tue, Jun 2, 2026 at 8:50 AM Luke Chen <[email protected]> wrote:
>
> Hi Viquar,
>
> Thanks for the comment.
>
> Regarding VK12, thanks for raising the issue.
> We didn't think about that.
> I agree that compared with data re-processing, it's worse to have data loss.
> The proposed formula makes sense to me.
> *syncedOffset = max(destinationLogStartOffset, min(sourceCommitted,
> destinationLEO))*
>
> Let us have some discussion internally and then update the KIP and reply to
> the thread.
>
> Thanks,
> Luke
>
> On Wed, May 20, 2026 at 2:39 AM Federico Valeri <[email protected]>
> wrote:
>
> > Hi Rajini,
> >
> > RS14: Done. Missed that, sorry.
> >
> > Thanks again.
> >
> > On Tue, May 19, 2026 at 7:28 PM Rajini Sivaram <[email protected]>
> > wrote:
> > >
> > > Hi Federico,
> > >
> > > Thanks for the updates. Just one minor point, apart from that, looks
> > good.
> > >
> > > RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094
> > > --entity-type mirrors".
> > > Will be good to change that to `--entity-type cluster-mirrors`.
> > >
> > > Thanks,
> > >
> > > Rajini
> > >
> > >
> > > On Tue, May 19, 2026 at 5:28 PM vaquar khan <[email protected]>
> > wrote:
> > >
> > > > Hi Federico and team,
> > > >
> > > > Thank you for your detailed response on May 11, 2026. I greatly
> > appreciate
> > > > the collaborative effort over the past few months to harden the
> > KIP-1279
> > > > architecture.
> > > >
> > > > * 1. Resolved Architectural & Stability Vulnerabilities*
> > > >
> > > > I am pleased to confirm that the KIP has successfully integrated fixes
> > for
> > > > the critical vulnerabilities I flagged, thereby protecting the
> > cluster's
> > > > state machine, memory bounds, and control plane. Consider the following
> > > > items fully resolved on my end:
> > > >
> > > >    -
> > > >
> > > >    *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously
> > > >    identified that the proposed -(sourceProducerId + 2) PID rewriting
> > > >    formula would fundamentally break hasProducerId() in
> > > >    AbstractRecordBatch.java, causing the broker to bypass
> > > >    ProducerStateManager.update() and default the Last Stable Offset
> > (LSO)
> > > >    to the High Watermark. I am glad to see the KIP authors abandoned
> > PID
> > > >    mapping entirely and adopted the deterministic MIRROR_PID_RESET
> > control
> > > >    record barrier. This perfectly protects Kafka's exactly-once
> > semantics.
> > > >
> > > >    -
> > > >
> > > >    *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for
> > officially
> > > >    confirming that the TransactionIndex is strictly rebuilt locally
> > during
> > > >    log appends rather than copied byte-for-byte. This resolves my
> > concern
> > > >    regarding PID mismatches inducing consumer read anomalies.
> > > >    -
> > > >
> > > >    *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification that
> > > >    fetcher threads multiplex partitions, meaning the memory footprint is
> > > >    strictly bounded by num_fetcher_threads * response_max_bytes rather than
> > > >    a concurrent 1MB buffer per partition, fully resolves my concern regarding
> > > >    50GB broker-wide OOM spikes during mass partition wake-ups.
> > > >    -
> > > >
> > > >    *VK3 (Control Plane Hotspots):* I had flagged the severe risk of
> > > >    metadata saturation on a single broker during a "link flap" event.
> > Your
> > > >    confirmation that the __mirror_state topic utilizes a compound hash
> > > > of (mirrorName,
> > > >    topicId, partition number) mathematically ensures that the 50,000+
> > state
> > > >    transitions will be safely distributed across the cluster,
> > neutralizing
> > > > the
> > > >    single-node hotspot risk.
> > > >
> > > > 2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)
> > > >
> > > > Regarding VK12, there is a fundamental misunderstanding in your
> > previous
> > > > reply. You stated: *"The scenario described can only occur if offsets
> > are
> > > > force-written to an active group, which the design prevents."*
> > > >
> > > > My concern has absolutely nothing to do with overwriting active
> > groups. My
> > > > concern applies strictly to the normal synchronization of inactive/dead
> > > > groups, and is based directly on the race condition currently
> > documented in
> > > > the official KIP-1279 text:
> > > >
> > > > *"During offset synchronization, the committed offset in the
> > destination
> > > > cluster may temporarily exceed the current log end offset (LEO) of the
> > > > mirror topic... consumers attempting to resume from offset 100 will
> > receive
> > > > an OffsetOutOfRangeException. To handle this gracefully, consumers
> > should
> > > > configure auto.offset.reset=latest..."*
> > > >
> > > > If a failover happens during this documented divergence window, a
> > > > reconnecting consumer will hit the OffsetOutOfRangeException. If the
> > > > downstream consumer follows the KIP's official advice and relies on
> > > > auto.offset.reset=latest, the consumer will jump to the absolute newest
> > > > offset on the partition. *This completely skips any newly produced
> > records
> > > > that arrived between the failover and the consumer reconnecting,
> > resulting
> > > > in silent, unrecoverable data loss for the downstream application.*
> > > >
> > > > *Proposed Resolution: Double-Clamped Offset Safety Invariant*
> > > >
> > > > Instead of
> > > > requiring consumers to use auto.offset.reset=latest and endorsing a
> > known
> > > > data loss vector, I propose that the ClusterMirrorCoordinator enforce a
> > > > pre-persist offset validation invariant during offset synchronization.
> > > > Before any translated offset is committed to the destination cluster,
> > the
> > > > coordinator must apply a double-clamp:
> > > >
> > > > *syncedOffset = max(destinationLSO, min(sourceCommitted,
> > destinationLEO))*
> > > >
> > > > Where destinationLSO is the Log Start Offset (earliest readable
> > position
> > > > post-retention) and destinationLEO is the Log End Offset (latest
> > replicated
> > > > position) of the destination partition.
> > > >
> > > > This guarantees that every persisted offset falls within the physically
> > > > valid range, eliminating both the OffsetOutOfRangeException caused by
> > > > exceeding the LEO during replication lag, and the expired-offset rewind
> > > > caused by falling below the LSO due to destination retention policies.
> > > >
> > > > The worst-case trade-off under this invariant is bounded re-processing
> > > > proportional to the replication lag at failover time (not total partition
> > > > depth), which is perfectly consistent with Kafka's documented at-least-once
> > > > delivery guarantees. Broad enterprise production evidence from
> > large-scale
> > > > cross-cluster failovers confirms that state checks alone (preventing
> > active
> > > > group overwrites) are insufficient; strict offset bounds clamping is
> > > > required to achieve enterprise-grade data integrity.
> > > >
> > > > I look forward to your thoughts on implementing this final offset
> > capping
> > > > logic. Once VK12 is patched, I believe this architecture will be
> > > > exceptionally robust and ready for enterprise deployment.
> > > >
> > > > Best Regards,
> > > >
> > > > Viquar Khan
> > > >
> > > >
> > > > On Mon, 18 May 2026 at 14:23, Rajini Sivaram <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi Federico,
> > > > >
> > > > > Thanks for the updates! The KIP is looking good. A few more small
> > > > comments.
> > > > >
> > > > >
> > > > > RS13: A couple of places still refer to `kafka-mirror.sh` like under
> > > > > `Failover Process`. Can we change them to
> > `*kafka-cluster-mirrors.sh*`?
> > > > >
> > > > > RS14: Should we change `--entity-type mirrors` for kafka-configs to
> > be `
> > > > > --entity-type *cluster-mirrors*` to be consistent? Also,
> > > > > CLUSTER_MIRROR((byte)
> > > > > 64, "mirror"); could be `*cluster-mirror*`?
> > > > >
> > > > > RS15: It may be useful to rename `mirror.topic.num.partitions` and `
> > > > > mirror.topic.replication.factor` since they are very similar to `
> > > > > mirror.topic.properties.exclude`, but the `mirror.topic` prefix
> > refers to
> > > > > different topics (the internal topic for the first two and actual
> > mirror
> > > > > topics for the other one).
> > > > >
> > > > > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but
> > not
> > > > in
> > > > > source using DeleteAcls request."
> > > > > What happens if someone creates an ACL on the destination to deny
> > > > > User:Alice access to all topics?
> > > > >
> > > > >    1. If that ACL also existed on the source cluster and then it was
> > > > >    removed, will it get removed from the destination?
> > > > >    2. If that ACL never existed on the source cluster, will it get
> > > > removed
> > > > >    from the destination?
> > > > >
> > > > > RS17: The table in the Source ACLs section says:
> > "DescribeClusterMirrors
> > > > >    MC      Read    Cluster     Log truncation"
> > > > > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> > > > >
> > > > > Regards,
> > > > >
> > > > > Rajini
> > > > >
> > > > >
> > > > >
> > > > > On Fri, May 15, 2026 at 8:41 AM Federico Valeri <
> > [email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi Rajini, we finally addressed the API and tool naming
> > refactoring as
> > > > > > you suggested in RS6. Please take a look when you have time.
> > Thanks.
> > > > > >
> > > > > >
> > > > > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri <
> > [email protected]>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hello all, I want to highlight a couple of new paragraphs:
> > > > > > >
> > > > > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the
> > invariant
> > > > > > > that the destination leader epoch must always be greater than or
> > > > equal
> > > > > > > to the source leader epoch (DLE>=SLE). Without this, consumers
> > on the
> > > > > > > destination cluster can get stuck in an infinite metadata refresh
> > > > loop
> > > > > > > when they encounter committed offsets carrying source epochs
> > higher
> > > > > > > than the local epoch. The invariant is maintained through three
> > > > > > > mechanisms: reactive bumping (epoch fencing triggered when SLE >
> > DLE
> > > > > > > during fetch), proactive bumping (scheduled when SLE approaches
> > DLE
> > > > > > > within a threshold), and periodic bumping (checked during
> > coordinator
> > > > > > > metadata sync).
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > > > > >
> > > > > > > 2. Group Offsets: The coordinator periodically syncs consumer and
> > > > > > > share group offsets from the source cluster to the destination
> > for
> > > > all
> > > > > > > mirrored topics. Groups are filtered by configurable
> > include/exclude
> > > > > > > patterns, and offsets are only synced for groups that are not
> > > > > > > currently active on the destination cluster, preventing
> > overwrites of
> > > > > > > local consumer progress. Because source and destination share the
> > > > same
> > > > > > > topic offsets (no offset translation), synced offsets can be used
> > > > > > > directly without mapping.
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > > > > >
> > > > > > > These new paragraphs directly address some of your questions,
> > but let
> > > > > > > me list them here:
> > > > > > >
> > > > > > > JR2: Yes, we removed the incorrect phrase and added more details
> > to
> > > > > > > the paragraph.
> > > > > > >
> > > > > > > JR4: When source cluster topic has tiered storage enabled, CM
> > works
> > > > by
> > > > > > > mirroring remote and local log into destination cluster. When
> > > > > > > destination cluster topic has tiered storage enabled, CM fails in
> > > > > > > PREPARING state because the LME may be in remote storage, but
> > works
> > > > > > > fine if already MIRRORING because no truncation is needed.
> > > > > > >
> > > > > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > > > > >
> > > > > > > JR13: We can't support stateful Streams application because
> > > > > > > asynchronous replication cannot preserve the transactional
> > boundaries
> > > > > > > between input offset commits, state store mutations written to
> > > > > > > changelog topics, and intermediate records written to repartition
> > > > > > > topics. The synchronous extension of this design will be able to
> > > > > > > support them. Existing Features Integration paragraph updated.
> > > > > > >
> > > > > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > > > > >
> > > > > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Fede
> > > > > > >
> > > > > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri <
> > > > [email protected]>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Vaquar,
> > > > > > > >
> > > > > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated
> > some
> > > > > > > > time ago with the new approach based on the new PID reset
> > control
> > > > > > > > record.
> > > > > > > >
> > > > > > > > VK11: The transaction index is always built locally during log
> > > > > append,
> > > > > > > > never copied.
> > > > > > > >
> > > > > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the
> > fetch
> > > > > > > > model. Fetcher threads don't allocate one buffer per partition.
> > > > > Actual
> > > > > > > > peak memory is roughly num_fetcher_threads *
> > response_max_bytes,
> > > > not
> > > > > > > > num_partitions * partition_max_bytes. With 1 fetcher thread
> > and the
> > > > > > > > default response max, the memory footprint is modest
> > regardless of
> > > > > > > > partition count. We are leveraging the same proven pattern
> > used by
> > > > > the
> > > > > > > > internal replication.
> > > > > > > >
> > > > > > > > VK3: The __mirror_state topic uses hash-based partitioning
> > based on
> > > > > > > > mirrorName, topicId and partition number. With the production
> > > > default
> > > > > > > > of 50 partitions, 50,000 partition transitions distribute
> > across
> > > > ~50
> > > > > > > > partition leaders on different brokers, not a single broker.
> > This
> > > > is
> > > > > > > > the same proven pattern as __consumer_offsets, which handles
> > > > millions
> > > > > > > > of commits.
> > > > > > > >
> > > > > > > > VK12: The scenario described can only occur if offsets are
> > > > > > > > force-written to an active group, which the design prevents.
> > > > > > > >
> > > > > > > > Cheers
> > > > > > > > Fede
> > > > > >
> > > > >
> > > >
> >
