Hi,

Sorry, yes, this is a bug to do with file locking and the clean-up thread. For now, the workaround is to set StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG to a very large value, e.g. Long.MAX_VALUE, so that the state clean-up is effectively disabled.
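For illustration, a minimal sketch of how that setting might be applied when building the Streams properties. The class name, the helper method, and the application id and bootstrap server values are hypothetical. The key "state.cleanup.delay.ms" is the string value of StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, written out here only so the sketch compiles without the kafka-streams jar; in a real application you would normally use the constant itself.

```java
import java.util.Properties;

public class CleanupDelayWorkaround {
    // String value of StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, spelled out
    // so this sketch has no dependency on the kafka-streams jar.
    static final String STATE_CLEANUP_DELAY_MS = "state.cleanup.delay.ms";

    // Hypothetical helper: returns Streams properties with the workaround applied.
    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Workaround from this thread: make the clean-up delay so large that
        // the state clean-up is effectively disabled.
        props.put(STATE_CLEANUP_DELAY_MS, Long.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own application id and brokers.
        Properties props = streamsProperties("my-streams-app", "localhost:9092");
        System.out.println(STATE_CLEANUP_DELAY_MS + "=" + props.get(STATE_CLEANUP_DELAY_MS));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor as usual. The value is stored as a Long rather than a String; both forms should be accepted for this setting.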
There are a couple of related JIRAs:
https://issues.apache.org/jira/browse/KAFKA-5562
https://issues.apache.org/jira/browse/KAFKA-4890

Thanks,
Damian

On Wed, 26 Jul 2017 at 06:44 Eric Lalonde <e...@autonomic.ai> wrote:

> Hello, I am able to reproduce this. It occurs during rebalancing when the
> service is restarted. kafka-clients and kafka-streams are both at version
> 0.10.2.1. 3 instances of the service, 4 threads per instance, 100
> partitions.
>
> log excerpt:
>
> Wed Jul 26 05:32:07 UTC 2017
> Streams state: REBALANCING
> Num Stream Threads: 4
>
> 2017-07-26 05:32:20.497 ERROR 7 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Failed to remove suspended task 2_68
>
> org.apache.kafka.streams.errors.ProcessorStateException: Error while closing the state manager
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:133) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.closeNonAssignedSuspendedTasks(StreamThread.java:898) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:233) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> Caused by: java.io.FileNotFoundException: /home/kafka-streams/data/2_68/.checkpoint.tmp (No such file or directory)
>     at java.io.FileOutputStream.open0(Native Method) ~[na:1.8.0_102]
>     at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[na:1.8.0_102]
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[na:1.8.0_102]
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[na:1.8.0_102]
>     at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:71) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:386) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:131) ~[kafka-streams-0.10.2.1.jar!/:na]
>     ... 11 common frames omitted
>
> Interestingly, the directory 2_68 does not appear to exist on the instance
> on which this exception was thrown:
>
> $ ls /home/kafka-streams/data/
>
> 0_10 0_13 0_18 0_23 0_27 0_3 0_33 0_35 0_39 0_43 0_46 0_50
> 0_52 0_62 0_66 0_7 0_76 0_81 0_83 0_87 0_89 0_97 1_0 1_11 1_2
> 1_26 1_31 1_35 1_39 1_41 1_5 1_54 1_56 1_63 1_65 1_69 1_73
> 1_77 1_79 1_88 1_92 1_94 1_96 2_0 2_12 2_16 2_2 2_24 2_33
> 2_40 2_46 2_54 2_60 2_67 2_71 2_76 2_80 2_86 2_91 0_11 0_14
> 0_21 0_24 0_29 0_30 0_34 0_37 0_42 0_44 0_5 0_51 0_57 0_65
> 0_67 0_71 0_80 0_82 0_84 0_88 0_91 0_99 1_1 1_12 1_20 1_30
> 1_34 1_37 1_4 1_45 1_53 1_55 1_62 1_64 1_68 1_70 1_75 1_78
> 1_80 1_89 1_93 1_95 1_99 2_1 2_13 2_19 2_21 2_3 2_34 2_44
> 2_53 2_6 2_65 2_69 2_73 2_78 2_81 2_90 2_99
>
> On Jul 6, 2017, at 7:50 AM, Ian Duffy <i...@ianduffy.ie> wrote:
>
> > Hi Damian,
> >
> > Sorry for the delayed reply; I have been out of office.
> >
> > I'm afraid I cannot check. We have alarms on our auto scaling groups for
> > stream instances to kill them should the CPU utilization be < 1 for 30
> > mins.
> >
> > On Fri 30 Jun 2017 at 5:05 p.m., Damian Guy <damian....@gmail.com> wrote:
> >
> >> Hi Ian,
> >>
> >> Can you check if the file exists and it is indeed a file rather than a
> >> directory?
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 30 Jun 2017 at 16:45 Damian Guy <damian....@gmail.com> wrote:
> >>
> >>> Hi Ian,
> >>>
> >>> We had another report of what looks like the same issue. Will look into it.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Fri, 30 Jun 2017 at 16:38 Ian Duffy <i...@ianduffy.ie> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I was wondering if any of those who know stream internals could shed any
> >>>> light on the following exception:
> >>>>
> >>>> org.apache.kafka.streams.errors.ProcessorStateException: Error while closing the state manager
> >>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:133)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.closeNonAssignedSuspendedTasks(StreamThread.java:898)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:233)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>> Caused by: java.io.FileNotFoundException: /data/kafka-streams-freshness/freshness-id-prod/2_46/.checkpoint.tmp (No such file or directory)
> >>>>     at java.io.FileOutputStream.open0(Native Method)
> >>>>     at java.io.FileOutputStream.open(FileOutputStream.java:270)
> >>>>     at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> >>>>     at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> >>>>     at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:71)
> >>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:386)
> >>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:131)
> >>>>     ... 11 common frames omitted
> >>>>
> >>>> Thanks,
> >>>> Ian.