Hi Patrik,

Thanks for reporting this; it does indeed look like this could be an issue.
I've created https://issues.apache.org/jira/browse/KAFKA-7534 to look into the
issue and, if required, create a patch.

-Bill

On Tue, Oct 23, 2018 at 9:24 AM Patrik Kleindl <pklei...@gmail.com> wrote:

> Hello
>
> Can someone please verify if my assumption is correct?
> In CachingKeyValueStore, if an exception happens during flush() the store
> will not be closed properly.
>
> @Override
> public void flush() {
>     lock.writeLock().lock();
>     try {
>         cache.flush(cacheName);
>         underlying.flush();
>     } finally {
>         lock.writeLock().unlock();
>     }
> }
>
> @Override
> public void close() {
>     flush();
>     underlying.close();
>     cache.close(cacheName);
> }
>
> An exception leading to this, notice that another store is already closed
> and therefore not available:
>
> 2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: : org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
>     at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
>
> If the store is not closed we have witnessed that the lock in RocksDB is
> not removed properly, which can lead to
>
> 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
> 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -     at org.rocksdb.RocksDB.open(Native Method)
>
> best regards
>
> Patrik
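
To illustrate the concern in the quoted report, here is a minimal, self-contained sketch of a close() that still releases the underlying store when flush() throws. It is not the Kafka code and not necessarily the fix that KAFKA-7534 will produce: Store, RecordingStore, CachingStore and closeAndRecord are hypothetical names invented for this example, and the cache handling of the real CachingKeyValueStore is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeCloseSketch {

    /** Hypothetical stand-in for a wrapped state store; not a Kafka interface. */
    interface Store {
        void flush();
        void close();
    }

    /** Records every call so the order of operations can be inspected. */
    static class RecordingStore implements Store {
        final List<String> calls = new ArrayList<>();
        final boolean failOnFlush;

        RecordingStore(boolean failOnFlush) {
            this.failOnFlush = failOnFlush;
        }

        @Override
        public void flush() {
            calls.add("flush");
            if (failOnFlush) {
                throw new IllegalStateException("flush failed");
            }
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Hypothetical wrapper whose close() releases the underlying store even if flush() throws. */
    static class CachingStore implements Store {
        private final Store underlying;

        CachingStore(Store underlying) {
            this.underlying = underlying;
        }

        @Override
        public void flush() {
            underlying.flush();
        }

        @Override
        public void close() {
            try {
                flush();
            } finally {
                // Runs on both the normal and the exceptional path.
                underlying.close();
            }
        }
    }

    /** Closes a wrapper around a fresh store and returns the calls that store saw. */
    static List<String> closeAndRecord(boolean failOnFlush) {
        RecordingStore inner = new RecordingStore(failOnFlush);
        try {
            new CachingStore(inner).close();
        } catch (IllegalStateException e) {
            // The flush failure still reaches the caller after the store is closed.
            inner.calls.add("exception propagated");
        }
        return inner.calls;
    }

    public static void main(String[] args) {
        System.out.println(closeAndRecord(false)); // [flush, close]
        System.out.println(closeAndRecord(true));  // [flush, close, exception propagated]
    }
}
```

The try/finally keeps the original exception visible to the caller while making sure close() on the underlying store is attempted. In the real class, cache.close(cacheName) would presumably need the same treatment, for example through a nested try/finally.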