Thanks Ted! Done.

On 4/4/18 7:30 PM, Ted Yu wrote:
> I created KAFKA-6747.
> Frederic's Id was on KAFKA-6323.
> 
> Can some committer change the reporter of KAFKA-6747 to Frederic ?
> 
> Thanks
> 
> On Wed, Apr 4, 2018 at 4:35 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
>> https://github.com/apache/kafka/pull/4826
>>
>> I will fill in JIRA Id once Frederic creates the JIRA.
>>
>> Cheers
>>
>> On Wed, Apr 4, 2018 at 4:29 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Yes. That looks promising to me. Feel free to open a PR after we have a
>>> JIRA -- or just create the JIRA right away.
>>>
>>> -Matthias
>>>
>>> On 4/4/18 2:57 PM, Ted Yu wrote:
>>>> How about the following change ?
>>>>
>>>> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
>>>> index 8d6e56a..92bedad 100644
>>>> --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
>>>> +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
>>>> @@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
>>>>              if (eosEnabled) {
>>>>                  if (!clean) {
>>>>                      try {
>>>> -                        if (!isZombie) {
>>>> +                        if (!isZombie && transactionInFlight) {
>>>>                              producer.abortTransaction();
>>>>                          }
>>>>                          transactionInFlight = false;
>>>>
>>>> On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Thanks for reporting this.
>>>>>
>>>>> It's indeed a bug in Kafka Streams. It's related to this fix:
>>>>> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding PR
>>>>> introduced the issue.
>>>>>
>>>>> Because we delay initializing the TX, in your case the TX is never
>>>>> initialized, and thus aborting the TX fails.
>>>>>
>>>>> Please open a JIRA for the issue.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 4/4/18 9:32 AM, Ted Yu wrote:
>>>>>> Looking at isTransitionValid():
>>>>>>
>>>>>>                 case ABORTING_TRANSACTION:
>>>>>>                     return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
>>>>>>
>>>>>> The source state is not supposed to be READY.
>>>>>>
>>>>>> I don't see READY in the log you posted.
>>>>>>
>>>>>>
>>>>>> Please consider logging a JIRA where you can attach logs.
>>>>>>
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I am running tests against kafka-streams 1.1 and get the following stack
>>>>>>> trace (everything was working fine with kafka-streams 1.0):
>>>>>>>
>>>>>>> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream task, 0_2
>>>>>>> org.apache.kafka.common.KafkaException: TransactionalId feedBuilder-0_2: Invalid transition attempted from state READY to state ABORTING_TRANSACTION
>>>>>>>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:757)
>>>>>>>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
>>>>>>>         at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:230)
>>>>>>>         at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
>>>>>>>         at org.apache.kafka.streams.processor.internals.AssignedTasks.closeNonRunningTasks(AssignedTasks.java:166)
>>>>>>>         at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:151)
>>>>>>>         at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
>>>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>>>>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>>>>>>>
>>>>>>>
>>>>>>> This happens when starting the same stream-processing application on 3
>>>>>>> JVMs, all running on the same Linux box; the JVMs are named JVM-[2-4].
>>>>>>> All 3 instances use separate stream state.dir. No record is ever
>>>>>>> processed because the input Kafka topics are empty at this stage.
>>>>>>>
>>>>>>> JVM-2 starts first, joined shortly after by JVM-4 and JVM-3. The state
>>>>>>> transition logs are below. The above stack trace is from JVM-4.
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> JVM-4 crashes here with above stacktrace
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to ERROR
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to PENDING_SHUTDOWN
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>>>>
>>>>>>>
>>>>>>> What should I do with that?
>>>>>>>
>>>>>>> Thanks, Fred
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
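
For readers following the thread, here is a minimal, self-contained sketch of
why the extra transactionInFlight check in the quoted diff avoids the
"READY to ABORTING_TRANSACTION" failure. It does not use the Kafka client
library. The class names (AbortGuardSketch, SketchProducer, SketchTask), the
send() method, the "guarded" flag, and the reduced State enum are hypothetical
and exist only for illustration; only the ABORTING_TRANSACTION rule and the
guard condition are taken from the messages quoted above.

```java
class AbortGuardSketch {
    // Hypothetical, reduced set of transaction states (the real client has more).
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, ABORTING_TRANSACTION }

    // Mirrors the rule quoted from isTransitionValid(): an abort may only
    // start from IN_TRANSACTION or ABORTABLE_ERROR. Other targets are not modeled.
    static boolean isTransitionValid(State source, State target) {
        if (target == State.ABORTING_TRANSACTION) {
            return source == State.IN_TRANSACTION || source == State.ABORTABLE_ERROR;
        }
        return true;
    }

    // Stand-in for a transactional producer; not the Kafka API.
    static class SketchProducer {
        State state = State.READY;

        void beginTransaction() {
            state = State.IN_TRANSACTION;
        }

        void abortTransaction() {
            if (!isTransitionValid(state, State.ABORTING_TRANSACTION)) {
                throw new IllegalStateException("Invalid transition attempted from state "
                        + state + " to state " + State.ABORTING_TRANSACTION);
            }
            state = State.READY; // the abort completes immediately in this sketch
        }
    }

    // Stand-in for a task that begins its transaction lazily, on first send.
    static class SketchTask {
        final SketchProducer producer = new SketchProducer();
        boolean transactionInFlight = false;
        boolean isZombie = false;

        void send() {
            if (!transactionInFlight) {
                producer.beginTransaction();
                transactionInFlight = true;
            }
        }

        // guarded == false models the check before the diff (!isZombie);
        // guarded == true models the check after it (!isZombie && transactionInFlight).
        void closeUnclean(boolean guarded) {
            if (!isZombie && (!guarded || transactionInFlight)) {
                producer.abortTransaction();
            }
            transactionInFlight = false;
        }
    }

    // Returns true if an unclean close throws for the given configuration.
    static boolean closeThrows(boolean guarded, boolean sendFirst) {
        SketchTask task = new SketchTask();
        if (sendFirst) {
            task.send();
        }
        try {
            task.closeUnclean(guarded);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded, nothing sent: throws = " + closeThrows(false, false));
        System.out.println("guarded, nothing sent:   throws = " + closeThrows(true, false));
        System.out.println("unguarded, after send:   throws = " + closeThrows(false, true));
    }
}
```

In this sketch the unguarded close fails only when no record was ever sent,
which matches the empty-input-topic scenario described in the original report.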
