On 2019/06/28 23:29:16, John Roesler <j...@confluent.io> wrote:
> Hey all,
>
> If you want to figure it out theoretically, if you print out the
> topology description, you'll have some number of state stores listed
> in there. The number of Rocks instances should just be
> (#global_state_stores +
> sum(#partitions_of_topic_per_local_state_store)) . The number of
> stream threads isn't relevant here.
>
> You can also figure it out empirically: the first level of
> subdirectories in the state dir are Tasks, and then within that, the
> next level is Stores. You should see the store directory names match
> up with the stores listed in the topology description. The number of
> Store directories is exactly the number of RocksDB instances you have.
>
> There are also metrics corresponding to each of the state stores, so
> you can compute it from what you find in the metrics.
>
> Hope that helps,
> -john
>
> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> >
> > Hi Kiran
> > Without much research my guess would be "num_stream_threads *
> > (#global_state_stores + sum(#partitions_of_topic_per_local_state_store))"
> > So 10 stores (regardless if explicitly defined or implicitely because of
> > some stateful operation) with 10 partitions each should result in 100
> > Rocksdb instances if you are running at the default of num_stream_threads=1.
> >
> > As I wrote before, start with 100.
> > If the error persists, half the number, if not, double it ;-) Repeat as
> > needed.
> >
> > If you reach the single-digit-range and the error still shows up, start
> > searching for any iterators over a store you might not have closed.
> >
> > br, Patrik
> >
> > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > emailtokir...@gmail.com> wrote:
> >
> > >
> > >
> > > On 2019/06/27 09:02:39, Patrik Kleindl <pklei...@gmail.com> wrote:
> > > > Hello Kiran
> > > >
> > > > First, the value for maxOpenFiles is per RocksDB instance, and the
> > > > number
> > > > of those can get high if you have a lot of topic partitions etc.
> > > > Check the directory (state dir) to see how many there are.
> > > > Start with a low value (100) and see if that has some effect.
> > > >
> > > > Second, because I just found out, you should use
> > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > options.tableFormatConfig();
> > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > tableConfig.setBlockSize(8*1024L);
> > > > instead of creating a new object to prevent accidently messing up
> > > > references.
> > > >
> > > > Hope that helps
> > > > best regards
> > > > Patrik
> > > >
> > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > emailtokir...@gmail.com> wrote:
> > > >
> > > > >
> > > > >
> > > > > On 2019/06/26 21:58:02, Patrik Kleindl <pklei...@gmail.com> wrote:
> > > > > > Hi Kiran
> > > > > > You can use the RocksDBConfigSetter and pass
> > > > > >
> > > > > > options.setMaxOpenFiles(100);
> > > > > >
> > > > > > to all RocksDBs for the Streams application which limits how many
> > > > > > are
> > > > > > kept open at the same time.
> > > > > >
> > > > > > best regards
> > > > > >
> > > > > > Patrik
> > > > > >
> > > > > >
> > > > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > > > emailtokir...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > We are using Kafka streams DSL APIs for doing some counter
> > > aggregations
> > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub
> > > topologies
> > > > > & we
> > > > > > > are using 8 partitions in source topic. When we start pumping more
> > > > > load, we
> > > > > > > start getting RockDBException stating "too many open files".
> > > > > > >
> > > > > > > Here are the stack trace samples:
> > > > > > >
> > > > > > >
> > > > >
> > > ------------------------------------------------------------------------------------------
> > > > > > > Caused by: org.rocksdb.RocksDBException: while open a file for
> > > lock:
> > > > > > > PPPPPPPPPPP.1512000000/LOCK: Too many open files
> > > > > > > at org.rocksdb.RocksDB.open(Native Method)
> > > > > > > at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> > > > > > > ... 24 common frames omitted
> > > > > > >
> > > > > > >
> > > > > > > Caused by:
> > > > > > > org.apache.kafka.streams.errors.ProcessorStateException:
> > > > > Error
> > > > > > > while executing flush from store XXXXXXXXXXX.1512000000
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:178)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
> > > > > > > ... 10 more
> > > > > > > Caused by: org.rocksdb.RocksDBException: While open a file for
> > > > > appending:
> > > > > > > YYYYYYYYYYYYY.1512000000/000007.dbtmp: Too many open files
> > > > > > > at org.rocksdb.RocksDB.flush(Native Method)
> > > > > > > at org.rocksdb.RocksDB.flush(RocksDB.java:3401)
> > > > > > > at org.rocksdb.RocksDB.flush(RocksDB.java:3361)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:395)
> > > > > > >
> > > > > > >
> > > > >
> > > ------------------------------------------------------------------------------------------
> > > > > > >
> > > > > > > We tried increasing the open files limit at OS level to some
> > > > > > > decent
> > > > > > > number.. but still no luck. Obviously we don't want to have
> > > boundless
> > > > > open
> > > > > > > files..
> > > > > > >
> > > > > > > We also tried to play with commit interval(
> > > kafka.commit.interval.ms)
> > > > > and
> > > > > > > cache size (kafka.cache.max.bytes.buffering) .. but no luck there
> > > > > either.
> > > > > > >
> > > > > > > KAFKA-3904 talks about it.. but it was resolved long back..
> > > > > > >
> > > > > > > Any other config tuning we have to do?
> > > > > > >
> > > > > > > Appreciate any help in this regard!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Kiran
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > > Hi Patrik/All,
> > > > >
> > > > > Thanks for providing some valuable pointer!
> > > > >
> > > > > I did that & it doesn't seems to work.
> > > > >
> > > > > Here is how my custom config setter looks like:
> > > > >
> > > > >
> > > > >
> > > ----------------------------------------------------------------------------------------------------
> > > > >
> > > > >
> > > > > @Override
> > > > > public void setConfig(final String storeName, final Options
> > > > > options, final Map<String, Object> configs) {
> > > > > // See #1 below.
> > > > > BlockBasedTableConfig tableConfig = new
> > > > > org.rocksdb.BlockBasedTableConfig();
> > > > >
> > > > > tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> > > > > // See #2 below.
> > > > > tableConfig.setBlockSize(16 * 1024L);
> > > > > // See #3 below.
> > > > > tableConfig.setCacheIndexAndFilterBlocks(false);
> > > > > // tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > > > > options.setMaxOpenFiles(-1);
> > > > > options.setTableFormatConfig(tableConfig);
> > > > > // See #4 below.
> > > > > options.setMaxWriteBufferNumber(2);
> > > > > }
> > > > >
> > > > >
> > > ----------------------------------------------------------------------------------------------------
> > > > > I tried many options with this:
> > > > > 1. tableConfig.setCacheIndexAndFilterBlocks(true); ----> as per docs
> > > > > (
> > > > >
> > > https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
> > > )
> > > > > if we set to true, the max_open_files shouldn't play a role. But I was
> > > > > still getting too many open files exception from Rocksdb
> > > > >
> > > > > 2. tableConfig.setCacheIndexAndFilterBlocks(true);
> > > > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true); ----> no luck;
> > > same
> > > > > exception
> > > > >
> > > > > 3. tableConfig.setCacheIndexAndFilterBlocks(false); and
> > > > > options.setMaxOpenFiles(50000); -----> This also resulted with
> > > same
> > > > > exception.. with java process having ~24K open files
> > > > >
> > > > > 4. tableConfig.setCacheIndexAndFilterBlocks(false); and
> > > > > options.setMaxOpenFiles(-1); -----> This also resulted with same
> > > > > exception.. with java process having ~24K ope files. As per the doc,
> > > if we
> > > > > set to -1, it means infinite and controlled by underlying OS limit.
> > > > >
> > > > > I am using MacOS Mojave (10.14.4) and OpenJDK 11.0.2. At OS level, I
> > > have
> > > > > bumped the open files limit to 1 million.
> > > > >
> > > > > $ ulimit -a
> > > > > core file size (blocks, -c) 0
> > > > > data seg size (kbytes, -d) unlimited
> > > > > file size (blocks, -f) unlimited
> > > > > max locked memory (kbytes, -l) unlimited
> > > > > max memory size (kbytes, -m) unlimited
> > > > > open files (-n) 1000000
> > > > > pipe size (512 bytes, -p) 1
> > > > > stack size (kbytes, -s) 8192
> > > > > cpu time (seconds, -t) unlimited
> > > > > max user processes (-u) 1418
> > > > > virtual memory (kbytes, -v) unlimited
> > > > >
> > > > >
> > > > > Am I missing some other config here?
> > > > >
> > > > > Thanks,
> > > > > Kiran
> > > > >
> > > > >
> > > >
> > >
> > > Hi Patrik,
> > >
> > > Thanks for quick response!
> > >
> > > I just checked the state dir. It has total of 3152 sub folders and total
> > > of 15451 files (within sub folders..).
> > >
> > > With this what should be the number that I should use?
> > >
> > > How many instances of rocksdb will be used? Any formula to determine that?
> > >
> > > Thanks again,
> > > Kiran
> > >
>
Hi John/Patrik,
Thanks for sharing some more insights!
So we have ~3.2K state store directories (which means those many rocksdb
instances.
So when we tried with these config params:
------------------
cache.index.and.filter.blocks=false
max.open.files=-1
block.cache.size=100* 1024 * 1024L
block.size=8*1024L
max.write.buffer.number=2
------------------
We still got too many open files exception from rocksdb side. At that time the
total open files on the VM were ~46K. At OS level we have increased the max
open files limit to 1 million.
As per above config, "max.open.files=-1" it means infinite open files for
rocksdb and it's controlled by OS open file limit (which is 1 million in our
case). We are not able to understand why rocksdb is not honouring our config
params?
P.S: we will not go with "max.open.files=-1" in production.. we are just trying
to understand how to overcome this exception by trying various combinations of
config params.
Are we missing some other key rocksdb configs here?
Thanks,
Kiran