kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581369789

   @jolshan here's the basic idea:
   
   ```diff
   diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
   index 19f4c54b65..636c7907fa 100644
   --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
   +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
   @@ -240,6 +240,9 @@ public class Sender implements Runnable {
        public void run() {
            log.debug("Starting Kafka producer I/O thread.");
    
   +        if (transactionManager != null)
   +            transactionManager.setPoisonStateOnInvalidTransition(true);
   +
            // main loop, runs until close is called
            while (running) {
                try {
   diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
   index 02f689da68..aa6de49aef 100644
   --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
   +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
   @@ -122,6 +122,7 @@ public class TransactionManager {
        private final Set<TopicPartition> newPartitionsInTransaction;
        private final Set<TopicPartition> pendingPartitionsInTransaction;
        private final Set<TopicPartition> partitionsInTransaction;
   +    private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
        private PendingStateTransition pendingTransition;
    
        // This is used by the TxnRequestHandlers to control how long to back 
off before a given request is retried.
   @@ -278,6 +279,7 @@ public class TransactionManager {
            this.newPartitionsInTransaction = new HashSet<>();
            this.pendingPartitionsInTransaction = new HashSet<>();
            this.partitionsInTransaction = new HashSet<>();
   +        this.shouldPoisonStateOnInvalidTransition = 
ThreadLocal.withInitial(() -> false);
            this.pendingRequests = new PriorityQueue<>(10, 
Comparator.comparingInt(o -> o.priority().priority));
            this.pendingTxnOffsetCommits = new HashMap<>();
            this.partitionsWithUnresolvedSequences = new HashMap<>();
   @@ -287,6 +289,10 @@ public class TransactionManager {
            this.apiVersions = apiVersions;
        }
    
   +    void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
   +        shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
   +    }
   +
        public synchronized TransactionalRequestResult initializeTransactions() 
{
            return initializeTransactions(ProducerIdAndEpoch.NONE, 
CallingThread.APPLICATION);
        }
   @@ -1068,7 +1074,7 @@ public class TransactionManager {
                String message = idString + "Invalid transition attempted from 
state "
                        + currentState.name() + " to state " + target.name();
    
   -            if (callingThread.shouldPoisonState()) {
   +            if (shouldPoisonStateOnInvalidTransition.get()) {
                    currentState = State.FATAL_ERROR;
                    lastError = new IllegalStateException(message);
                    throw lastError;
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to