[ https://issues.apache.org/jira/browse/KAFKA-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16427690#comment-16427690 ]
ASF GitHub Bot commented on KAFKA-6747: --------------------------------------- guozhangwang closed pull request #4826: KAFKA-6747 Check whether there is in-flight transaction before aborting transaction URL: https://github.com/apache/kafka/pull/4826 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8d6e56a17aa..4b2e1b8cfb7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -358,8 +358,8 @@ private void commitOffsets(final boolean startNewTransaction) { producer.commitTransaction(); transactionInFlight = false; if (startNewTransaction) { - transactionInFlight = true; producer.beginTransaction(); + transactionInFlight = true; } } else { consumer.commitSync(consumedOffsetsAndMetadata); @@ -482,7 +482,7 @@ public void closeSuspended(boolean clean, if (eosEnabled) { if (!clean) { try { - if (!isZombie) { + if (!isZombie && transactionInFlight) { producer.abortTransaction(); } transactionInFlight = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index a30582905de..d6a5276a43d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -782,6 +782,14 @@ public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() { assertTrue(producer.transactionInFlight()); } 
+ @Test + public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() { + task = createStatelessTask(true); + + assertTrue(!producer.transactionInFlight()); + task.close(false, false); + } + @Test public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() { task = createStatelessTask(false); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > kafka-streams Invalid transition attempted from state READY to state > ABORTING_TRANSACTION > ----------------------------------------------------------------------------------------- > > Key: KAFKA-6747 > URL: https://issues.apache.org/jira/browse/KAFKA-6747 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Frederic Arno > Assignee: Ted Yu > Priority: Major > > [~frederica] is running tests against kafka-streams 1.1 and gets the following > stack trace (everything was working fine with kafka-streams 1.0): > {code} > ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - > stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream task, > 0_2 > org.apache.kafka.common.KafkaException: TransactionalId feedBuilder-0_2: > Invalid transition attempted from state READY to state ABORTING_TRANSACTION > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:757) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751) > at > org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:230) > at > org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660) > at > 
org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeNonRunningTasks(AssignedTasks.java:166) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:151) > at > org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242) > at > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) > {code} > This happens when starting the same stream-processing application on 3 JVMs > all running on the same Linux box; the JVMs are named JVM-[2-4]. All 3 instances > use separate stream state.dir. No record is ever processed because the input > Kafka topics are empty at this stage. 
> JVM-2 starts first, joined shortly after by JVM-4 and JVM-3; the state > transition logs are below. The above stack trace is from JVM-4. > {code} > [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > JVM-4 crashes here with above stacktrace > [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to > ERROR > [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to > PENDING_SHUTDOWN > [JVM-4] stream-client [feedBuilder-XXX] State transition from > PENDING_SHUTDOWN to NOT_RUNNING > [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to > PENDING_SHUTDOWN > [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-4] 
stream-client [feedBuilder-XXX] State transition from RUNNING to > REBALANCING > [JVM-2] stream-client [feedBuilder-XXX] State transition from > PENDING_SHUTDOWN to NOT_RUNNING > [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to > RUNNING > [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to > PENDING_SHUTDOWN > [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to > PENDING_SHUTDOWN > [JVM-3] stream-client [feedBuilder-XXX] State transition from > PENDING_SHUTDOWN to NOT_RUNNING > [JVM-4] stream-client [feedBuilder-XXX] State transition from > PENDING_SHUTDOWN to NOT_RUNNING > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)