For K we use a simple StringSerde; for V we use a custom Serde that
deserializes an Avro payload into a generic bean containing an identifier, a
version, and an Avro record.
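In rough shape it looks like the sketch below (simplified and illustrative
rather than our exact code: the EventBean / EventBeanDeserializer names, the
"identifier" and "version" field names, and the way the writer schema is
injected are all just placeholders):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    // Illustrative value bean: an identifier, a version, and the decoded Avro record.
    public class EventBean {
        public final String identifier;
        public final int version;
        public final GenericRecord record;

        public EventBean(String identifier, int version, GenericRecord record) {
            this.identifier = identifier;
            this.version = version;
            this.record = record;
        }
    }

    // Illustrative deserializer half of the Serde; here the writer schema is simply
    // injected up front and the identifier/version are assumed to be fields of the record.
    class EventBeanDeserializer implements Deserializer<EventBean> {
        private final Schema writerSchema;

        EventBeanDeserializer(Schema writerSchema) {
            this.writerSchema = writerSchema;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {}

        @Override
        public EventBean deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
                GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(data, null));
                return new EventBean(record.get("identifier").toString(),
                                     (Integer) record.get("version"),
                                     record);
            } catch (IOException e) {
                throw new SerializationException("Failed to decode Avro payload", e);
            }
        }

        @Override
        public void close() {}
    }

The actual Serde is then assembled with Serdes.serdeFrom(serializer,
deserializer); the serializer half is omitted here for brevity.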

On Sun, Feb 12, 2017 at 10:39 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Pierre,
>
> Could you let me know what serdes you use for the key-value pair of type <K, V>?
>
> I looked at the 0.10.1.1 code and did not find any obvious memory leak from
> Streams in its iterator implementation. One thing I suspect is that when
> returning the key-value pair, we call the serdes to deserialize the bytes
> returned from RocksDB:
>
> new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
>
>
> Note that the bytes returned from iter.key() / value() are a direct slice of
> the byte arrays stored in RocksDB (either from the memtable or from an SST
> file), and if the serdes keep a reference to these bytes, they may not be
> compacted or garbage collected immediately.
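> As a minimal illustration of that concern (the classes below are hypothetical,
> not the actual Streams or user code): a deserializer that hands the raw byte[]
> straight to the object it returns keeps that array reachable for as long as
> the object lives, whereas one that copies the payload does not:
>
>     import java.util.Arrays;
>     import java.util.Map;
>     import org.apache.kafka.common.serialization.Deserializer;
>
>     // Hypothetical value type that keeps the raw payload around.
>     class Envelope {
>         final byte[] payload;
>         Envelope(byte[] payload) { this.payload = payload; }
>     }
>
>     class RetainingDeserializer implements Deserializer<Envelope> {
>         @Override public void configure(Map<String, ?> configs, boolean isKey) {}
>         // Retains the exact array handed in by the store iterator.
>         @Override public Envelope deserialize(String topic, byte[] data) {
>             return data == null ? null : new Envelope(data);
>         }
>         @Override public void close() {}
>     }
>
>     class CopyingDeserializer implements Deserializer<Envelope> {
>         @Override public void configure(Map<String, ?> configs, boolean isKey) {}
>         // Defensive copy: the incoming array can be reclaimed as soon as the caller drops it.
>         @Override public Envelope deserialize(String topic, byte[] data) {
>             return data == null ? null : new Envelope(Arrays.copyOf(data, data.length));
>         }
>         @Override public void close() {}
>     }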
>
> Guozhang
>
> On Fri, Feb 10, 2017 at 12:18 PM, Pierre Coquentin <pierre.coquen...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > We have 6 consumers per node, each managing multiple partitions. We see
> > the memory growing right from the start of the application.
> > To get the memory snapshot, download jemalloc from
> > https://github.com/jemalloc/jemalloc/releases, then compile and install it
> > with the standard commands, enabling profiling:
> > ./configure --enable-prof
> > make
> > sudo make install
> > Edit the launcher of your JVM by adding:
> > export LD_PRELOAD=/usr/local/lib/libjemalloc.so
> > export MALLOC_CONF=prof:true,lg_prof_interval:32,prof_prefix:/path/to/store/jeprof
> > The 32 means that for every 2^32 bytes allocated (4 GB), jemalloc will write
> > a .heap file.
> > You will need the dot binary to transform the output of jemalloc into an
> > image; it is included in the graphviz package.
> > Then you start your JVM and wait.
> > To transform the file, the application needs to be running, otherwise you
> > will only get addresses instead of symbol names.
> > The command is:
> > jeprof --show_bytes --gif /usr/bin/java /path/to/store/jeprof-pid.x.ix.heap > output.gif
> >
> >
> > On Fri, Feb 10, 2017 at 11:55 AM, Sachin Mittal <sjmit...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > We also seem to be facing a potential RocksDB issue when more than one
> > > partition state store is created on a machine.
> > > It looks like RocksDB is spending too much time in disk I/O.
> > >
> > > Could you please tell us under what circumstances you hit this issue, and
> > > how long the streams application had been running before you faced it?
> > >
> > > In another application we run only a single-partition stream, so only one
> > > state store gets created; there we have never faced any issue, and that
> > > (same) streams application has now been running uninterrupted for months.
> > > There we see no CPU wait time and everything looks to be running fine.
> > >
> > > Also, for the images you created, which tool did you use to get the JVM
> > > memory snapshot?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Fri, Feb 10, 2017 at 1:25 PM, Pierre Coquentin <pierre.coquen...@gmail.com> wrote:
> > >
> > > > Here is the gist with the two GIFs:
> > > >
> > > > https://gist.github.com/PierreCoquentin/d2df46e5e1c0d3506f6311b343e6f775
> > > >
> > > > On Fri, Feb 10, 2017 at 7:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Pierre,
> > > > >
> > > > > The Apache mailing list has some restrictions on large attachments, and
> > > > > I think that is why your gif files are not showing up. Could you try
> > > > > using a gist link?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Feb 8, 2017 at 9:49 AM, Pierre Coquentin <pierre.coquen...@gmail.com> wrote:
> > > > >
> > > > > > Well, I am a little perplexed now... I have already recompiled the
> > > > > > branch 0.10.1 with rocksdb 4.11.2 and it doesn't seem any better.
> > > > > > So I have modified the launcher of our JVM to use jemalloc with
> > > > > > profiling enabled, and from the first results I have, it seems that the
> > > > > > problem comes from the method all()...
> > > > > > But I don't understand how the iterator could not be closed correctly
> > > > > > with a try-with-resources... Tomorrow, I will try calling the close
> > > > > > method explicitly...
> > > > > > Attached is a gif taken right after a restart of the JVM with profiling
> > > > > > enabled; in the second one, taken 12 hours later, we clearly see a
> > > > > > problem with the iterator and the calls to DBIter.SeekToFirst and
> > > > > > DBIter.Next, which are both called in the method RocksDBStore.all().
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 8, 2017 at 6:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > >> Hello Pierre,
> > > > > >>
> > > > > >> As Damian said, your code looks fine and I cannot think of a direct
> > > > > >> reason for the RocksDB memory leak off the top of my head.
> > > > > >>
> > > > > >> Could you build and try out the latest Kafka trunk (it will be released
> > > > > >> as 0.10.2 in a few days), which contains a newer version of RocksDB, and
> > > > > >> see if this issue still exists?
> > > > > >>
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >> On Sat, Feb 4, 2017 at 11:54 AM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >>
> > > > > >> > Looks fine
> > > > > >> >
> > > > > >> > On Sat, 4 Feb 2017 at 19:27, Pierre Coquentin <pierre.coquen...@gmail.com> wrote:
> > > > > >> >
> > > > > >> > Oh OK, this is a snippet of the code we use:
> > > > > >> >
> > > > > >> >             List<KeyValue<K, V>> keyValues = new ArrayList<>();
> > > > > >> >             try (KeyValueIterator<K, V> iterator = kvStore.all()) {
> > > > > >> >                 iterator.forEachRemaining(keyValues::add);
> > > > > >> >             }
> > > > > >> >             // process all entries at once
> > > > > >> >             try {
> > > > > >> >                 // business stuff
> > > > > >> >             } catch (Exception e) {
> > > > > >> >                 // handling problem
> > > > > >> >             } finally {
> > > > > >> >                 // delete them
> > > > > >> >                 keyValues.stream().map(item -> item.key).forEach(kvStore::delete);
> > > > > >> >             }
> > > > > >> >
> > > > > >> > Something missing?
> > > > > >> >
> > > > > >> > On Sat, Feb 4, 2017 at 7:59 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >> >
> > > > > >> > > Keeping the RocksDB iterator open wouldn't cause a memory leak in
> > > > > >> > > the heap. That is why I asked.
> > > > > >> > >
> > > > > >> > > On Sat, 4 Feb 2017 at 16:36 Pierre Coquentin <pierre.coquen...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > The iterator is inside a try-with-resources. And if the memory
> > > > > >> > > > leak were inside our code, we would see it using VisualVM or jmap,
> > > > > >> > > > and that's not the case. This is not a memory leak in the heap.
> > > > > >> > > > That's why my guess goes directly to RocksDB.
> > > > > >> > > >
> > > > > >> > > > On Sat, Feb 4, 2017 at 5:31 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi Pierre,
> > > > > >> > > > >
> > > > > >> > > > > When you are iterating over the entries, do you close the iterator
> > > > > >> > > > > once you are finished? If you don't, then that will cause a memory leak.
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > > Damian
> > > > > >> > > > >
> > > > > >> > > > > On Sat, 4 Feb 2017 at 16:18 Pierre Coquentin <pierre.coquen...@gmail.com> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi,
> > > > > >> > > > > >
> > > > > >> > > > > > We ran a few tests with Apache Kafka 0.10.1.1.
> > > > > >> > > > > > We use a Topology with only one processor and a KVStore configured
> > > > > >> > > > > > as persistent, backed by RocksDB 4.9.0. Each event received is
> > > > > >> > > > > > stored using the method put(key, value), and in the punctuate
> > > > > >> > > > > > method we iterate over all entries with all(), processing them and
> > > > > >> > > > > > deleting them with delete(key).
> > > > > >> > > > > > Then, after a few days, the JVM crashed due to lack of memory.
> > > > > >> > > > > > VisualVM and jmap don't show anything, so my guess was that there
> > > > > >> > > > > > was a memory leak in RocksDB. We configured the KVStore to be in
> > > > > >> > > > > > memory and, as you can see in picture 2, the memory is stable.
> > > > > >> > > > > > I didn't run the same test with a newer version of RocksDB yet.
> > > > > >> > > > > > Any thoughts?
> > > > > >> > > > > > Regards,
> > > > > >> > > > > >
> > > > > >> > > > > > Pierre
> > > > > >> > > > > >
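> > > > > >> > > > > > In skeleton form, the processor described above looks roughly like
> > > > > >> > > > > > this (a simplified sketch against the 0.10.1 Processor API; the class
> > > > > >> > > > > > name, store name, value type, and punctuate interval are illustrative):
> > > > > >> > > > > >
> > > > > >> > > > > >     import java.util.ArrayList;
> > > > > >> > > > > >     import java.util.List;
> > > > > >> > > > > >     import org.apache.kafka.streams.KeyValue;
> > > > > >> > > > > >     import org.apache.kafka.streams.processor.Processor;
> > > > > >> > > > > >     import org.apache.kafka.streams.processor.ProcessorContext;
> > > > > >> > > > > >     import org.apache.kafka.streams.state.KeyValueIterator;
> > > > > >> > > > > >     import org.apache.kafka.streams.state.KeyValueStore;
> > > > > >> > > > > >
> > > > > >> > > > > >     public class BufferingProcessor<V> implements Processor<String, V> {
> > > > > >> > > > > >         private KeyValueStore<String, V> store;
> > > > > >> > > > > >
> > > > > >> > > > > >         @Override
> > > > > >> > > > > >         @SuppressWarnings("unchecked")
> > > > > >> > > > > >         public void init(ProcessorContext context) {
> > > > > >> > > > > >             store = (KeyValueStore<String, V>) context.getStateStore("event-store");
> > > > > >> > > > > >             context.schedule(60000L); // punctuate roughly once per minute of stream time
> > > > > >> > > > > >         }
> > > > > >> > > > > >
> > > > > >> > > > > >         @Override
> > > > > >> > > > > >         public void process(String key, V value) {
> > > > > >> > > > > >             store.put(key, value); // every event received is stored
> > > > > >> > > > > >         }
> > > > > >> > > > > >
> > > > > >> > > > > >         @Override
> > > > > >> > > > > >         public void punctuate(long timestamp) {
> > > > > >> > > > > >             List<KeyValue<String, V>> batch = new ArrayList<>();
> > > > > >> > > > > >             try (KeyValueIterator<String, V> iterator = store.all()) {
> > > > > >> > > > > >                 iterator.forEachRemaining(batch::add);
> > > > > >> > > > > >             }
> > > > > >> > > > > >             // process the whole batch, then delete the processed keys
> > > > > >> > > > > >             batch.forEach(kv -> store.delete(kv.key));
> > > > > >> > > > > >         }
> > > > > >> > > > > >
> > > > > >> > > > > >         @Override
> > > > > >> > > > > >         public void close() {}
> > > > > >> > > > > >     }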
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> -- Guozhang
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
