Hi Sophie,

Thank you so much for sharing that. It all makes sense to me.

Unfortunately, my application uses REPLACE_THREAD, so it seems I need a workaround until this thread-safety issue is fixed. As I raised in my first email, would sharing only the ReadOnlyWindowStore instance with other threads work as a workaround? Would that store object be able to capture the changes made by rebalancing?

I've filed a ticket here (I'm interested in submitting a patch, but I cannot make any commitment):
https://issues.apache.org/jira/browse/KAFKA-16055

Regards,
Kohei

> On Dec 27, 2023, at 5:43, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
>
> Hey Kohei,
>
> Good question -- I don't think there's exactly a short answer to this
> seemingly simple question so bear with me for a second.
>
> My understanding is that KafkaStreams#store is very much intended to be
> thread-safe, and would have been back when it was first added a long time
> ago, and the javadocs should probably be updated to reflect that.
>
> That said, you are totally right that whatever the intention, it is
> technically not completely thread safe anymore since the storeProviders map
> can be mutated when threads are added or removed. Of course, as long as you
> are not adding or removing StreamThreads in your application, it should be
> effectively thread-safe (but note: this includes using the REPLACE_THREAD
> option with the StreamsUncaughtExceptionHandler)
>
> We should go ahead and fix this of course. I'm pretty sure we can just
> change the HashMap to a ConcurrentHashMap and be done with it -- there's
> already locking around the actual map modifications with the
> "changeThreadCount" lock in KafkaStreams, so we just need to make sure we
> don't accidentally hit a ConcurrentModificationException by accessing this
> map while it's being modified.
>
> Would you mind submitting a JIRA ticket
> <https://issues.apache.org/jira/projects/KAFKA/issues> for this bug you
> found? And would you be interested in submitting a patch yourself?
>
> Thanks!
> Sophie
>
> On Fri, Dec 22, 2023 at 6:55 PM Kohei Nozaki <ko...@apache.org> wrote:
>
>> Hello, I have Kafka Streams questions related to thread safety.
>>
>> In my Kafka Streams application, I have 2 threads below:
>>
>> * Thread A: this creates a Topology object including state stores and
>> everything and then eventually calls the constructor of the KafkaStreams
>> class and the start() method.
>>
>> * Thread B: this has a reference to the KafkaStreams object the thread A
>> created. This periodically calls KafkaStreams#store on the object, gets a
>> ReadOnlyWindowStore instance and reads the data in the store for monitoring
>> purposes.
>>
>> I'm wondering if what my app does is ok in terms of thread safeness. I'm
>> not so worried about ReadOnlyWindowStore because the javadoc says:
>> “Implementations should be thread-safe as concurrent reads and writes are
>> expected.”
>>
>> But as for KafkaStreams#store, I'm not so sure if it is ok to call from
>> separate threads. One thing which concerns me is that it touches a HashMap,
>> which is not thread safe here
>> https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39
>> . A KafkaStreams#removeStreamThread() call can mutate this HashMap object.
>> Given that, I'm not so sure if this is designed to be thread-safe.
>>
>> My questions here: is it ok to call KafkaStreams#store from a thread which
>> is different from the one which instantiated the KafkaStreams object? Or
>> would that be better to call the store() method in the same thread and
>> share only the ReadOnlyWindowStore instance with other threads? If that was
>> better, would the store object be able to capture the changes that would be
>> made by rebalancing? Is the KafkaStreams class designed to be thread-safe at
>> all?
>>
>> Regards,
>> Kohei
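
For readers following the thread, the HashMap versus ConcurrentHashMap point can be illustrated with a small standalone sketch. This is not Kafka code: the class name, method name, and the "thread-N" / "provider-N" strings are hypothetical stand-ins for the storeProviders map discussed above. To keep the result deterministic, the sketch mutates the map from the same thread that is iterating over it; in a real application the mutation would come from another thread and a failure would be intermittent, and unsynchronized HashMap access can misbehave in other ways too.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StoreProviderMapSketch {

    // Hypothetical stand-in for a per-thread provider map; not the Kafka implementation.
    // Returns true if modifying the map during iteration raised an exception.
    static boolean failsWhenMutatedDuringIteration(Map<String, String> providers) {
        providers.put("thread-1", "provider-1");
        providers.put("thread-2", "provider-2");
        providers.put("thread-3", "provider-3");
        try {
            for (String name : providers.keySet()) {
                // Simulates a stream thread being added while a reader walks the map.
                providers.put("thread-4", "provider-4");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // HashMap iterators are fail-fast: a structural change is detected on the next step.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("HashMap fails: "
                + failsWhenMutatedDuringIteration(new HashMap<>()));
        System.out.println("ConcurrentHashMap fails: "
                + failsWhenMutatedDuringIteration(new ConcurrentHashMap<>()));
    }
}
```

The HashMap case reports a failure, while the ConcurrentHashMap case completes because its iterators are weakly consistent and never throw ConcurrentModificationException. Whether the actual fix for KAFKA-16055 needs more than this swap is not something the sketch can show.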