[ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923322#comment-15923322 ]
Guozhang Wang commented on KAFKA-4890:
--------------------------------------

[~damianguy] I looked at the logs (actually only one of the log files, {{firefly_2.log}}, which I believe corresponds to the first trace you posted above), and here is what I have found:

1. Before thread-1 hit the error, it appears to have had a long GC pause while it was creating task 0_6 (found using {{grep -i "threadid" logfile}}):

{code}
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - Seeking to beginning of partition fireflyProd-perGameScoreStore-changelog-6
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor with name topic.fireflyProd-userAwardStore-changelog.bytes-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor with name topic.fireflyProd-userAwardStore-changelog.records-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor with name fireflyProd-perGameScoreStore-changelog-6.records-lag
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - Unsubscribed all topics or patterns and assigned partitions   // ~65 seconds later
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Removed sensor with name fireflyProd-perGameScoreStore-changelog-6.records-lag
{code}

During this period thread-3 deleted the state directory for task 0_6, as you observed:

{code}
2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
{code}

A deeper look shows that this happened because thread-3 had successfully grabbed the lock for 0_6 (during generation 97) before the task was created on thread-1 (during generation 98); within about two seconds thread-3 grabbed a bunch of locks and deleted the corresponding state directories. At that point the directories likely did not contain any data yet.
{code}
2017-03-12 20:37:04 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator - Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [debugStatistics-12] for group fireflyProd
2017-03-12 20:37:04 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [debugStatistics-14] for group fireflyProd
...
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_1 for task 0_1
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_7 for task 0_7
2017-03-12 20:38:07 [StreamThread-1] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_7 for task 0_7
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_1 for task 0_1
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_7 for task 0_7
2017-03-12 20:38:07 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_11 for task 0_11
2017-03-12 20:38:07 [StreamThread-2] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_13 for task 0_13
2017-03-12 20:38:08 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_13 for task 0_13
...
2017-03-12 20:39:17 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - Successfully joined group fireflyProd with generation 98
2017-03-12 20:39:17 [StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [debugStatistics-6, debugStatistics-14] for group fireflyProd
2017-03-12 20:39:17 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator - Successfully joined group fireflyProd with generation 98
2017-03-12 20:39:17 [StreamThread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [debugStatistics-3, debugStatistics-12] for group fireflyProd
...
{code}

So there are a couple of risks here:

1) In {{StateDirectory#unlock}}, releasing the lock and closing the channel are not atomic. It is therefore possible that thread-1 releases the lock, thread-2 then grabs it, and thread-1 then closes the channel, voiding thread-2's lock. This is already discussed in https://issues.apache.org/jira/browse/KAFKA-3812.

2) In {{StateDirectory#cleanRemovedTasks}} we delete the whole directory while holding the lock on an empty file within that directory. Since the lock is advisory, it does not prevent the file from being deleted while its lock is held; another thread concurrently trying to grab the lock can then re-create the file as a new one and acquire the lock on a new file channel. At that point the two locks are no longer mutually exclusive.

I suspect the reason this is hard to reproduce (if you are trying on trunk) is that, since the fix made on trunk, the cleanup timeout only starts ticking after the rebalance is done, which makes these interleavings much less likely (note that in the log above thread-3 starts deleting other tasks' state directories only about one second after creating its own tasks).
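To make risk 1 concrete, here is a minimal, hypothetical sketch (class and method names are made up; this is not the actual {{StateDirectory}} code nor the KAFKA-3812 patch) of an in-process lock registry where release and close happen under the same monitor that guards acquisition, so no other thread can grab the lock in the window between {{release()}} and {{close()}}:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified registry -- NOT the real StateDirectory code.
public class TaskLockRegistry {

    private final Map<String, FileChannel> channels = new HashMap<>();
    private final Map<String, FileLock> locks = new HashMap<>();

    // lock() and unlock() share one monitor, so no thread can observe the
    // window between release() and close() described in risk 1.
    public synchronized boolean lock(String taskId, File lockFile) throws IOException {
        FileChannel channel = FileChannel.open(lockFile.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock;
        try {
            lock = channel.tryLock();      // null if another process holds the lock
        } catch (OverlappingFileLockException e) {
            lock = null;                   // another thread in this JVM holds it
        }
        if (lock == null) {
            channel.close();
            return false;
        }
        channels.put(taskId, channel);
        locks.put(taskId, lock);
        return true;
    }

    public synchronized void unlock(String taskId) throws IOException {
        FileLock lock = locks.remove(taskId);
        if (lock != null) {
            lock.release();
        }
        FileChannel channel = channels.remove(taskId);
        if (channel != null) {
            channel.close();               // closed before the monitor is released
        }
    }
}
```

In the buggy interleaving, {{unlock}} released the lock, another thread re-locked, and the first thread's late {{close()}} voided the new lock; serializing both steps behind one monitor removes that window for threads in the same JVM.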
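Risk 2 can also be demonstrated outside of Kafka Streams. The sketch below (directory and class names are made up) shows that on a POSIX file system an advisory {{FileLock}} does not prevent the lock file from being deleted, and a second channel can then lock the re-created file even within the same JVM, because the re-created file is a different inode:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;

public class AdvisoryLockRace {
    public static void main(String[] args) throws IOException {
        File dir = new File("state-0_6-demo");   // made-up directory name
        dir.mkdirs();
        File lockFile = new File(dir, ".lock");

        // "Thread A" grabs the lock on the empty lock file.
        FileChannel channelA = FileChannel.open(lockFile.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lockA = channelA.tryLock();
        System.out.println("A holds the lock: " + (lockA != null));

        // "Thread B" deletes the file while A still holds its lock. On POSIX
        // file systems the unlink succeeds: the advisory lock guards the open
        // inode, not the directory entry.
        System.out.println("B deleted the lock file: " + lockFile.delete());

        // B re-creates the file and locks it. This is a new inode, so the
        // JVM's lock table sees no overlap and the lock succeeds, even though
        // A still believes it owns the directory.
        FileChannel channelB = FileChannel.open(lockFile.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lockB = channelB.tryLock();
        System.out.println("B also holds a lock: " + (lockB != null));
        System.out.println("Both locks valid: " + (lockA.isValid() && lockB.isValid()));

        // Cleanup.
        lockB.release(); channelB.close();
        lockA.release(); channelA.close();
        lockFile.delete();
        dir.delete();
    }
}
```

Note this behavior is platform-dependent: on Windows the delete would typically fail while the file is open, which is another reason the advisory-lock assumption is easy to get wrong.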
> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
>                 Key: KAFKA-4890
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4890
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Damian Guy
>         Attachments: logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> 	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
> 	at org.rocksdb.RocksDB.put(Native Method)
> 	at org.rocksdb.RocksDB.put(RocksDB.java:488)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
> 	... 27 common frames omitted
> {code}
> Also
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to flush state store lifetimeScoreStore
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
> 	at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store lifetimeScoreStore
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
> 	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
> 	... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
> 	at org.rocksdb.RocksDB.flush(Native Method)
> 	at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
> Operating System info
> Distributor ID: Debian
> Description:    Debian GNU/Linux 8.7 (jessie)
> Release:        8.7
> Codename:       jessie
> uname: 3.16.0-4-amd64
> FWIW - I don't see anything obvious and I can't reproduce it.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)