[ https://issues.apache.org/jira/browse/KAFKA-20029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18049037#comment-18049037 ]

sanghyeok An edited comment on KAFKA-20029 at 1/5/26 12:07 AM:
---------------------------------------------------------------

Hi [~schofielaj] !

Thanks for the valuable feedback.

I completely agree with you. The `__consumer_offsets` and 
`__share_group_state` topics rely on similar hashing mechanisms, 
so they carry similar risks regarding partition expansion. 
I will update the scope of this issue to cover all internal topics and aim for 
a unified solution.

Regarding the implementation, I suggested two options in the description:

*1. Server-side enforcement (Controller/AdminManager):*

Rejecting *CreatePartitions* requests for internal topics by default. I also 
thought about introducing a safety config (e.g., 
{*}internal.topic.modification.enable{*}) on the controller side for advanced 
users who might need to override this in emergencies.
    
*2. CLI Warning*
A softer guardrail in *kafka-topics.sh* (though this wouldn't prevent API 
calls).
    

My initial thought is that *controller-side enforcement* could provide the 
strongest guardrail, 
but I may be missing constraints or compatibility considerations. 
What do you think would be the most appropriate approach here?

Also, given this would change behavior by rejecting a request that used to be 
accepted, 
do you think this should go through a *KIP*, or is it reasonable to handle it 
as a safety improvement/bug fix (and document it in the release notes)?

Thanks for your time! 


> Disallow partition count increase for __transaction_state.
> ----------------------------------------------------------
>
>                 Key: KAFKA-20029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20029
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: sanghyeok An
>            Assignee: sanghyeok An
>            Priority: Minor
>
> The routing logic for the Transaction Coordinator relies heavily on the 
> partition count of the internal topic {{__transaction_state}}. The 
> mapping is determined by:
> {code:java}
> Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount
> {code}
> Consequently, changing the number of partitions for {{__transaction_state}} 
> (e.g., expanding from 50 to 150) changes the mapping. While the Kafka 
> documentation advises against modifying internal topics, there is currently 
> no hard guardrail preventing this via the Admin API.
> IMHO, allowing this operation can lead to a split-brain scenario during a 
> rolling upgrade or cluster expansion, resulting in orphan and hanging 
> transactions, as sketched below.
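> A minimal, self-contained illustration of the divergence (the transactional 
> id is an arbitrary example, and the local {{abs}} helper stands in for 
> Kafka's {{Utils.abs}}):
> {code:java}
> public class PartitionMappingDivergence {
>     // Stand-in for org.apache.kafka.common.utils.Utils.abs:
>     // non-negative result for every int, including Integer.MIN_VALUE.
>     static int abs(int n) {
>         return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
>     }
>
>     public static void main(String[] args) {
>         String transactionalId = "txn-A"; // arbitrary example id
>         int hash = transactionalId.hashCode();
>         // Old brokers still route with 50 partitions, new brokers with 150,
>         // so the same id maps to two different __transaction_state partitions.
>         System.out.println("old view (50 partitions):  " + abs(hash) % 50);
>         System.out.println("new view (150 partitions): " + abs(hash) % 150);
>     }
> }
> {code}
> Unless the two results happen to coincide, old and new brokers give 
> conflicting {{FindCoordinator}} answers for the same id.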
>  
> *Scenario & Analysis*
> Here is a breakdown of the race condition and inconsistency issues:
>  # *Expansion:* An admin expands {{__transaction_state}} partitions from 50 
> to 150. New partitions are created and leaders are elected.
>  # *Metadata Inconsistency:* While the metadata cache updates, the 
> {{transactionTopicPartitionCount}} used by the {{partitionsFor}} logic might 
> not update atomically across all nodes.
>  # *Loading Failure:* A coordinator elected for a new partition might fail to 
> load the transaction state in {{TransactionCoordinator#onElection}} due to a 
> mismatch between the cached partition count and the actual partition count, 
> throwing an exception.
>  # *Split-Brain View:*
>  ** *New Broker B* (starts up or receives update): Sees 150 partitions.
>  ** {*}Old Broker A{*}: Still operates with the logic based on 50 partitions.
>  # *Routing Divergence:*
>  ** A client sends {{FindCoordinator}} for Transaction ID "A".
>  ** *Broker B* calculates: {{hash("A") % 150}} -> Partition {*}130{*}.
>  ** *Broker A* calculates: {{hash("A") % 50}} -> Partition {*}27{*}.
>  # *Non-deterministic behavior occurs:*
>  ** If the producer sends a {{FindCoordinator}} request to *Broker B* for Transaction ID "A", it is pointed at partition {*}130{*}; if it sends the request to {*}Broker A{*}, it is pointed at partition {*}27{*}.
>  ** This means the producer will send {{InitProducerId}} to the leader of partition *130* or {*}27{*} (either way, the leaders of partitions 27 and 130 will be among the old brokers).
>  ** If a TransactionCoordinator receives an {{InitProducerId}} request for partition 130, it calls *partitionsFor(...)* to retrieve the transaction state. But TransactionCoordinator A is an old broker whose *transactionTopicPartitionCount* is {*}50{*}, so the re-calculation yields {*}27{*}. This means that:
>  *** {{Errors.COORDINATOR_LOAD_IN_PROGRESS}} can occur.
>  *** Transaction state will be written to an unexpected {{__transaction_state}} partition (for example, partition {*}27{*} instead of the expected {*}130{*}).
>  **** This causes orphan transactions: when brokers roll and restore transaction state, they expect the transaction for "A" to be in partition 130, but it is actually in partition 27, so the data topic may encounter hanging transactions.
>  ** In addition, since the leaders of partition 130 and partition 27 are likely different, we now have two coordinators potentially managing the same Transaction ID without knowledge of each other.
>  
> *Impact*
> This non-deterministic behavior causes:
>  * *Orphan/Hanging Transactions:* The producer interacts with a new 
> coordinator that has no history of the transaction.
>  * *Availability Issues:* Clients may receive 
> {{COORDINATOR_LOAD_IN_PROGRESS}} indefinitely if the coordinator fails to 
> load the state due to count mismatch.
>  * *Worst-case correctness risk* (not asserted as guaranteed): depending on 
> the timing, the transition could increase the risk of unexpected 
> fencing/coordination behavior.
>  
>  
> To ensure cluster stability and data integrity, how about enforcing a 
> guardrail against modifying the partition count of {{__transaction_state}}?
> *1. Enforce validation in Controller/AdminManager*
> It would be good to reject {{CreatePartitions}} requests for internal topics 
> by default.
>  * Introduce a mechanism to check if the topic is internal during partition 
> expansion.
>  * If {{topic.isInternal()}} is true, return an {{InvalidRequestException}} 
> with a clear error message stating that internal topic partition counts 
> cannot be changed dynamically.
>  * Introduce a safety configuration on the controller side (e.g., 
> {{internal.topic.modification.enable}}, default {{false}}) for 
> advanced users who strictly need to override this behavior, although it is 
> strongly discouraged. A rough sketch of such a guard follows this list.
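> As a sketch only (a hypothetical standalone class, not the actual controller 
> code path; in a real implementation the rejection would surface as an 
> {{InvalidRequestException}} from the {{CreatePartitions}} handling):
> {code:java}
> import java.util.Set;
>
> // Hypothetical guard illustrating the proposed validation.
> public class InternalTopicPartitionGuard {
>     // Internal topics whose partition counts feed hash-based routing.
>     private static final Set<String> INTERNAL_TOPICS = Set.of(
>             "__consumer_offsets", "__transaction_state", "__share_group_state");
>
>     // Mirrors the proposed internal.topic.modification.enable config (default false).
>     private final boolean modificationEnabled;
>
>     public InternalTopicPartitionGuard(boolean modificationEnabled) {
>         this.modificationEnabled = modificationEnabled;
>     }
>
>     // Throws if a partition-count change targets an internal topic.
>     public void validate(String topic) {
>         if (INTERNAL_TOPICS.contains(topic) && !modificationEnabled) {
>             throw new IllegalArgumentException("Partition count of internal topic '"
>                     + topic + "' cannot be changed dynamically");
>         }
>     }
> }
> {code}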
> *2. CLI Warning*
> Add a warning message or require a *{{--force}}* flag in the 
> *{{kafka-topics.sh}}* CLI when attempting to modify internal topics. Note 
> that this is a soft guardrail and does not prevent programmatic changes via 
> the Admin API (for example, from Go, Rust, or Python clients). A rough 
> sketch of the flag check follows.
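> A minimal sketch of the CLI-side check (hypothetical; the real change would 
> go into the {{TopicCommand}} behind {{kafka-topics.sh}}):
> {code:java}
> import java.util.Arrays;
> import java.util.List;
>
> // Hypothetical soft guardrail: refuse to touch internal topics without --force.
> public class ForceFlagCheck {
>     private static final List<String> INTERNAL_TOPICS = Arrays.asList(
>             "__consumer_offsets", "__transaction_state", "__share_group_state");
>
>     public static void main(String[] args) {
>         String topic = args.length > 0 ? args[0] : "__transaction_state";
>         boolean force = Arrays.asList(args).contains("--force");
>         if (INTERNAL_TOPICS.contains(topic) && !force) {
>             System.err.println("WARNING: '" + topic + "' is an internal topic; "
>                     + "pass --force to change its partition count.");
>             System.exit(1);
>         }
>         System.out.println("Proceeding with partition change for " + topic);
>     }
> }
> {code}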
>  
> Although the documentation explicitly advises against modifying the partition 
> count of {{__transaction_state}}, the system currently permits it. This 
> discrepancy creates a significant risk of critical human error. It would be 
> good to enforce this safety constraint at the code level to ensure cluster 
> stability.
> I would greatly appreciate the Kafka community's feedback on this issue.
> Thanks always. 


