Sameer,
It might be that put, delete, putIfAbsent etc operations can be
non-synchronized. However for get and range operations that can be
performed by IQ, i.e,  other threads, we need to guard against the store
being closed by the StreamThread, hence the synchronization.

Thanks,
Damian

On Wed, 16 Aug 2017 at 07:17 Sameer Kumar <sam.kum.w...@gmail.com> wrote:

> Can you share more info on this.
>
>
> As per the rocksb doc(
> https://github.com/facebook/rocksdb/wiki/basic-operations)
>
> Within a single process, the same rocksdb::DBobject may be safely shared by
> multiple concurrent threads. I.e., different threads may write into or
> fetch iterators or call Get on the same database without any external
> synchronization (the leveldb implementation will automatically do the
> required synchronization).
>
> Did we faced any issues, I want to explore this and see if I can help on
> this. Is there a existing KIP on this.
>
> -Sameer.
>
>
> On Tue, Aug 15, 2017 at 10:42 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
>
> > got it..Thanks Guozhang.
> >
> > On Tue, Aug 15, 2017 at 1:55 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Sameer,
> >>
> >> It is mainly to guard for concurrent access for interactive queries:
> >>
> >> https://kafka.apache.org/0110/documentation/streams/develope
> >> r-guide#streams_interactive_queries
> >>
> >> In Kafka Streams, we allow users to independently query the running
> state
> >> stores in real-time in their own caller thread while the application's
> >> thread is continuously updating these stores, and hence the
> >> synchronization.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Sun, Aug 13, 2017 at 10:24 PM, Sameer Kumar <sam.kum.w...@gmail.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I was inspecting RocksDBStore and I saw get and put methods being
> >> > synchronized.
> >> >
> >> > Are there any issues in RocksDB due to which we have used it. Could
> >> someone
> >> > please enlighten more on this.
> >> >
> >> >  @Override
> >> >     public synchronized V get(K key) {
> >> >         validateStoreOpen();
> >> >         byte[] byteValue = getInternal(serdes.rawKey(key));
> >> >         if (byteValue == null) {
> >> >             return null;
> >> >         } else {
> >> >             return serdes.valueFrom(byteValue);
> >> >         }
> >> >     }
> >> >
> >> >     @SuppressWarnings("unchecked")
> >> >     @Override
> >> >     public synchronized void put(K key, V value) {
> >> >         validateStoreOpen();
> >> >         byte[] rawKey = serdes.rawKey(key);
> >> >         byte[] rawValue = serdes.rawValue(value);
> >> >         putInternal(rawKey, rawValue);
> >> >     }
> >> >
> >> > -Sameer.
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>

Reply via email to