In many places it is mentioned that closing the iterator fixes the issue, but this is true only if we use the Processor API. In the DSL there is no iterator explicitly available; we only use wrapper methods such as aggregate, map, groupBy, etc.
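For context, "closing the iterator" in the Processor API usually means wrapping the store iterator in try-with-resources. What follows is only a minimal, self-contained sketch of that pattern: CloseableIterator, open() and OPEN_HANDLES are hypothetical stand-ins written for illustration, not Kafka classes. They play the role of a store iterator such as KeyValueIterator, which is Closeable.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class IteratorCloseSketch {

    // Counts iterators that were opened but not yet closed (stands in for native handles).
    static final AtomicInteger OPEN_HANDLES = new AtomicInteger();

    // Hypothetical stand-in for a store iterator that holds a resource until close().
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    // Hypothetical helper: wraps a list in an iterator that tracks open/close.
    static <T> CloseableIterator<T> open(List<T> values) {
        OPEN_HANDLES.incrementAndGet();
        Iterator<T> delegate = values.iterator();
        return new CloseableIterator<T>() {
            private boolean closed = false;

            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public T next() {
                return delegate.next();
            }

            @Override
            public void close() {
                if (!closed) {
                    closed = true;
                    OPEN_HANDLES.decrementAndGet();
                }
            }
        };
    }

    // Sums all values; try-with-resources closes the iterator even if the loop throws.
    static long sumAll(List<Long> values) {
        long sum = 0;
        try (CloseableIterator<Long> it = open(values)) {
            while (it.hasNext()) {
                sum += it.next();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumAll(List.of(1L, 2L, 3L))); // prints 6
        System.out.println(OPEN_HANDLES.get());          // prints 0: nothing left open
    }
}
```

With the DSL wrapper methods there is no such iterator in our own code, so there is nothing comparable for us to close.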
Here is a snapshot of the issue, with the exact statistics observed in a recent run on a Mac with Mojave.

- The number of state directories is 3154, and hence there will be 3154 RocksDB instances.
- The OS open files limit was set to 1 million. Here is the breakdown of the number of open files:
    Total open files on the system: 41645
    Open files from the streams application: 16545
    Open files related to state directories: 16025
  So, if we do the math, 16025/3154 ~ 5 files per instance.
- The following parameters were used, but the problem still exists:
    cache.index.and.filter.blocks=false
    block.cache.size=100MB
    block.size=8MB
    max.write.buffer.number=2
    table.cache.numshardbits=8
    max.open.files=-1
    compaction.readahead.size=256MB
    skip.stats.update_on_db_open=true
    write.buffer.size=32MB
- The topic has 8 partitions and the streaming application is running as a SINGLE instance with a SINGLE thread.
- We noticed that these RocksDB properties have been consumed by the app but are not working as expected (or as defined in the documentation).
- We observed no issues reported related to memory.

Thanks
Thameem

On 2019/07/03 02:53:20, "e...@gmail.com> wrote:
>
> On 2019/06/28 23:29:16, John Roesler <jo...@confluent.io> wrote:
> > Hey all,
> >
> > If you want to figure it out theoretically, if you print out the
> > topology description, you'll have some number of state stores listed
> > in there. The number of Rocks instances should just be
> > (#global_state_stores +
> > sum(#partitions_of_topic_per_local_state_store)). The number of
> > stream threads isn't relevant here.
> >
> > You can also figure it out empirically: the first level of
> > subdirectories in the state dir are Tasks, and then within that, the
> > next level is Stores. You should see the store directory names match
> > up with the stores listed in the topology description.
> > The number of
> > Store directories is exactly the number of RocksDB instances you have.
> >
> > There are also metrics corresponding to each of the state stores, so
> > you can compute it from what you find in the metrics.
> >
> > Hope that helps,
> > -john
> >
> > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl <pk...@gmail.com> wrote:
> > >
> > > Hi Kiran
> > > Without much research my guess would be "num_stream_threads *
> > > (#global_state_stores +
> > > sum(#partitions_of_topic_per_local_state_store))"
> > > So 10 stores (regardless if explicitly defined or implicitly because of
> > > some stateful operation) with 10 partitions each should result in 100
> > > Rocksdb instances if you are running at the default of
> > > num_stream_threads=1.
> > >
> > > As I wrote before, start with 100.
> > > If the error persists, halve the number, if not, double it ;-) Repeat as
> > > needed.
> > >
> > > If you reach the single-digit-range and the error still shows up, start
> > > searching for any iterators over a store you might not have closed.
> > >
> > > br, Patrik
> > >
> > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > > emailtokir...@gmail.com> wrote:
> > >
> > > > On 2019/06/27 09:02:39, Patrik Kleindl <pk...@gmail.com> wrote:
> > > > > Hello Kiran
> > > > >
> > > > > First, the value for maxOpenFiles is per RocksDB instance, and the number
> > > > > of those can get high if you have a lot of topic partitions etc.
> > > > > Check the directory (state dir) to see how many there are.
> > > > > Start with a low value (100) and see if that has some effect.
> > > > >
> > > > > Second, because I just found out, you should use
> > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > > options.tableFormatConfig();
> > > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > > tableConfig.setBlockSize(8*1024L);
> > > > > instead of creating a new object to
> > > > > prevent accidentally messing up
> > > > > references.
> > > > >
> > > > > Hope that helps
> > > > > best regards
> > > > > Patrik
> > > > >
> > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > > emailtokir...@gmail.com> wrote:
> > > > >
> > > > > > On 2019/06/26 21:58:02, Patrik Kleindl <pk...@gmail.com> wrote:
> > > > > > > Hi Kiran
> > > > > > > You can use the RocksDBConfigSetter and pass
> > > > > > >
> > > > > > > options.setMaxOpenFiles(100);
> > > > > > >
> > > > > > > to all RocksDBs for the Streams application which limits how many are
> > > > > > > kept open at the same time.
> > > > > > >
> > > > > > > best regards
> > > > > > >
> > > > > > > Patrik
> > > > > > >
> > > > > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > > > > emailtokir...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > We are using Kafka streams DSL APIs for doing some counter aggregations
> > > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub topologies & we
> > > > > > > > are using 8 partitions in source topic.
> > > > > > > > When we start pumping more load, we
> > > > > > > > start getting RocksDBException stating "too many open files".
> > > > > > > >
> > > > > > > > Here are the stack trace samples:
> > > > > > > >
> > > > > > > > ------------------------------------------------------------------------------------------
> > > > > > > > Caused by: org.rocksdb.RocksDBException: while open a file for lock:
> > > > > > > > PPPPPPPPPPP.1512000000/LOCK: Too many open files
> > > > > > > >     at org.rocksdb.RocksDB.open(Native Method)
> > > > > > > >     at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> > > > > > > >     ... 24 common frames omitted
> > > > > > > >
> > > > > > > >
> > > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
> > > > > > > > while executing flush from store XXXXXXXXXXX.1512000000
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> > > > > > > >     at org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:178)
> > > > > > > >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > > > >     at
> > > > > > > >     org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:85)
> > > > > > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
> > > > > > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177)
> > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
> > > > > > > >     ... 10 more
> > > > > > > > Caused by: org.rocksdb.RocksDBException: While open a file for appending:
> > > > > > > > YYYYYYYYYYYYY.1512000000/000007.dbtmp: Too many open files
> > > > > > > >     at org.rocksdb.RocksDB.flush(Native Method)
> > > > > > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:3401)
> > > > > > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:3361)
> > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:395)
> > > > > > > >
> > > > > > > > ------------------------------------------------------------------------------------------
> > > > > > > >
> > > > > > > > We tried increasing the open files limit at OS level to some decent
> > > > > > > > number.. but still no luck. Obviously we don't want to have boundless open
> > > > > > > > files..
> > > > > > > >
> > > > > > > > We also tried to play with commit interval (kafka.commit.interval.ms) and
> > > > > > > > cache size (kafka.cache.max.bytes.buffering) .. but no luck there either.
> > > > > > > >
> > > > > > > > KAFKA-3904 talks about it..
> > > > > > > > but it was resolved long back..
> > > > > > > >
> > > > > > > > Any other config tuning we have to do?
> > > > > > > >
> > > > > > > > Appreciate any help in this regard!
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Kiran
> > > > > > >
> > > > > > Hi Patrik/All,
> > > > > >
> > > > > > Thanks for providing some valuable pointers!
> > > > > >
> > > > > > I did that & it doesn't seem to work.
> > > > > >
> > > > > > Here is what my custom config setter looks like:
> > > > > >
> > > > > > ----------------------------------------------------------------------------------------------------
> > > > > >
> > > > > > @Override
> > > > > > public void setConfig(final String storeName, final Options options,
> > > > > >                       final Map<String, Object> configs) {
> > > > > >     // See #1 below.
> > > > > >     BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
> > > > > >     tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> > > > > >     // See #2 below.
> > > > > >     tableConfig.setBlockSize(16 * 1024L);
> > > > > >     // See #3 below.
> > > > > >     tableConfig.setCacheIndexAndFilterBlocks(false);
> > > > > >     // tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > > > > >     options.setMaxOpenFiles(-1);
> > > > > >     options.setTableFormatConfig(tableConfig);
> > > > > >     // See #4 below.
> > > > > >     options.setMaxWriteBufferNumber(2);
> > > > > > }
> > > > > >
> > > > > > ----------------------------------------------------------------------------------------------------
> > > > > >
> > > > > > I tried many options with this:
> > > > > > 1.
> > > > > > tableConfig.setCacheIndexAndFilterBlocks(true); ----> as per docs (
> > > > > > https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
> > > > > > )
> > > > > > if we set to true, the max_open_files shouldn't play a role. But I was
> > > > > > still getting too many open files exception from Rocksdb
> > > > > >
> > > > > > 2. tableConfig.setCacheIndexAndFilterBlocks(true);
> > > > > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true); ----> no luck; same
> > > > > > exception
> > > > > >
> > > > > > 3. tableConfig.setCacheIndexAndFilterBlocks(false); and
> > > > > > options.setMaxOpenFiles(50000); -----> This also resulted with same
> > > > > > exception.. with java process having ~24K open files
> > > > > >
> > > > > > 4. tableConfig.setCacheIndexAndFilterBlocks(false); and
> > > > > > options.setMaxOpenFiles(-1); -----> This also resulted with same
> > > > > > exception.. with java process having ~24K open files. As per the doc, if we
> > > > > > set to -1, it means infinite and controlled by underlying OS limit.
> > > > > >
> > > > > > I am using MacOS Mojave (10.14.4) and OpenJDK 11.0.2.
> > > > > > At OS level, I have
> > > > > > bumped the open files limit to 1 million.
> > > > > >
> > > > > > $ ulimit -a
> > > > > > core file size          (blocks, -c) 0
> > > > > > data seg size           (kbytes, -d) unlimited
> > > > > > file size               (blocks, -f) unlimited
> > > > > > max locked memory       (kbytes, -l) unlimited
> > > > > > max memory size         (kbytes, -m) unlimited
> > > > > > open files                      (-n) 1000000
> > > > > > pipe size            (512 bytes, -p) 1
> > > > > > stack size              (kbytes, -s) 8192
> > > > > > cpu time               (seconds, -t) unlimited
> > > > > > max user processes              (-u) 1418
> > > > > > virtual memory          (kbytes, -v) unlimited
> > > > > >
> > > > > >
> > > > > > Am I missing some other config here?
> > > > > >
> > > > > > Thanks,
> > > > > > Kiran
> > > > >
> > > > Hi Patrik,
> > > >
> > > > Thanks for quick response!
> > > >
> > > > I just checked the state dir. It has total of 3152 sub folders and total
> > > > of 15451 files (within sub folders..).
> > > >
> > > > With this what should be the number that I should use?
> > > >
> > > > How many instances of rocksdb will be used? Any formula to determine that?
> > > >
> > > > Thanks again,
> > > > Kiran
> > >
> Hi John/Patrik,
>
> Thanks for sharing some more insights!
>
> So we have ~3.2K state store directories (which means those many rocksdb
> instances).
>
> So when we tried with these config params:
> ------------------
> cache.index.and.filter.blocks=false
> max.open.files=-1
> block.cache.size=100* 1024 * 1024L
> block.size=8*1024L
> max.write.buffer.number=2
> ------------------
> We still got too many open files exception from rocksdb side. At that time
> the total open files on the VM were ~46K.
> At OS level we have increased the
> max open files limit to 1 million.
>
> As per above config, "max.open.files=-1" it means infinite open files for
> rocksdb and it's controlled by OS open file limit (which is 1 million in our
> case). We are not able to understand why rocksdb is not honouring our config
> params?
>
> P.S: we will not go with "max.open.files=-1" in production.. we are just
> trying to understand how to overcome this exception by trying various
> combinations of config params.
>
> Are we missing some other key rocksdb configs here?
>
> Thanks,
> Kiran
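For anyone who wants to cross-check the instance count, the two approaches John describes in the thread (the formula and counting store directories) could be sketched roughly as below. This is only an illustration under stated assumptions: the class and method names are made up for this mail, and the directory depth is a parameter because the real on-disk layout may contain more levels than the simple "Tasks, then Stores" description.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

public class RocksDbInstanceEstimate {

    // Theoretical count per the formula quoted in the thread:
    // #global_state_stores + sum(#partitions_of_topic_per_local_state_store).
    static int estimateInstances(int globalStores, List<Integer> partitionsPerLocalStore) {
        int sum = 0;
        for (int partitions : partitionsPerLocalStore) {
            sum += partitions;
        }
        return globalStores + sum;
    }

    // Empirical count: directories exactly `depth` levels below `root`.
    // depth = 2 matches "first level Tasks, next level Stores" (an assumption;
    // adjust it to whatever your state dir actually looks like).
    static long countDirsAtDepth(Path root, int depth) throws IOException {
        try (Stream<Path> paths = Files.walk(root, depth)) {
            return paths
                    .filter(Files::isDirectory)
                    .filter(p -> p.getNameCount() - root.getNameCount() == depth)
                    .count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Patrik's example: 10 local stores with 10 partitions each, no global stores.
        System.out.println(estimateInstances(0, Collections.nCopies(10, 10))); // prints 100

        // Open files per instance, using the numbers reported at the top of this mail.
        System.out.printf("%.2f files per instance%n", 16025.0 / 3154);

        // Optional: pass a directory to count the store directories below it.
        if (args.length > 0) {
            System.out.println(countDirsAtDepth(Paths.get(args[0]), 2));
        }
    }
}
```

If the directory count and the formula disagree, the directory count is the one that reflects the RocksDB instances actually opened, as John notes.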