https://github.com/apache/kafka/pull/4826

I will fill in JIRA Id once Frederic creates the JIRA.

Cheers

On Wed, Apr 4, 2018 at 4:29 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Yes. That looks promising to me. Feel free to open an PR after we have a
> JIRA -- or just create the JIRA right away.
>
> -Matthias
>
> On 4/4/18 2:57 PM, Ted Yu wrote:
> > How about the following change ?
> >
> > diff --git
> > a/streams/src/main/java/org/apache/kafka/streams/processor/internals/
> StreamTask.java
> > b/streams/src/main/java/org/apache/kafka/streams/processor/internals/
> StreamTask.java
> > index 8d6e56a..92bedad 100644
> > ---
> > a/streams/src/main/java/org/apache/kafka/streams/processor/internals/
> StreamTask.java
> > +++
> > b/streams/src/main/java/org/apache/kafka/streams/processor/internals/
> StreamTask.java
> > @@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask
> implements
> > ProcessorNodePunctuator
> >              if (eosEnabled) {
> >                  if (!clean) {
> >                      try {
> > -                        if (!isZombie) {
> > +                        if (!isZombie && transactionInFlight) {
> >                              producer.abortTransaction();
> >                          }
> >                          transactionInFlight = false;
> >
> > On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks for reporting this.
> >>
> >> It's indeed a bug in Kafka Streams. It's related to this fix:
> >> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding
> PR
> >> introduces the issue.
> >>
> >> Because, we initialize TX delayed, for your case, we never initialize TX
> >> and thus aborting the TX fails.
> >>
> >> Please open a JIRA for the issue.
> >>
> >> -Matthias
> >>
> >> On 4/4/18 9:32 AM, Ted Yu wrote:
> >>> Looking at isTransitionValid():
> >>>
> >>>                 case ABORTING_TRANSACTION:
> >>>
> >>>                     return source == IN_TRANSACTION || source ==
> >>> ABORTABLE_ERROR;
> >>>
> >>> The source state is not supposed to be READY.
> >>>
> >>> I don't see READY in the log you posted.
> >>>
> >>>
> >>> Please consider logging a JIRA where you can attach logs.
> >>>
> >>>
> >>> Cheers
> >>>
> >>>
> >>> On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> I running tests against kafka-streams 1.1 and get the following stack
> >>>> trace (everything was working alright using kafka-streams 1.0):
> >>>>
> >>>> ERROR org.apache.kafka.streams.processor.internals.
> AssignedStreamsTasks
> >> -
> >>>> stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream
> >>>> task, 0_2
> >>>> org.apache.kafka.common.KafkaException: TransactionalId
> >> feedBuilder-0_2:
> >>>> Invalid transition attempted from state READY to state
> >> ABORTING_TRANSACTION
> >>>>         at org.apache.kafka.clients.producer.internals.
> TransactionManag
> >>>> er.transitionTo(TransactionManager.java:757)
> >>>>         at org.apache.kafka.clients.producer.internals.
> TransactionManag
> >>>> er.transitionTo(TransactionManager.java:751)
> >>>>         at org.apache.kafka.clients.producer.internals.
> TransactionManag
> >>>> er.beginAbort(TransactionManager.java:230)
> >>>>         at org.apache.kafka.clients.producer.KafkaProducer.
> abortTransac
> >>>> tion(KafkaProducer.java:660)
> >>>>         at org.apache.kafka.streams.processor.internals.StreamTask.
> >>>> closeSuspended(StreamTask.java:486)
> >>>>         at org.apache.kafka.streams.processor.internals.StreamTask.
> >>>> close(StreamTask.java:546)
> >>>>         at org.apache.kafka.streams.processor.internals.
> AssignedTasks.c
> >>>> loseNonRunningTasks(AssignedTasks.java:166)
> >>>>         at org.apache.kafka.streams.processor.internals.
> AssignedTasks.
> >>>> suspend(AssignedTasks.java:151)
> >>>>         at org.apache.kafka.streams.processor.internals.
> TaskManager.sus
> >>>> pendTasksAndState(TaskManager.java:242)
> >>>>         at org.apache.kafka.streams.processor.internals.
> StreamThread$Re
> >>>> balanceListener.onPartitionsRevoked(StreamThread.java:291)
> >>>>         at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordina
> >>>> tor.onJoinPrepare(ConsumerCoordinator.java:414)
> >>>>         at org.apache.kafka.clients.consumer.internals.
> AbstractCoordina
> >>>> tor.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >>>>         at org.apache.kafka.clients.consumer.internals.
> AbstractCoordina
> >>>> tor.ensureActiveGroup(AbstractCoordinator.java:316)
> >>>>         at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordina
> >>>> tor.poll(ConsumerCoordinator.java:290)
> >>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
> >>>> KafkaConsumer.java:1149)
> >>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaCo
> >>>> nsumer.java:1115)
> >>>>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >>>> pollRequests(StreamThread.java:827)
> >>>>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >>>> runOnce(StreamThread.java:784)
> >>>>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >>>> runLoop(StreamThread.java:750)
> >>>>         at org.apache.kafka.streams.processor.internals.StreamThread.
> >>>> run(StreamThread.java:720)
> >>>>
> >>>>
> >>>> This happens when starting the same stream-processing application on 3
> >>>> JVMs all running on the same linux box, JVMs are named JVM-[2-4]. All
> 3
> >>>> instances use separate stream state.dir. No record is ever processed
> >>>> because the input kafka topics are empty at this stage.
> >>>>
> >>>> JVM-2 starts first, joined shortly after by JVM-4 and JVM-3, find the
> >>>> state transition logs below. The above stacktrace is from JVM-4
> >>>>
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> JVM-4 crashes here with above stacktrace
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to ERROR
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to
> >>>> PENDING_SHUTDOWN
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >>>> PENDING_SHUTDOWN to NOT_RUNNING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> PENDING_SHUTDOWN
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> REBALANCING
> >>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from
> >>>> PENDING_SHUTDOWN to NOT_RUNNING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> >> REBALANCING
> >>>> to RUNNING
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> PENDING_SHUTDOWN
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING
> to
> >>>> PENDING_SHUTDOWN
> >>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from
> >>>> PENDING_SHUTDOWN to NOT_RUNNING
> >>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from
> >>>> PENDING_SHUTDOWN to NOT_RUNNING
> >>>>
> >>>>
> >>>> What should I do with that?
> >>>>
> >>>> Thanks, Fred
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to