Hello again fellow Kafkans,

Yesterday we observed a production deadlock take down one of our instances. After some digging, it's clear that our usage of Kafka is the proximate cause, but the danger of our approach is not at all clear from the Javadocs alone.
We have stream processors that read off an incoming KStream and may cross-reference some data from an auxiliary table via ReadOnlyKeyValueStore.get(). This is done with custom logic rather than a direct KTable join, because which index is consulted may change depending on the shape of the incoming data.

The ROKVS docs state:

    A key value store that only supports read operations.
    Implementations should be thread-safe as concurrent reads and writes
    are expected.

They do *not* indicate that the CachingKVS layer uses a ReadWriteLock. If a flush in one CachingKVS causes a read from another CKVS, you are suddenly vulnerable to classic lock-order reversals! Surprise! A partial stack trace highlighting the problem, with many uninteresting frames removed, is inline at the bottom of this mail.

You could probably rightly point out that allowing an "observer" pattern to call back from within stream processing code is dangerous. We might move this off to an auxiliary thread to alleviate the problem. But the point remains: when you go and read that ROKVS documentation, it sure doesn't prepare you for this possibility!

And it's a little frustrating that we have to have this 'caching' layer at all. Because we are quite latency sensitive, we already had to add:

    // ensure KTable doesn't delay updates due to buffering in cache
    kafkaStreamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

But there's seemingly no way to remove the undesired cache and lock entirely, as they are hard-coded in a number of places.

Should I file a ticket for this? What do you see as the best way for the library to improve so that future developers don't hit this as badly as I did?
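The lock-order reversal described above can be modeled without Kafka at all. This is a minimal, self-contained Java sketch, not Kafka Streams code: Store, flush, tryRead and simulate are hypothetical names. It assumes each caching store holds its write lock for the whole put -> evict -> flush -> forward chain (a held read lock would not block the other thread's get(), so the ReentrantReadWriteLock each thread owns in the trace is presumably the write lock). Two threads each hold their own store's lock and fire an observer callback that reads the other store; a tryLock timeout stands in for the permanent hang so the program terminates. With handOff set, it also models the auxiliary-thread idea: the callback is only queued while the lock is held and runs after release.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the lock-order reversal; not Kafka Streams code.
final class LockOrderDemo {

    /** Stand-in for a caching store guarded by its own read/write lock. */
    static final class Store {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    }

    /** Takes and releases the store's read lock, as a get() would; false on timeout. */
    static boolean tryRead(Store store, long millis) {
        try {
            if (store.lock.readLock().tryLock(millis, TimeUnit.MILLISECONDS)) {
                store.lock.readLock().unlock();
                return true;
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * One "stream thread" step: hold our own store's write lock (assumed to be
     * what put/evict/flush holds) and fire an observer callback that reads the
     * other store. Inline, the callback runs under our lock; handed off, it is
     * only queued, and we wait for it after our lock is released.
     */
    private static boolean flush(Store own, Store other, CountDownLatch bothFlushing,
                                 boolean handOff, ExecutorService observers) throws Exception {
        Future<Boolean> queued;
        own.lock.writeLock().lock();
        try {
            bothFlushing.countDown();
            bothFlushing.await(); // both threads now hold their own write lock
            if (!handOff) {
                // Lock-order reversal; the timeout stands in for a permanent hang.
                return tryRead(other, 200);
            }
            Callable<Boolean> callback = () -> tryRead(other, 1000);
            queued = observers.submit(callback);
        } finally {
            own.lock.writeLock().unlock();
        }
        return queued.get(); // no store lock is held while we wait
    }

    /** Two concurrent stream threads flushing s1 and s2; true if every callback read succeeded. */
    static boolean simulate(boolean handOff) throws Exception {
        Store s1 = new Store();
        Store s2 = new Store();
        CountDownLatch bothFlushing = new CountDownLatch(2);
        ExecutorService streamThreads = Executors.newFixedThreadPool(2);
        ExecutorService observers = Executors.newSingleThreadExecutor(); // auxiliary thread
        try {
            Callable<Boolean> thread1 = () -> flush(s1, s2, bothFlushing, handOff, observers);
            Callable<Boolean> thread2 = () -> flush(s2, s1, bothFlushing, handOff, observers);
            Future<Boolean> r1 = streamThreads.submit(thread1);
            Future<Boolean> r2 = streamThreads.submit(thread2);
            boolean ok1 = r1.get();
            boolean ok2 = r2.get();
            return ok1 && ok2;
        } finally {
            streamThreads.shutdown();
            observers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inline callbacks all succeeded: " + simulate(false));
        System.out.println("handed-off callbacks all succeeded: " + simulate(true));
    }
}
```

In the inline variant at least one read must time out, because each thread waits on a lock the other holds; only the timeout breaks the cycle. The hand-off helps only if nothing waits on the callback while still holding a store lock, and the callback then runs slightly later, so it may observe newer store state than it would inline.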
Thanks for any thoughts,
Steven


"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-1" - Thread t@79
   java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Native Method)
    - waiting to lock <4db7ff53> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) owned by "chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-4" t@85
    at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:157)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
    at com.opentable.chat.service.LocalChatStorage.findById(LocalChatStorage.java:361)
    at com.opentable.chat.service.LocalChatStorage.request(LocalChatStorage.java:395)
    at com.opentable.chat.action.WelcomeAction.onEndpointUpdate(WelcomeAction.java:71)
    at com.opentable.chat.service.LocalChatStorage.lambda$handleEndpointUpdate$31(LocalChatStorage.java:1022)
    at com.opentable.chat.service.LocalChatStorage$$Lambda$466/1976342649.test(Unknown Source)
    at com.opentable.chat.service.LocalChatStorage.dispatchObservers(LocalChatStorage.java:1170)
    at com.opentable.chat.service.LocalChatStorage.handleEndpointUpdate(LocalChatStorage.java:1022)
    at com.opentable.chat.service.ChatPipeline$$Lambda$364/1199894489.apply(Unknown Source)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:233)
    - locked <a4beebf> (a org.apache.kafka.streams.state.internals.NamedCache)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:243)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:228)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:221)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

   Locked ownable synchronizers:
    - locked <4397f604> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-4" - Thread t@85
   java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Native Method)
    - waiting to lock <4397f604> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) owned by "chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-1" t@79
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:157)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
    at com.opentable.chat.service.LocalChatStorage.getEndpoint(LocalChatStorage.java:880)
    at com.opentable.chat.service.LocalChatStorage.getEndpoint(LocalChatStorage.java:894)
    at com.opentable.chat.action.WelcomeAction.onMessageUpdate(WelcomeAction.java:101)
    at com.opentable.chat.service.LocalChatStorage.lambda$handleMessageUpdate$34(LocalChatStorage.java:1142)
    at com.opentable.chat.service.LocalChatStorage$$Lambda$463/1420253975.test(Unknown Source)
    at com.opentable.chat.service.LocalChatStorage.dispatchObservers(LocalChatStorage.java:1170)
    at com.opentable.chat.service.LocalChatStorage.handleMessageUpdate(LocalChatStorage.java:1142)
    at com.opentable.chat.service.ChatPipeline$$Lambda$366/1134976216.apply(Unknown Source)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:233)
    - locked <2a59610> (a org.apache.kafka.streams.state.internals.NamedCache)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:243)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:228)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:221)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

   Locked ownable synchronizers:
    - locked <4db7ff53> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)