Hello again fellow Kafkans,

Yesterday we observed a production deadlock take down one of our instances.
Upon digging, it's clear that our usage of Kafka is the proximate cause, but the
danger of our approach is not clear at all just from the Javadocs.

We have stream processors that read off an incoming KStream,
possibly cross-reference some data from an auxiliary table
via ReadOnlyKeyValueStore.get()

This is done via custom logic rather than a direct KTable join because which
index is consulted may change depending on the shape of incoming data.

The ROKVS docs state,

 * A key value store that only supports read operations.
 * Implementations should be thread-safe as concurrent reads and writes
 * are expected.

They do *not* indicate that the CachingKVS layer uses a ReadWriteLock.  If
you have one CachingKVS flush a record cause a read from another CKVS,
you are suddenly vulnerable to classic lock order reversals!  Surprise!

A partial stack trace highlighting the problem, with many uninteresting frames 
removed,
is inline at the bottom of this mail.

You could probably rightly point to us allowing a "observer" pattern to callback
from within streams processing code is dangerous.  We might move this off to
an auxiliary thread to alleviate this problem.

But it still remains -- when you go an read that ROKVS documentation, it sure
doesn't prepare you to this possibility!  And, it's a little frustrating that
we have to have this 'caching' layer at all -- we already had to add

        // ensure KTable doesn't delay updates due to buffering in cache
        kafkaStreamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

because we are quite latency sensitive.  But there's seemingly no way to remove 
the
undesired cache and lock entirely, as it's hard coded in a number of places.

Should I file a ticket for this?  What do you see as the best way for the 
library to improve
so that future developers don't hit this as badly as I did?

Thanks for any thoughts,
Steven


"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-1" - Thread t@79
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - waiting to lock <4db7ff53> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) owned by 
"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-4" t@85
        at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:157)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
        at 
com.opentable.chat.service.LocalChatStorage.findById(LocalChatStorage.java:361)
        at 
com.opentable.chat.service.LocalChatStorage.request(LocalChatStorage.java:395)
        at 
com.opentable.chat.action.WelcomeAction.onEndpointUpdate(WelcomeAction.java:71)
        at 
com.opentable.chat.service.LocalChatStorage.lambda$handleEndpointUpdate$31(LocalChatStorage.java:1022)
        at 
com.opentable.chat.service.LocalChatStorage$$Lambda$466/1976342649.test(Unknown 
Source)
        at 
com.opentable.chat.service.LocalChatStorage.dispatchObservers(LocalChatStorage.java:1170)
        at 
com.opentable.chat.service.LocalChatStorage.handleEndpointUpdate(LocalChatStorage.java:1022)
        at 
com.opentable.chat.service.ChatPipeline$$Lambda$364/1199894489.apply(Unknown 
Source)
        at 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at 
org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
        at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:233)
        - locked <a4beebf> (a 
org.apache.kafka.streams.state.internals.NamedCache)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:243)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:228)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:221)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
        at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

   Locked ownable synchronizers:
        - locked <4397f604> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-4" - Thread t@85
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - waiting to lock <4397f604> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) owned by 
"chat-80543cf6-3ae3-4a32-97a9-a85571934595-StreamThread-1" t@79
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:157)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
        at 
com.opentable.chat.service.LocalChatStorage.getEndpoint(LocalChatStorage.java:880)
        at 
com.opentable.chat.service.LocalChatStorage.getEndpoint(LocalChatStorage.java:894)
        at 
com.opentable.chat.action.WelcomeAction.onMessageUpdate(WelcomeAction.java:101)
        at 
com.opentable.chat.service.LocalChatStorage.lambda$handleMessageUpdate$34(LocalChatStorage.java:1142)
        at 
com.opentable.chat.service.LocalChatStorage$$Lambda$463/1420253975.test(Unknown 
Source)
        at 
com.opentable.chat.service.LocalChatStorage.dispatchObservers(LocalChatStorage.java:1170)
        at 
com.opentable.chat.service.LocalChatStorage.handleMessageUpdate(LocalChatStorage.java:1142)
        at 
com.opentable.chat.service.ChatPipeline$$Lambda$366/1134976216.apply(Unknown 
Source)
        at 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at 
org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
        at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:233)
        - locked <2a59610> (a 
org.apache.kafka.streams.state.internals.NamedCache)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:243)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:228)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:221)
        at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

   Locked ownable synchronizers:
        - locked <4db7ff53> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)


Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to