[
https://issues.apache.org/jira/browse/KAFKA-20029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18049037#comment-18049037
]
sanghyeok An edited comment on KAFKA-20029 at 1/5/26 12:07 AM:
---------------------------------------------------------------
Hi [~schofielaj] !
Thanks for the valuable feedback.
I completely agree with you. The {{__consumer_offsets}} and
{{__share_group_state}} topics rely on similar hashing mechanisms,
so they carry similar risks regarding partition expansion.
I will update the scope of this issue to cover all internal topics and aim for
a unified solution.
Regarding the implementation, I suggested two options in the description:
*1. Server-side enforcement (Controller/AdminManager):*
Rejecting *CreatePartitions* requests for internal topics by default. I also
thought about introducing a safety config (e.g.,
{{internal.topic.modification.enable}}) on the controller side for advanced
users who might need to override this in emergencies.
*2. CLI warning:*
A softer guardrail in *kafka-topics.sh* (though this wouldn't prevent direct
Admin API calls).
My initial thought is that *controller-side enforcement* could provide the
strongest guardrail,
but I may be missing constraints or compatibility considerations.
What do you think would be the most appropriate approach here?
Also, given this would change behavior by rejecting a request that used to be
accepted, do you think this should go through a *KIP*, or is it reasonable to
handle it as a safety improvement/bug fix (and document it in the release
notes)?
Thanks for your time!
> Disallow partition count increase for __transaction_state.
> ----------------------------------------------------------
>
> Key: KAFKA-20029
> URL: https://issues.apache.org/jira/browse/KAFKA-20029
> Project: Kafka
> Issue Type: Improvement
> Reporter: sanghyeok An
> Assignee: sanghyeok An
> Priority: Minor
>
> The routing logic for the Transaction Coordinator relies heavily on the
> partition count of the internal topic {{{}__transaction_state{}}}. The
> mapping is determined by
> {code:java}
> Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount {code}
> Consequently, changing the number of partitions for {{__transaction_state}}
> (e.g., expanding from 50 to 150) changes the mapping logic. While Kafka
> documentation advises against modifying internal topics, there is currently
> no hard guardrail preventing this via the Admin API.
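> To make the sensitivity concrete, here is a minimal, self-contained sketch of
> the routing math (the class and helper names are illustrative, and the
> {{abs}} helper assumes sign-bit-masking semantics for non-negative hashing;
> this is not actual Kafka code):
> {code:java}
> // Illustrative sketch: how the owning partition for a transactional id
> // shifts when the partition count of __transaction_state changes.
> public class TxnRoutingSketch {
>     // Assumed semantics for illustration: mask the sign bit so the
>     // result is always non-negative.
>     static int abs(int n) {
>         return n & 0x7fffffff;
>     }
>
>     static int partitionFor(String transactionalId, int partitionCount) {
>         return abs(transactionalId.hashCode()) % partitionCount;
>     }
>
>     public static void main(String[] args) {
>         String txnId = "my-transactional-id";
>         // An old broker assuming 50 partitions and a new broker seeing 150
>         // will, in general, compute different owners for the same id.
>         System.out.println("50 partitions  -> " + partitionFor(txnId, 50));
>         System.out.println("150 partitions -> " + partitionFor(txnId, 150));
>     }
> }
> {code}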
> IMHO, allowing this operation can lead to a split-brain scenario during a
> rolling upgrade or cluster expansion, resulting in orphaned or hanging
> transactions.
>
> *Scenario & Analysis*
> Here is a breakdown of the race condition and inconsistency issues:
> # *Expansion:* An admin expands {{__transaction_state}} partitions from 50
> to 150 (see the Admin API sketch after this list). New partitions are
> created and leaders are elected.
> # *Metadata Inconsistency:* While the metadata cache updates, the
> {{transactionTopicPartitionCount}} used by the {{partitionsFor}} logic might
> not update atomically across all nodes.
> # *Loading Failure:* A coordinator elected for a new partition might fail to
> load the transaction state in {{TransactionCoordinator#onElection}} due to a
> mismatch between the cached partition count and the actual partition count,
> throwing an exception.
> # *Split-Brain View:*
> ** *New Broker B* (starts up or receives update): Sees 150 partitions.
> ** {*}Old Broker A{*}: Still operates with the logic based on 50 partitions.
> # *Routing Divergence:*
> ** A client sends {{FindCoordinator}} for Transaction ID "A".
> ** *Broker B* calculates: {{hash("A") % 150}} -> Partition {*}130{*}.
> ** *Broker A* calculates: {{hash("A") % 50}} -> Partition {*}27{*}.
> # *Non-deterministic behavior occurs:*
> ** If the producer sends a {{FindCoordinator}} request for Transaction ID
> "A" to {*}Broker B{*}, it receives partition {*}130{*}; if it sends the same
> request to {*}Broker A{*}, it receives partition {*}27{*}.
> ** This means the producer will send {{InitProducerId}} to the leader of
> either partition *130* or {*}27{*} (and either way, those leaders may still
> be old brokers).
> ** If the TransactionCoordinator receives an {{InitProducerId}} request for
> partition 130 but runs on an old broker whose
> *transactionTopicPartitionCount* is still {*}50{*}, then calling
> *partitionsFor(...)* re-computes the partition as {*}27{*}. This means that:
> *** {{Errors.COORDINATOR_LOAD_IN_PROGRESS}} can occur.
> *** Transaction state may be written to an unexpected {{__transaction_state}}
> partition (for example, partition *27* instead of the expected {*}130{*}).
> **** This causes orphan transactions: when the brokers roll, they try to
> restore transaction state and expect the transaction for "A" in partition
> 130, while it actually sits in partition 27, so the data topic may encounter
> hanging transactions.
> ** In addition, since the leaders for partition 130 and partition 27 are
> likely different, two coordinators may end up managing the same Transaction
> ID without knowledge of each other.
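> As a concrete illustration of step 1, the expansion is a single Admin API
> call that the cluster accepts today (standard {{kafka-clients}} API; the
> bootstrap address is a placeholder):
> {code:java}
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewPartitions;
>
> public class ExpandTxnStateTopic {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (Admin admin = Admin.create(props)) {
>             // This CreatePartitions request targets an internal topic, yet
>             // no server-side guardrail rejects it today.
>             admin.createPartitions(
>                 Map.of("__transaction_state", NewPartitions.increaseTo(150))
>             ).all().get();
>         }
>     }
> }
> {code}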
>
> *Impact*
> This non-deterministic behavior causes:
> * *Orphan/Hanging Transactions:* The producer interacts with a new
> coordinator that has no history of the transaction.
> * *Availability Issues:* Clients may receive
> {{COORDINATOR_LOAD_IN_PROGRESS}} indefinitely if the coordinator fails to
> load the state due to count mismatch.
> * *Correctness Risk (worst case, not guaranteed):* depending on the timing,
> the transition could lead to unexpected fencing/coordination behavior.
>
>
> To ensure cluster stability and data integrity, how about enforcing a
> guardrail against modifying the partition count of {{__transaction_state}}?
> *1. Enforce validation in Controller/AdminManager*
> It would be good to reject {{CreatePartitions}} requests for internal topics
> by default.
> * Introduce a mechanism to check if the topic is internal during partition
> expansion.
> * If {{topic.isInternal()}} is true, return an {{InvalidRequestException}}
> with a clear error message stating that internal topic partition counts
> cannot be changed dynamically.
> * Introduce a safety configuration on the controller side (e.g.,
> {{{}internal.topic.modification.enable{}}}, default {{{}false{}}}) for
> advanced users who strictly need to override this behavior, although it is
> strongly discouraged.
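> A minimal sketch of what such a check could look like (the config name and
> the helper shape are assumptions from this proposal, not existing Kafka
> code; {{InvalidRequestException}} is the real class from {{kafka-clients}}):
> {code:java}
> import org.apache.kafka.common.errors.InvalidRequestException;
>
> // Hypothetical guard invoked before applying a CreatePartitions change.
> final class InternalTopicGuard {
>     private final boolean modificationEnabled; // internal.topic.modification.enable
>
>     InternalTopicGuard(boolean modificationEnabled) {
>         this.modificationEnabled = modificationEnabled;
>     }
>
>     void validatePartitionIncrease(String topic, boolean isInternal) {
>         if (isInternal && !modificationEnabled) {
>             throw new InvalidRequestException(
>                 "Partition count of internal topic '" + topic + "' cannot be "
>                 + "changed; set internal.topic.modification.enable=true to "
>                 + "override (strongly discouraged).");
>         }
>     }
> }
> {code}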
> *2. CLI Warning*
> Add a warning message or require a *{{--force}}* flag in the
> *{{kafka-topics.sh}}* CLI when attempting to modify internal topics. Note
> that this is a soft guardrail and does not prevent programmatic changes via
> the Admin API (for example, from Go, Rust, or Python clients).
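> As a minimal sketch, a hypothetical {{--force}} pre-check in the Java tool
> behind {{kafka-topics.sh}} could look like this (the flag and method are
> illustrative, not current {{TopicCommand}} behavior):
> {code:java}
> // Hypothetical pre-check run before an --alter on an internal topic.
> final class InternalTopicCliGuard {
>     static void checkAlterInternalTopic(String topic, boolean isInternal,
>                                         boolean hasForceFlag) {
>         if (isInternal && !hasForceFlag) {
>             System.err.println("WARNING: '" + topic + "' is an internal topic; "
>                 + "changing its partition count can break coordinator routing. "
>                 + "Re-run with --force to proceed.");
>             System.exit(1);
>         }
>     }
> }
> {code}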
>
> Although the documentation explicitly advises against modifying the partition
> count of {{__transaction_state}}, the system currently permits it. This
> discrepancy creates a significant risk of critical human error. It would be
> good to enforce this safety constraint at the code level to ensure cluster
> stability.
> I would greatly appreciate the Kafka community's feedback on this issue.
> Thanks always.