[ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932701#comment-15932701 ]
Yunus Olgun commented on KAFKA-4890:
------------------------------------

Happy to help, but unfortunately I wasn't very helpful this time.

Please disregard the unit test. Calling _get_ immediately after _submit_ turned this supposedly multithreaded test into a sequential one. I couldn't reproduce the issue using proper Futures and simulated wait times. Also, OverlappingFileLockException should already protect against both multithreaded and single-threaded access from within the same VM, so my assumption was wrong.

In the logs3.tar.gz file, timestamps have only one-second resolution and the logs come from different threads, so the order of log lines across threads may not be correct. I will try to reproduce the issue with millisecond timestamps and an AsyncAppender in the log configuration.

- In 0.10.2.0, this bug was a blocker for me to use multiple threads in my Streams application. It happens frequently, and rebalancing and state store initialization take too long.
- In 0.11.0.0, I couldn't reproduce it with the default configuration. Even with state.cleanup.delay.ms=100, it takes some time. Rebalancing and state store initialization are also much faster now. In my opinion, it is not urgent for this version.
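To illustrate the test mistake described in the comment, here is a minimal, hypothetical sketch (not the original unit test). Calling get() right after each submit() blocks until that task finishes before the next one is submitted, so the tasks never overlap; submitting everything first and collecting the Futures afterwards lets them run concurrently. The class name, pool size, and 100 ms simulated wait are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitGetDemo {

    // Runs `tasks` jobs on a pool of `threads` workers and returns the peak
    // number of jobs that were running at the same time.
    static int peakConcurrency(int threads, int tasks, boolean getImmediately) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Runnable job = () -> {
            int now = active.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(100); // simulated wait time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
            }
        };
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Future<?> f = pool.submit(job);
                if (getImmediately) {
                    f.get(); // blocks until this job is done: the loop becomes sequential
                } else {
                    futures.add(f);
                }
            }
            for (Future<?> f : futures) {
                f.get(); // wait only after every job has been submitted
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("get() right after submit(): peak = " + peakConcurrency(4, 8, true));
        System.out.println("collect Futures first:      peak = " + peakConcurrency(4, 8, false));
    }
}
```

With get() called immediately, the peak is always 1; only the second variant actually exercises several threads at once.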
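The point about OverlappingFileLockException can be checked with plain java.nio: file locks are held on behalf of the whole JVM, so a second tryLock() on an already locked region fails with that exception whether it comes from the same thread or another one. This is a standalone sketch using a temporary file, not Kafka's StateDirectory code; the file name is hypothetical. One channel is reused on purpose, because on some systems closing any channel on a file releases the JVM's OS-level locks on it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

public class FileLockDemo {

    // Tries to lock the whole file again through `channel`; returns true if the
    // JVM rejects the attempt with OverlappingFileLockException.
    static boolean overlapRejected(FileChannel channel) throws IOException {
        try {
            FileLock second = channel.tryLock();
            if (second != null) {
                second.release(); // not expected while the first lock is held
            }
            return false;
        } catch (OverlappingFileLockException e) {
            return true;
        }
    }

    // Holds a lock on a temp file, then retries from the same thread and from
    // another thread. Returns {sameThreadRejected, otherThreadRejected}.
    static boolean[] demo() throws Exception {
        Path lockFile = Files.createTempFile("state-demo", ".lock");
        try (FileChannel channel = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileLock held = channel.tryLock()) {
            boolean sameThread = overlapRejected(channel);

            AtomicBoolean otherThread = new AtomicBoolean();
            Thread t = new Thread(() -> {
                try {
                    otherThread.set(overlapRejected(channel));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            t.start();
            t.join();
            return new boolean[] {sameThread, otherThread.get()};
        } finally {
            Files.deleteIfExists(lockFile);
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = demo();
        System.out.println("same thread rejected:  " + r[0]);
        System.out.println("other thread rejected: " + r[1]);
    }
}
```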
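For a reproduction attempt like the one described for 0.11.0.0, the relevant settings are several stream threads plus a short state.cleanup.delay.ms. The sketch below only builds the Properties using the standard Kafka Streams config key names; the application id, broker address, and thread count are hypothetical placeholders, not values from this issue.

```java
import java.util.Properties;

public class ReproConfig {

    // Builds Kafka Streams settings for a multithreaded reproduction attempt.
    // Keys are standard Streams config names; values are illustrative assumptions.
    static Properties reproProps() {
        Properties props = new Properties();
        props.put("application.id", "kafka-4890-repro");  // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("num.stream.threads", "4");             // several StreamThreads in one instance
        props.put("state.cleanup.delay.ms", "100");       // clean up obsolete state directories quickly
        return props;
    }

    public static void main(String[] args) {
        reproProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the KafkaStreams constructor together with the application's topology.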
> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Damian Guy
> Attachments: logs2.tar.gz, logs3.tar.gz, logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
>     at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
>     at org.rocksdb.RocksDB.put(Native Method)
>     at org.rocksdb.RocksDB.put(RocksDB.java:488)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
>     ... 27 common frames omitted
> {code}
> Also
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to flush state store lifetimeScoreStore
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store lifetimeScoreStore
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>     ... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
>     at org.rocksdb.RocksDB.flush(Native Method)
>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
> Operating System info
> Distributor ID: Debian
> Description: Debian GNU/Linux 8.7 (jessie)
> Release: 8.7
> Codename: jessie
> uname: 3.16.0-4-amd64
> FWIW, I don't see anything obvious and I can't reproduce it.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)