kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1581369789
@jolshan here's the basic idea: ```diff diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 19f4c54b65..636c7907fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -240,6 +240,9 @@ public class Sender implements Runnable { public void run() { log.debug("Starting Kafka producer I/O thread."); + if (transactionManager != null) + transactionManager.setPoisonStateOnInvalidTransition(true); + // main loop, runs until close is called while (running) { try { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 02f689da68..aa6de49aef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -122,6 +122,7 @@ public class TransactionManager { private final Set<TopicPartition> newPartitionsInTransaction; private final Set<TopicPartition> pendingPartitionsInTransaction; private final Set<TopicPartition> partitionsInTransaction; + private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition; private PendingStateTransition pendingTransition; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. @@ -278,6 +279,7 @@ public class TransactionManager { this.newPartitionsInTransaction = new HashSet<>(); this.pendingPartitionsInTransaction = new HashSet<>(); this.partitionsInTransaction = new HashSet<>(); + this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false); this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority)); this.pendingTxnOffsetCommits = new HashMap<>(); this.partitionsWithUnresolvedSequences = new HashMap<>(); @@ -287,6 +289,10 @@ public class TransactionManager { this.apiVersions = apiVersions; } + void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { + shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); + } + public synchronized TransactionalRequestResult initializeTransactions() { return initializeTransactions(ProducerIdAndEpoch.NONE, CallingThread.APPLICATION); } @@ -1068,7 +1074,7 @@ public class TransactionManager { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (callingThread.shouldPoisonState()) { + if (shouldPoisonStateOnInvalidTransition.get()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org