How about the following change?

```
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8d6e56a..92bedad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         if (eosEnabled) {
             if (!clean) {
                 try {
-                    if (!isZombie) {
+                    if (!isZombie && transactionInFlight) {
                         producer.abortTransaction();
                     }
                     transactionInFlight = false;
```

The extra `transactionInFlight` check skips `abortTransaction()` when no transaction was ever begun (for example, a task that never processed a record). That is exactly the READY -> ABORTING_TRANSACTION failure reported below.
On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for reporting this.
>
> It's indeed a bug in Kafka Streams. It's related to this fix:
> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding PR
> introduces the issue.
>
> Because we initialize the transaction lazily, in your case we never
> initialize it, and thus aborting the transaction fails.
>
> Please open a JIRA for the issue.
>
> -Matthias
>
> On 4/4/18 9:32 AM, Ted Yu wrote:
> > Looking at isTransitionValid():
> >
> >     case ABORTING_TRANSACTION:
> >         return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
> >
> > The source state is not supposed to be READY.
> >
> > I don't see READY in the log you posted.
> >
> > Please consider filing a JIRA where you can attach logs.
> >
> > Cheers
> >
> > On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com>
> > wrote:
> >
> >> Hello,
> >>
> >> I'm running tests against kafka-streams 1.1 and get the following stack
> >> trace (everything was working fine with kafka-streams 1.0):
> >>
> >> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream task, 0_2
> >> org.apache.kafka.common.KafkaException: TransactionalId feedBuilder-0_2: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
> >>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:757)
> >>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
> >>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:230)
> >>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.
> >> closeSuspended(StreamTask.java:486)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
> >>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeNonRunningTasks(AssignedTasks.java:166)
> >>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:151)
> >>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
> >>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> >>
> >>
> >> This happens when starting the same stream-processing application on 3
> >> JVMs, all running on the same Linux box; the JVMs are named JVM-[2-4].
> >> All 3 instances use a separate stream state.dir. No record is ever
> >> processed because the input Kafka topics are empty at this stage.
> >>
> >> JVM-2 starts first and is joined shortly after by JVM-4 and JVM-3. The
> >> state transition logs are below; the stack trace above is from JVM-4.
> >>
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> JVM-4 crashes here with the stack trace above
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to ERROR
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to PENDING_SHUTDOWN
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
> >>
> >>
> >> What should I do about this?
> >>
> >> Thanks, Fred
> >>
> >
>