Bill Bejeck created KAFKA-7534:
----------------------------------

             Summary: Error during CachingKeyValueStore.flush may not allow RocksDB to close
                 Key: KAFKA-7534
                 URL: https://issues.apache.org/jira/browse/KAFKA-7534
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Bill Bejeck
            Assignee: Bill Bejeck
In CachingKeyValueStore, close() calls flush() before underlying.close(). If flush() throws, close() exits early and the underlying RocksDB store is never closed:

    @Override
    public void flush() {
        lock.writeLock().lock();
        try {
            cache.flush(cacheName);
            underlying.flush();
        } finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public void close() {
        flush();
        underlying.close();
        cache.close(cacheName);
    }

Below is an exception that leads to this situation. Note that another store is already closed and therefore not available:

2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: : org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
	at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
	at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
	at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
	at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
	at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
	at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
	at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
	at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
	at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
	at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

If the store is not closed, we have observed that the RocksDB lock is not released properly, which can lead to:

2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - 	at org.rocksdb.RocksDB.open(Native Method)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
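A minimal, self-contained sketch of the close-ordering problem in the snippet, under simplifying assumptions: CloseOrderSketch, UnderlyingStore, Cache and CachingStore are hypothetical stand-ins, not Kafka classes, and the failing cache flush is simulated with an IllegalStateException in place of the InvalidStateStoreException in the trace. closeAsReported() mirrors the reported close(); closeSafely() is one possible try/finally arrangement (not necessarily the change eventually made in Kafka) that still attempts underlying.close() and cache.close() when flush() throws.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CloseOrderSketch {

    /** Hypothetical stand-in for the underlying persistent store (RocksDB in the report). */
    static final class UnderlyingStore {
        boolean open = true;
        void flush() { }
        void close() { open = false; }
    }

    /** Hypothetical stand-in for the record cache; its flush can fail. */
    static final class Cache {
        private final boolean failOnFlush;
        boolean open = true;

        Cache(boolean failOnFlush) { this.failOnFlush = failOnFlush; }

        void flush() {
            if (failOnFlush) {
                // Simulates the exception raised while forwarding flushed entries
                // to a downstream store that is already closed.
                throw new IllegalStateException("downstream store is currently closed");
            }
        }

        void close() { open = false; }
    }

    /** Hypothetical simplified caching store offering both close orderings. */
    static final class CachingStore {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final Cache cache;
        private final UnderlyingStore underlying;

        CachingStore(Cache cache, UnderlyingStore underlying) {
            this.cache = cache;
            this.underlying = underlying;
        }

        void flush() {
            lock.writeLock().lock();
            try {
                cache.flush();
                underlying.flush();
            } finally {
                lock.writeLock().unlock();
            }
        }

        /** Same ordering as the reported close(): a flush() failure skips both closes. */
        void closeAsReported() {
            flush();
            underlying.close();
            cache.close();
        }

        /** Always attempts both closes, then lets any flush() failure propagate. */
        void closeSafely() {
            try {
                flush();
            } finally {
                try {
                    underlying.close();
                } finally {
                    cache.close();
                }
            }
        }
    }

    /** Closes a store whose cache flush fails; reports whether the underlying store stayed open. */
    static boolean underlyingOpenAfter(boolean safe) {
        UnderlyingStore underlying = new UnderlyingStore();
        CachingStore store = new CachingStore(new Cache(true), underlying);
        try {
            if (safe) {
                store.closeSafely();
            } else {
                store.closeAsReported();
            }
        } catch (IllegalStateException expected) {
            // Both variants rethrow the flush failure to the caller.
        }
        return underlying.open;
    }

    public static void main(String[] args) {
        System.out.println("as reported: underlying open = " + underlyingOpenAfter(false));
        System.out.println("safe close:  underlying open = " + underlyingOpenAfter(true));
    }
}
```

With the reported ordering the underlying store is left open; with the try/finally ordering it is closed. In both variants the flush failure still propagates out of the close method, so the original error is not swallowed.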
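RocksDB locks a LOCK file in the database directory when it opens, and the second log shows RocksDB.open failing on that file. The sketch below is only an analogy built on the JDK's FileChannel.tryLock, not on RocksDB's own locking: a hypothetical "previous instance" takes an exclusive lock on a temporary file and never releases it, so a new acquisition attempt fails until the first lock is released. LockFileSketch, tryAcquire and run are illustrative names. Within a single JVM the conflict surfaces as OverlappingFileLockException; against another process, tryLock would instead return null.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockFileSketch {

    /** Tries to take an exclusive lock; returns null if an overlapping lock is already held. */
    static FileLock tryAcquire(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException heldByThisJvm) {
            return null;
        }
    }

    /** Returns {lockableWhileLeaked, lockableAfterRelease}. */
    static boolean[] run() {
        try {
            Path lockFile = Files.createTempFile("store-", ".LOCK");
            try (FileChannel previousInstance = FileChannel.open(lockFile, StandardOpenOption.WRITE);
                 FileChannel newInstance = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
                // The previous (hypothetical) store instance locks the file and is never closed.
                FileLock leaked = tryAcquire(previousInstance);
                boolean lockableWhileLeaked = tryAcquire(newInstance) != null;

                // Releasing the lock, as a proper close would, lets the new instance acquire it.
                if (leaked != null) {
                    leaked.release();
                }
                FileLock reacquired = tryAcquire(newInstance);
                boolean lockableAfterRelease = reacquired != null;
                if (reacquired != null) {
                    reacquired.release();
                }
                return new boolean[] {lockableWhileLeaked, lockableAfterRelease};
            } finally {
                Files.deleteIfExists(lockFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("lock acquirable while leaked:  " + result[0]);
        System.out.println("lock acquirable after release: " + result[1]);
    }
}
```

Both channels stay open until the end of the try-with-resources block, so no lock is ever dropped implicitly by closing a channel; the only thing that frees the file is the explicit release, which plays the role of a completed store close.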