Hi,

Sorry, yes, this is a bug to do with file locking and the clean-up thread.
For now the workaround is to set
StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG to a very large value, e.g.
Long.MAX_VALUE, so that the state clean-up is effectively disabled.
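
A minimal sketch of that workaround, under some assumptions: it uses plain
java.util.Properties with the string key "state.cleanup.delay.ms" (the value
of StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG) so that it compiles without
Kafka on the classpath. The class name, application id and bootstrap servers
are made-up placeholders, not taken from the reports in this thread.

```java
import java.util.Properties;

// Hypothetical helper class; the name is illustrative only.
class CleanupWorkaround {

    // String value of StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG.
    static final String STATE_CLEANUP_DELAY_MS = "state.cleanup.delay.ms";

    // Builds the properties you would hand to StreamsConfig / KafkaStreams.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "example-app");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        // A very large delay means the clean-up thread effectively never
        // considers a state directory old enough to delete.
        props.put(STATE_CLEANUP_DELAY_MS, Long.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties();
        System.out.println(STATE_CLEANUP_DELAY_MS + " = "
                + props.get(STATE_CLEANUP_DELAY_MS));
    }
}
```

In a real application you would normally use the StreamsConfig constants
instead of the raw strings. Note that with this setting, state directories
for tasks that have moved to other instances are no longer removed
automatically.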

There are a couple of related JIRAs:
https://issues.apache.org/jira/browse/KAFKA-5562 and
https://issues.apache.org/jira/browse/KAFKA-4890

Thanks,
Damian

On Wed, 26 Jul 2017 at 06:44 Eric Lalonde <e...@autonomic.ai> wrote:

> Hello, I am able to reproduce this. It occurs during rebalancing when the
> service is restarted. kafka-clients and kafka-streams are both at version
> 0.10.2.1. 3 instances of the service, 4 threads per instance, 100
> partitions.
>
> log excerpt:
>
>  Wed Jul 26 05:32:07 UTC 2017
>  Streams state: REBALANCING
>  Num Stream Threads: 4
>
>  2017-07-26 05:32:20.497 ERROR 7 --- [ StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
> Failed to remove suspended task 2_68
>
>  org.apache.kafka.streams.errors.ProcessorStateException: Error while
> closing the state manager
>  at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:133)
> ~[kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.StreamThread.closeNonAssignedSuspendedTasks(StreamThread.java:898)
> [kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> [kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:233)
> [kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> [kafka-clients-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> [kafka-clients-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> [kafka-clients-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> [kafka-clients-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> [kafka-clients-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> [kafka-clients-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> [kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> [kafka-streams-0.10.2.1.jar!/:na]
>  Caused by: java.io.FileNotFoundException:
> /home/kafka-streams/data/2_68/.checkpoint.tmp (No such file or directory)
>  at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_102]
>  at java.io.FileOutputStream.open(FileOutputStream.java:270)
> ~[na:1.8.0_102]
>  at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> ~[na:1.8.0_102]
>  at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> ~[na:1.8.0_102]
>  at
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:71)
> ~[kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:386)
> ~[kafka-streams-0.10.2.1.jar!/:na]
>  at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:131)
> ~[kafka-streams-0.10.2.1.jar!/:na]
>  ... 11 common frames omitted
>
>
> Interestingly, the directory 2_68 does not appear to exist on the instance
> on which this exception was thrown:
>
> $ ls /home/kafka-streams/data/
>
> 0_10  0_13  0_18  0_23  0_27  0_3   0_33  0_35  0_39  0_43  0_46  0_50
> 0_52  0_62  0_66  0_7   0_76  0_81  0_83  0_87  0_89  0_97  1_0  1_11  1_2
>  1_26  1_31  1_35  1_39  1_41  1_5   1_54  1_56  1_63  1_65  1_69  1_73
> 1_77  1_79  1_88  1_92  1_94  1_96  2_0  2_12  2_16  2_2   2_24  2_33
> 2_40  2_46  2_54  2_60  2_67  2_71  2_76  2_80  2_86  2_91 0_11  0_14
> 0_21  0_24   0_29  0_30  0_34  0_37  0_42  0_44  0_5   0_51  0_57  0_65
> 0_67  0_71  0_80  0_82  0_84  0_88  0_91  0_99  1_1  1_12  1_20  1_30
> 1_34  1_37  1_4   1_45  1_53  1_55  1_62  1_64  1_68  1_70  1_75  1_78
> 1_80  1_89  1_93  1_95  1_99  2_1  2_13  2_19  2_21  2_3   2_34  2_44
> 2_53  2_6   2_65  2_69  2_73  2_78  2_81  2_90  2_99
>
> > On Jul 6, 2017, at 7:50 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > Hi Damian,
> >
> > Sorry for the delayed reply; I have been out of the office.
> >
> > I'm afraid I cannot check. We have alarms on our auto scaling groups for
> > stream instances to kill them should the CPU utilization be < 1 for 30
> > mins.
> >
> > On Fri 30 Jun 2017 at 5:05 p.m., Damian Guy <damian....@gmail.com> wrote:
> >
> >> Hi Ian,
> >>
> >> Can you check if the file exists and it is indeed a file rather than a
> >> directory?
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 30 Jun 2017 at 16:45 Damian Guy <damian....@gmail.com> wrote:
> >>
> >>> Hi Ian,
> >>>
> >>> We had another report of what looks like the same issue. Will look into
> >>> it.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Fri, 30 Jun 2017 at 16:38 Ian Duffy <i...@ianduffy.ie> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I was wondering if any of those who know the streams internals could
> >>>> shed any light on the following exception:
> >>>>
> >>>> org.apache.kafka.streams.errors.ProcessorStateException: Error while
> >>>> closing the state manager
> >>>>  at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:133)
> >>>>  at org.apache.kafka.streams.processor.internals.StreamThread.closeNonAssignedSuspendedTasks(StreamThread.java:898)
> >>>>  at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> >>>>  at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:233)
> >>>>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >>>>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >>>>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>>>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>>>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>> Caused by: java.io.FileNotFoundException:
> >>>> /data/kafka-streams-freshness/freshness-id-prod/2_46/.checkpoint.tmp
> >>>> (No such file or directory)
> >>>>  at java.io.FileOutputStream.open0(Native Method)
> >>>>  at java.io.FileOutputStream.open(FileOutputStream.java:270)
> >>>>  at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> >>>>  at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> >>>>  at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:71)
> >>>>  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:386)
> >>>>  at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:131)
> >>>>  ... 11 common frames omitted
> >>>>
> >>>> Thanks,
> >>>> Ian.
> >>>>
> >>>
> >>
>
>
