Yes. That looks promising to me. Feel free to open a PR after we have a JIRA -- or just create the JIRA right away.
-Matthias

On 4/4/18 2:57 PM, Ted Yu wrote:
> How about the following change?
>
> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
> index 8d6e56a..92bedad 100644
> --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
> +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
> @@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
>          if (eosEnabled) {
>              if (!clean) {
>                  try {
> -                    if (!isZombie) {
> +                    if (!isZombie && transactionInFlight) {
>                          producer.abortTransaction();
>                      }
>                      transactionInFlight = false;
>
> On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Thanks for reporting this.
>>
>> It's indeed a bug in Kafka Streams. It's related to this fix:
>> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding PR
>> introduces the issue.
>>
>> Because we delay initializing the TX, in your case the TX is never
>> initialized, and thus aborting it fails.
>>
>> Please open a JIRA for the issue.
>>
>> -Matthias
>>
>> On 4/4/18 9:32 AM, Ted Yu wrote:
>>> Looking at isTransitionValid():
>>>
>>>     case ABORTING_TRANSACTION:
>>>         return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
>>>
>>> The source state is not supposed to be READY.
>>>
>>> I don't see READY in the log you posted.
>>>
>>> Please consider logging a JIRA where you can attach logs.
>>>
>>> Cheers
>>>
>>> On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am running tests against kafka-streams 1.1 and get the following stack
>>>> trace (everything was working alright using kafka-streams 1.0):
>>>>
>>>> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream task, 0_2
>>>> org.apache.kafka.common.KafkaException: TransactionalId feedBuilder-0_2: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
>>>>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:757)
>>>>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
>>>>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:230)
>>>>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486)
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
>>>>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeNonRunningTasks(AssignedTasks.java:166)
>>>>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:151)
>>>>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>>>>
>>>> This happens when starting the same stream-processing application on 3
>>>> JVMs, all running on the same Linux box; the JVMs are named JVM-[2-4]. All 3
>>>> instances use a separate stream state.dir. No record is ever processed
>>>> because the input Kafka topics are empty at this stage.
>>>>
>>>> JVM-2 starts first, joined shortly after by JVM-4 and JVM-3; the state
>>>> transition logs are below. The above stack trace is from JVM-4.
>>>>
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> JVM-4 crashes here with above stacktrace
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to ERROR
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to PENDING_SHUTDOWN
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>
>>>> What should I do with that?
>>>>
>>>> Thanks, Fred
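For readers following the thread, here is a minimal sketch of why the guard proposed in the diff avoids the exception. It is a toy model, not Kafka code: ToyProducer, ToyTask, closeFails, and the lazy processRecord() step are hypothetical stand-ins that only mirror the transition rule quoted from isTransitionValid() and the transactionInFlight flag from the diff. The assumption is that a task that never processed a record never began a transaction, which matches the explanation given in the thread.

```java
// Toy model for illustration only. ToyProducer and ToyTask are hypothetical
// stand-ins; they are not Kafka's KafkaProducer, TransactionManager, or StreamTask.
class AbortGuardSketch {

    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR }

    /** Hypothetical producer that enforces the transition rule quoted in the thread. */
    static class ToyProducer {
        State state = State.READY;

        void beginTransaction() {
            state = State.IN_TRANSACTION;
        }

        void abortTransaction() {
            // Aborting is only valid from IN_TRANSACTION or ABORTABLE_ERROR.
            if (state != State.IN_TRANSACTION && state != State.ABORTABLE_ERROR) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state " + state
                        + " to state ABORTING_TRANSACTION");
            }
            state = State.READY; // simplification: the abort completes immediately
        }
    }

    /** Hypothetical task that begins its transaction lazily, on the first record. */
    static class ToyTask {
        final ToyProducer producer = new ToyProducer();
        boolean isZombie = false;
        boolean transactionInFlight = false;

        void processRecord() {
            if (!transactionInFlight) {
                producer.beginTransaction();
                transactionInFlight = true;
            }
        }

        /** Unclean close; 'guarded' selects the condition proposed in the diff. */
        void closeUnclean(boolean guarded) {
            boolean shouldAbort = guarded
                ? (!isZombie && transactionInFlight) // proposed condition
                : !isZombie;                         // condition before the change
            if (shouldAbort) {
                producer.abortTransaction();
            }
            transactionInFlight = false;
        }
    }

    /** Returns true if closing the toy task throws. */
    static boolean closeFails(boolean guarded, boolean processedRecord) {
        ToyTask task = new ToyTask();
        if (processedRecord) {
            task.processRecord();
        }
        try {
            task.closeUnclean(guarded);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded, no record processed: fails = " + closeFails(false, false));
        System.out.println("guarded, no record processed:   fails = " + closeFails(true, false));
        System.out.println("unguarded, record processed:    fails = " + closeFails(false, true));
        System.out.println("guarded, record processed:      fails = " + closeFails(true, true));
    }
}
```

In this model only the unguarded close of an idle task fails, which corresponds to the empty-input-topic scenario in the original report. Whether the real fix needs anything beyond this guard is for the JIRA and PR to settle.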