How about the following change ?

diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8d6e56a..92bedad 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
             if (eosEnabled) {
                 if (!clean) {
                     try {
-                        if (!isZombie) {
+                        if (!isZombie && transactionInFlight) {
                             producer.abortTransaction();
                         }
                         transactionInFlight = false;

On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for reporting this.
>
> It's indeed a bug in Kafka Streams. It's related to this fix:
> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding PR
> introduces the issue.
>
> Because, we initialize TX delayed, for your case, we never initialize TX
> and thus aborting the TX fails.
>
> Please open a JIRA for the issue.
>
> -Matthias
>
> On 4/4/18 9:32 AM, Ted Yu wrote:
> > Looking at isTransitionValid():
> >
> >                 case ABORTING_TRANSACTION:
> >
> >                     return source == IN_TRANSACTION || source ==
> > ABORTABLE_ERROR;
> >
> > The source state is not supposed to be READY.
> >
> > I don't see READY in the log you posted.
> >
> >
> > Please consider logging a JIRA where you can attach logs.
> >
> >
> > Cheers
> >
> >
> > On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com>
> > wrote:
> >
> >> Hello,
> >>
> >> I running tests against kafka-streams 1.1 and get the following stack
> >> trace (everything was working alright using kafka-streams 1.0):
> >>
> >> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> -
> >> stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream
> >> task, 0_2
> >> org.apache.kafka.common.KafkaException: TransactionalId
> feedBuilder-0_2:
> >> Invalid transition attempted from state READY to state
> ABORTING_TRANSACTION
> >>         at org.apache.kafka.clients.producer.internals.TransactionManag
> >> er.transitionTo(TransactionManager.java:757)
> >>         at org.apache.kafka.clients.producer.internals.TransactionManag
> >> er.transitionTo(TransactionManager.java:751)
> >>         at org.apache.kafka.clients.producer.internals.TransactionManag
> >> er.beginAbort(TransactionManager.java:230)
> >>         at org.apache.kafka.clients.producer.KafkaProducer.abortTransac
> >> tion(KafkaProducer.java:660)
> >>         at org.apache.kafka.streams.processor.internals.StreamTask.
> >> closeSuspended(StreamTask.java:486)
> >>         at org.apache.kafka.streams.processor.internals.StreamTask.
> >> close(StreamTask.java:546)
> >>         at org.apache.kafka.streams.processor.internals.AssignedTasks.c
> >> loseNonRunningTasks(AssignedTasks.java:166)
> >>         at org.apache.kafka.streams.processor.internals.AssignedTasks.
> >> suspend(AssignedTasks.java:151)
> >>         at org.apache.kafka.streams.processor.internals.TaskManager.sus
> >> pendTasksAndState(TaskManager.java:242)
> >>         at org.apache.kafka.streams.processor.internals.StreamThread$Re
> >> balanceListener.onPartitionsRevoked(StreamThread.java:291)
> >>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> tor.onJoinPrepare(ConsumerCoordinator.java:414)
> >>         at org.apache.kafka.clients.consumer.internals.AbstractCoordina
> >> tor.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >>         at org.apache.kafka.clients.consumer.internals.AbstractCoordina
> >> tor.ensureActiveGroup(AbstractCoordinator.java:316)
> >>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
> >> tor.poll(ConsumerCoordinator.java:290)
> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
> >> KafkaConsumer.java:1149)
> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaCo
> >> nsumer.java:1115)
> >>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >> pollRequests(StreamThread.java:827)
> >>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >> runOnce(StreamThread.java:784)
> >>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >> runLoop(StreamThread.java:750)
> >>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >> run(StreamThread.java:720)
> >>
> >>
> >> This happens when starting the same stream-processing application on 3
> >> JVMs all running on the same linux box, JVMs are named JVM-[2-4]. All 3
> >> instances use separate stream state.dir. No record is ever processed
> >> because the input kafka topics are empty at this stage.
> >>
> >> JVM-2 starts first, joined shortly after by JVM-4 and JVM-3, find the
> >> state transition logs below. The above stacktrace is from JVM-4
> >>
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> JVM-4 crashes here with above stacktrace
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to ERROR
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to
> >> PENDING_SHUTDOWN
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >> PENDING_SHUTDOWN to NOT_RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> PENDING_SHUTDOWN
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> REBALANCING
> >> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> >> PENDING_SHUTDOWN to NOT_RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> REBALANCING
> >> to RUNNING
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> PENDING_SHUTDOWN
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to
> >> PENDING_SHUTDOWN
> >> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> >> PENDING_SHUTDOWN to NOT_RUNNING
> >> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >> PENDING_SHUTDOWN to NOT_RUNNING
> >>
> >>
> >> What should I do with that?
> >>
> >> Thanks, Fred
> >>
> >
>
>

Reply via email to