Thanks Ted! Done.

On 4/4/18 7:30 PM, Ted Yu wrote:
> I created KAFKA-6747.
> Frederic's Id was on KAFKA-6323.
>
> Can some committer change the reporter of KAFKA-6747 to Frederic?
>
> Thanks
>
> On Wed, Apr 4, 2018 at 4:35 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> https://github.com/apache/kafka/pull/4826
>>
>> I will fill in the JIRA Id once Frederic creates the JIRA.
>>
>> Cheers
>>
>> On Wed, Apr 4, 2018 at 4:29 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Yes. That looks promising to me. Feel free to open a PR after we have a
>>> JIRA -- or just create the JIRA right away.
>>>
>>> -Matthias
>>>
>>> On 4/4/18 2:57 PM, Ted Yu wrote:
>>>> How about the following change?
>>>>
>>>> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
>>>> index 8d6e56a..92bedad 100644
>>>> --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
>>>> +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
>>>> @@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
>>>>          if (eosEnabled) {
>>>>              if (!clean) {
>>>>                  try {
>>>> -                    if (!isZombie) {
>>>> +                    if (!isZombie && transactionInFlight) {
>>>>                          producer.abortTransaction();
>>>>                      }
>>>>                      transactionInFlight = false;
>>>>
>>>> On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Thanks for reporting this.
>>>>>
>>>>> It's indeed a bug in Kafka Streams. It's related to this fix:
>>>>> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding PR
>>>>> introduces the issue.
>>>>>
>>>>> Because we delay initializing the TX, in your case we never initialize
>>>>> it, and thus aborting the TX fails.
>>>>>
>>>>> Please open a JIRA for the issue.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 4/4/18 9:32 AM, Ted Yu wrote:
>>>>>> Looking at isTransitionValid():
>>>>>>
>>>>>>         case ABORTING_TRANSACTION:
>>>>>>             return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
>>>>>>
>>>>>> The source state is not supposed to be READY.
>>>>>>
>>>>>> I don't see READY in the log you posted.
>>>>>>
>>>>>> Please consider logging a JIRA where you can attach logs.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I am running tests against kafka-streams 1.1 and get the following stack
>>>>>>> trace (everything was working alright using kafka-streams 1.0):
>>>>>>>
>>>>>>> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream task, 0_2
>>>>>>> org.apache.kafka.common.KafkaException: TransactionalId feedBuilder-0_2: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
>>>>>>>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:757)
>>>>>>>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
>>>>>>>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:230)
>>>>>>>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
>>>>>>>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeNonRunningTasks(AssignedTasks.java:166)
>>>>>>>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:151)
>>>>>>>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>>>>>>>
>>>>>>> This happens when starting the same stream-processing application on 3
>>>>>>> JVMs, all running on the same Linux box; the JVMs are named JVM-[2-4].
>>>>>>> All 3 instances use a separate stream state.dir.
>>>>>>> No record is ever processed because the input Kafka topics are empty at
>>>>>>> this stage.
>>>>>>>
>>>>>>> JVM-2 starts first and is joined shortly after by JVM-4 and JVM-3. The
>>>>>>> state transition logs are below; the above stack trace is from JVM-4.
>>>>>>>
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> JVM-4 crashes here with the above stack trace
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to ERROR
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to PENDING_SHUTDOWN
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>>
>>>>>>> What should I do about this?
>>>>>>>
>>>>>>> Thanks, Fred
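To make the failure concrete, here is a toy model of the state check Ted quoted and the guard proposed in the diff. It is a sketch under stated assumptions, not Kafka code: `TxState`, `ToyTransactionManager`, and `closeDirty` are hypothetical names, only the states named in this thread are modeled, and it assumes (consistent with the exception above) that a task which never began a transaction leaves its producer in READY. Aborting from READY is rejected, while the `transactionInFlight` guard skips the abort when no transaction was ever begun.

```java
public class AbortGuardSketch {

    // Hypothetical subset of the producer transaction states named in this thread.
    enum TxState { READY, IN_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR }

    // Toy stand-in for the producer's transaction state machine (NOT the real Kafka class).
    static final class ToyTransactionManager {
        private TxState state = TxState.READY; // transactions initialized, none begun yet

        void beginTransaction() {
            transitionTo(TxState.IN_TRANSACTION);
        }

        void beginAbort() {
            transitionTo(TxState.ABORTING_TRANSACTION);
        }

        TxState state() {
            return state;
        }

        // Mirrors the rule quoted in the thread: ABORTING_TRANSACTION may only be
        // entered from IN_TRANSACTION or ABORTABLE_ERROR.
        private static boolean isTransitionValid(TxState source, TxState target) {
            switch (target) {
                case ABORTING_TRANSACTION:
                    return source == TxState.IN_TRANSACTION || source == TxState.ABORTABLE_ERROR;
                case IN_TRANSACTION:
                    return source == TxState.READY;
                default:
                    return true; // other transitions are out of scope for this sketch
            }
        }

        private void transitionTo(TxState target) {
            if (!isTransitionValid(state, target)) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state " + state + " to state " + target);
            }
            state = target;
        }
    }

    // Simplified (hypothetical) dirty-close path; the condition matches the
    // proposed `if (!isZombie && transactionInFlight)` guard.
    static boolean closeDirty(ToyTransactionManager tm, boolean isZombie, boolean transactionInFlight) {
        if (!isZombie && transactionInFlight) {
            tm.beginAbort();
            return true; // abort was attempted
        }
        return false; // nothing in flight (or zombie): skip the abort
    }

    public static void main(String[] args) {
        // A task closed during a rebalance before any transaction was begun.
        ToyTransactionManager idle = new ToyTransactionManager();
        try {
            idle.beginAbort(); // unguarded abort, as before the proposed change
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("guarded abort attempted: "
            + closeDirty(new ToyTransactionManager(), false, false));

        // A task with an open transaction: the guard still lets the abort through.
        ToyTransactionManager busy = new ToyTransactionManager();
        busy.beginTransaction();
        System.out.println("guarded abort attempted: "
            + closeDirty(busy, false, true) + ", state=" + busy.state());
    }
}
```

Running `main` prints the invalid-transition message for the idle task, then shows the guarded close skipping the abort for that task while still aborting for a task with an open transaction. The idea is the same as in the diff: only abort when a transaction was actually begun.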