Damian Guy created KAFKA-4890:
---------------------------------

             Summary: State directory being deleted when another thread holds 
the lock
                 Key: KAFKA-4890
                 URL: https://issues.apache.org/jira/browse/KAFKA-4890
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Damian Guy
         Attachments: logs.tar.gz

Looks like a state directory is being cleaned up when another thread already 
has the lock:

{{2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager 
- task [0_6] Registering state store perGameScoreStore to its state manager

2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6

2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
fireflyProd failed on partition assignment
org.apache.kafka.streams.errors.ProcessorStateException: Error while executing 
put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and 
value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
        at 
org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
        at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.rocksdb.RocksDBException: `
        at org.rocksdb.RocksDB.put(Native Method)
        at org.rocksdb.RocksDB.put(RocksDB.java:488)
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
        ... 27 common frames omitted}}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to