Thanks for the follow-up, Juha. I've just assigned FLINK-19238 to you. Let's track this further in JIRA.
Best Regards, Yu On Tue, 15 Sep 2020 at 15:04, Juha Mynttinen <juha.myntti...@king.com> wrote: > Hey > > I created this one https://issues.apache.org/jira/browse/FLINK-19238. > > Regards, > Juha > ------------------------------
> *From:* Yun Tang <myas...@live.com> > *Sent:* Tuesday, September 15, 2020 8:06 AM > *To:* Juha Mynttinen <juha.myntti...@king.com>; Stephan Ewen < > se...@apache.org> > *Cc:* user@flink.apache.org <user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hi Juha > > Would you please consider contributing this back to the community? If agreed, > please open a JIRA ticket and we can then help review your PR. > > Best > Yun Tang > ------------------------------
> *From:* Juha Mynttinen <juha.myntti...@king.com> > *Sent:* Thursday, September 10, 2020 19:05 > *To:* Stephan Ewen <se...@apache.org> > *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hey > > I've fixed the code > (https://github.com/juha-mynttinen-king/flink/commits/arena_block_sanity_check) > slightly. Now it WARNs if there is the memory configuration issue. Also, I > think there was a bug in the way the check calculated the mutable memory, > fixed that. Also, wrote some tests. > > I tried the code and in my setup I get a bunch of WARNs when the memory > configuration issue is happening: > > 20200910T140320.516+0300 WARN RocksDBStateBackend performance will be > poor because of the current Flink memory configuration! RocksDB will flush > memtable constantly, causing high IO and CPU. Typically the easiest fix is > to increase task manager managed memory size. If running locally, see the > parameter taskmanager.memory.managed.size. Details: arenaBlockSize 8388608 > < mutableLimit 7829367 (writeBufferSize 67108864 arenaBlockSizeConfigured 0 > defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8947848) > > [org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.sanityCheckArenaBlockSize() > @ 189] > > Regards, > Juha > > ------------------------------
> *From:* Stephan Ewen <se...@apache.org> > *Sent:* Wednesday, September 9, 2020 1:56 PM > *To:* Juha Mynttinen <juha.myntti...@king.com> > *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hey Juha! > > I agree that we cannot reasonably expect the majority of users to > understand block sizes, arena sizes, etc. to get their application running. > So the default should be "inform when there is a problem and suggest using > more memory." Block/arena size tuning is for the absolute experts, the > 5% super power users. > > The managed memory is 128 MB by default in the mini cluster. In a > standalone session cluster setup with default config, it is 512 MB. > > Best, > Stephan > > > > On Wed, Sep 9, 2020 at 11:10 AM Juha Mynttinen <juha.myntti...@king.com> > wrote: > > Hey Yun, > > About the docs. 
I saw in the docs > (https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html > [ci.apache.org] > <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_ops_state_large-5Fstate-5Ftuning.html&d=DwMFaQ&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=61BtxMX6UCHk2TX2mluIR7QceE2iUPJGiu7Tzgt8zi8&s=WLTgjNYrq8bVj4LEDQSaJfqBYUymaBBn1rRF8UE8Dsc&e=>) > this: > > "An advanced option (expert mode) to reduce the number of MemTable flushes > in setups with many states, is to tune RocksDB’s ColumnFamily options > (arena block size, max background flush threads, etc.) via a > RocksDBOptionsFactory". > > Only after debugging this issue we're talking about, I figured that this > snippet in the docs is probably talking about the issue I'm witnessing. I > think there are two issues here: > > 1) it's hard/impossible to know what kind of performance one can expect > from a Flink application. Thus, it's hard to know if one is suffering from > e.g. from this performance issue, or if the system is performing normally > (and inherently being slow). > 2) even if one suspects a performance issue, it's very hard to find the > root cause of the performance issue (memtable flush happening frequently). > To find out this one would need to know what's the normal flush frequency. > > Also the doc says "in setups with many states". The same problem is hit > when using just one state, but "high" parallelism (5). > > If the arena block size _ever_ needs to be configured only to "fix" this > issue, it'd be best if there _never_ was a need to modify arena block size. > What > if we forget even mentioning arena block size in the docs and focus on the > managed memory size, since managed memory size is something the user does > tune. > > You're right that a very clear WARN message could also help to cope with > the issue. What if there was a WARN message saying that performance will be > poor and you should increase the managed memory size? And get rid of that > arena block size decreasing example in the docs. > > Also, the default managed memory size is AFAIK 128MB right now. That could > be increased. That would get rid of this issue in many cases. > > Regards, > Juha > > ------------------------------ > *From:* Yun Tang <myas...@live.com> > *Sent:* Tuesday, September 8, 2020 8:05 PM > *To:* Juha Mynttinen <juha.myntti...@king.com>; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hi Juha > > I planned to give some descriptions in Flink documentation to give such > hints, however, it has too many details for RocksDB and we could increase > the managed memory size to a proper value to avoid this in most cases. > Since you have come across this and reported in user mailing list, and I > think it's worth to give some hints in Flink documentations. > > When talking about your idea to sanity check the arena size, I think a > warning should be enough as Flink seems never throw exception directly when > the performance could be poor. > > Best > Yun Tang > ------------------------------ > *From:* Juha Mynttinen <juha.myntti...@king.com> > *Sent:* Tuesday, September 8, 2020 20:56 > *To:* Yun Tang <myas...@live.com>; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hey Yun, > > Thanks for the detailed answer. It clarified how things work. 
Especially > what is the role of RocksDB arena, and arena block size. > > I think there's no real-world case where it would make sense to start to a > Flink job with RocksDB configured so that RocksDB flushes all the time, > i.e. where the check "mutable_memtable_memory_usage() > mutable_limit_" > is always true. The performance is just very poor and by using the same > amount of RAM but just configuring RocksDB differently, performance could > be e.g. 100 times better. > > It's very easy to hit this issue e.g. by just running a RocksDB-based > Flink app using RocksDB with either slightly higher parallelism or with > multiple operators. But finding out what and where the problem is very > hard, e.g. because the issue is happening in native code and won't be > visible even using a Java profiler. > > I wanted to see if it was possible to check the sanity of the arena block > size and just make the app crash if the arena block size is too high (or > the mutable limit too low). I came up with this > https://github.com/juha-mynttinen-king/flink/tree/arena_block_sanity_check > [github.com] > <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_juha-2Dmynttinen-2Dking_flink_tree_arena-5Fblock-5Fsanity-5Fcheck&d=DwMFAg&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=KeJGah-zF_IKVwAN9Wz50XduWWt3gQtTI0EucGoOgTw&s=lqc16JFtbr3jKDpvzdJF0BiUrrTAEYtNF_bqM9Wl1Vs&e=>. > The code calculates the same parameters that are calculated in RocksDB and > throws if the arena block size is higher than the "mutable limit". > > I did a few quick tests and the code seems to work, with small parallelism > my app works, but with higher parallelism (when the app would flush all the > time), it crashes with message like "arenaBlockSize 8388608 < mutableLimit > 7340032 (writeBufferSize 67108864 arenaBlockSizeConfigured 0 > defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8388608). RocksDB > would flush memtable constantly. Refusing to start. You can 1) make arena > block size smaller, 2) decrease parallelism (if possible), 3) increase > managed memory" > > Regards, > Juha > > ------------------------------ > *From:* Yun Tang <myas...@live.com> > *Sent:* Friday, August 28, 2020 6:58 AM > *To:* Juha Mynttinen <juha.myntti...@king.com>; user@flink.apache.org < > user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > Hi Juha > > Thanks for your enthusiasm to dig this problem and sorry for jumping in > late for this thread to share something about write buffer manager in > RocksDB. > > First of all, the reason why you meet the poor performance is due to > writer buffer manager has been assigned a much lower limit (due to poor > managed memory size on that slot) than actual needed. The competition of > allocating memory between different column families lead RocksDB to switch > active memtable to immutable memtable in advance, which leads to the poor > performance as this increase the write amplification. > > To keep the memory not exceed the limit, write buffer manager would decide > whether to flush the memtable in advance, which is the statement you found: > mutable_memtable_memory_usage() > > mutable_limit_ [1] and the memory usage includes allocated but not even > used arean_block. > When talking about the arena, memory allocator in RocksDB, I need to > correct one thing in your thread: the block cache would not allocate any > memory, all memory is allocated from arena. 
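(A minimal sketch of the arithmetic behind the sanity check and the "Refusing to start" message above, for illustration only. The values are copied from Juha's message; the 1/8 default arena block size and the 7/8 mutable limit are the RocksDB defaults cited later in this thread, not code taken from Flink.)

public class ArenaBlockCheckSketch {
    public static void main(String[] args) {
        // Values copied from the message above.
        long writeBufferSize = 67_108_864L;                  // RocksDB default write buffer size (64 MB)
        long writeBufferManagerCapacity = 8_388_608L;        // derived from the slot's managed memory
        long arenaBlockSizeConfigured = 0L;                  // 0 = arena block size not set explicitly

        // RocksDB falls back to 1/8 of the write buffer size when no arena block size is configured.
        long defaultArenaBlockSize = writeBufferSize / 8;    // 8388608
        long arenaBlockSize = arenaBlockSizeConfigured > 0
                ? arenaBlockSizeConfigured
                : defaultArenaBlockSize;

        // The write buffer manager starts flushing once mutable memtable usage exceeds 7/8 of its capacity.
        long mutableLimit = writeBufferManagerCapacity * 7 / 8; // 7340032

        if (arenaBlockSize > mutableLimit) {
            // A single freshly allocated arena block already exceeds the limit, so
            // ShouldFlush() keeps returning true and RocksDB flushes the memtable constantly.
            System.out.println("arenaBlockSize " + arenaBlockSize
                    + " > mutableLimit " + mutableLimit + ": constant flushing expected");
        }
    }
}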
> > The core idea of RocksDB how to limit memory usage: arena allocates > memory, write buffer manager decide when to switch memtable to control the > active memory usage, and write buffer manager also accounts its allocated > memory into the cache. The underlying block cache evict memory with > accounting from write buffer manager and the cached block, filter & index. > > By default, arena_block_size is not configured, and it would be 1/8 of > write buffer size [2]. And the default write buffer size is 64MB, that's > why you could find "Options.arena_block_size: 8388608" in your logs. > As you can see, RocksDB think it could use 64MB write buffer by default. > However, Flink needs to control the total memory usage and has to configure > write buffer manager based on the managed memory. From your logs "Write > buffer is using 16789472 bytes out of a total of 17895697", I believe the > managed memory of that slot (managed memory size / num of slots in one TM) > is quite poor. If we have 1 slot with 1GB for task manager, the managed > memory should be near 300MB which is fine for default RocksDB > configuration. However, you just have about 90MB for the managed memory > over that slot. When you enable managed memory on RocksDB, it would try its > best to limit the total memory of all rocksDB instances within one slot > under 90MB. Once you disable the managed memory control over rocksDB, each > RocksDB instance could use about 64*2+8=136MB, since you have two operators > here, they could use more than 200MB+ in one slot. > > There existed several solutions to mitigate this regression: > > 1. Increase the overall managed memory size for one slot. > 2. Increase the write buffer ratio > 3. Set the arean_block_size explicitly instead of default 8MB to avoid > unwanted flush in advance: > > e.g: new ColumnFamilyOptions().setArenaBlockSize(2 * 1024 * 1024L); > > > [1] > https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47 > [github.com] > <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_dataArtisans_frocksdb_blob_49bc897d5d768026f1eb816d960c1f2383396ef4_include_rocksdb_write-5Fbuffer-5Fmanager.h-23L47&d=DwMFAg&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=9dqFsA-w9rEcr782SVR8quiS2bKsubnmM8ZshIPBlNM&s=Xly6aYk9rvQu-c5yGlirem4FcuzQItD7dLJP-mROsVE&e=> > [2] > https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196 > [github.com] > <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_dataArtisans_frocksdb_blob_49bc897d5d768026f1eb816d960c1f2383396ef4_db_column-5Ffamily.cc-23L196&d=DwMFAg&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=9dqFsA-w9rEcr782SVR8quiS2bKsubnmM8ZshIPBlNM&s=VQyThuy-5sP16APcviNgewjYr0fd43yZdxkyNw90Zzg&e=> > > Best > Yun Tang > > ------------------------------ > *From:* Juha Mynttinen <juha.myntti...@king.com> > *Sent:* Monday, August 24, 2020 15:56 > *To:* user@flink.apache.org <user@flink.apache.org> > *Subject:* Re: Performance issue associated with managed RocksDB memory > > The issue can be reproduced by using a certain combinations of the value of > RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job > parallelism. 
> > Examples that break: > * Parallelism 1 and WRITE_BUFFER_RATIO 0.1 > * Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5 > > Examples that work: > * Parallelism 1 and WRITE_BUFFER_RATIO 0.5, duration 34164 ms > > In a working case (parallelism 1, WRITE_BUFFER_RATIO 0.2) RocksDB log looks > like this (right after the uninteresting bootup messages): > > 2020/08/21-12:55:41.693771 7f56e6643700 [db/db_impl.cc:1546] Created column > family [valueState] (ID 1) > 2020/08/21-12:55:42.213743 7f56e6643700 [db/db_impl_write.cc:1103] Flushing > column family with largest mem table size. Write buffer is using 16789472 > bytes out of a total of 17895697. > 2020/08/21-12:55:42.213799 7f56e6643700 [db/db_impl_write.cc:1423] > [valueState] New memtable created with log file: #3. Immutable memtables: > 0. > 2020/08/21-12:55:42.213924 7f56deffd700 (Original Log Time > 2020/08/21-12:55:42.213882) [db/db_impl_compaction_flush.cc:1560] Calling > FlushMemTableToOutputFile with column family [valueState], flush slots > available 1, compaction slots available 1, flush slots scheduled 1, > compaction slots scheduled 0 > 2020/08/21-12:55:42.213927 7f56deffd700 [db/flush_job.cc:304] [valueState] > [JOB 2] Flushing memtable with next log file: 3 > 2020/08/21-12:55:42.213969 7f56deffd700 EVENT_LOG_v1 {"time_micros": > 1598003742213958, "job": 2, "event": "flush_started", "num_memtables": 1, > "num_entries": 170995, "num_deletes": 0, "memory_usage": 8399008, > "flush_reason": "Write Buffer Full"} > 2020/08/21-12:55:42.213973 7f56deffd700 [db/flush_job.cc:334] [valueState] > [JOB 2] Level-0 flush table #9: started > 2020/08/21-12:55:42.228444 7f56deffd700 EVENT_LOG_v1 {"time_micros": > 1598003742228435, "cf_name": "valueState", "job": 2, "event": > "table_file_creation", "file_number": 9, "file_size": 10971, > "table_properties": {"data_size": 10200, "index_size": 168, "filter_size": > 0, "raw_key_size": 18000, "raw_average_key_size": 18, "raw_value_size": > 8000, "raw_average_value_size": 8, "num_data_blocks": 6, "num_entries": > 1000, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": > "0"}} > 2020/08/21-12:55:42.228460 7f56deffd700 [db/flush_job.cc:374] [valueState] > [JOB 2] Level-0 flush table #9: 10971 bytes OK > > The main thing to look at is "num_entries": 170995, meaning RocksDB flushes > a memtable with quite large number of entries. It flushes 53 times during > the test, which sounds sensible. > > In a breaking case (parallelism 1, WRITE_BUFFER_RATIO 0.1) RocksDB log > looks > like this: > > 2020/08/21-12:53:02.917606 7f2cabfff700 [db/db_impl_write.cc:1103] Flushing > column family with largest mem table size. Write buffer is using 8396784 > bytes out of a total of 8947848. > 2020/08/21-12:53:02.917702 7f2cabfff700 [db/db_impl_write.cc:1423] > [valueState] New memtable created with log file: #3. Immutable memtables: > 0. 
> 2020/08/21-12:53:02.917988 7f2ca8bf1700 (Original Log Time > 2020/08/21-12:53:02.917868) [db/db_impl_compaction_flush.cc:1560] Calling > FlushMemTableToOutputFile with column family [valueState], flush slots > available 1, compaction slots available 1, flush slots scheduled 1, > compaction slots scheduled 0 > 2020/08/21-12:53:02.918004 7f2ca8bf1700 [db/flush_job.cc:304] [valueState] > [JOB 2] Flushing memtable with next log file: 3 > 2020/08/21-12:53:02.918099 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros": > 1598003582918053, "job": 2, "event": "flush_started", "num_memtables": 1, > "num_entries": 29, "num_deletes": 0, "memory_usage": 6320, "flush_reason": > "Write Buffer Full"} > 2020/08/21-12:53:02.918118 7f2ca8bf1700 [db/flush_job.cc:334] [valueState] > [JOB 2] Level-0 flush table #9: started > ... > 2020/08/21-12:55:20.261887 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros": > 1598003720261879, "job": 20079, "event": "flush_started", "num_memtables": > 1, "num_entries": 29, "num_deletes": 0, "memory_usage": 2240, > "flush_reason": "Write Buffer Full"} > 2020/08/21-12:55:20.261892 7f2ca8bf1700 [db/flush_job.cc:334] [valueState] > [JOB 20079] Level-0 flush table #20085: started > > This time "num_entries": 29, meaning RocksDB flushes the memtable when > there > are only 29 entries consuming 6320 bytes memory. All memtable flushes look > alike. There are total flushes 20079 times during the test, which is more > than 300 times more than with the working config. Memtable flush and the > compactions those will cause kill the performance. > > It looks like RocksDB flushes way too early, before the memtable should be > considered full. But why? The answer lies in the RocksDB code. > > kingspace/frocksdb/db/db_impl_write.cc > if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { > // Before a new memtable is added in SwitchMemtable(), > // write_buffer_manager_->ShouldFlush() will keep returning true. If > another > // thread is writing to another DB with the same write buffer, they may > also > // be flushed. We may end up with flushing much more DBs than needed. > It's > // suboptimal but still correct. > status = HandleWriteBufferFull(write_context); > } > > ... > Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { > mutex_.AssertHeld(); > assert(write_context != nullptr); > Status status; > > // Before a new memtable is added in SwitchMemtable(), > // write_buffer_manager_->ShouldFlush() will keep returning true. If > another > // thread is writing to another DB with the same write buffer, they may > also > // be flushed. We may end up with flushing much more DBs than needed. > It's > // suboptimal but still correct. > ROCKS_LOG_INFO( > immutable_db_options_.info_log, > "Flushing column family with largest mem table size. Write buffer is > " > "using %" PRIu64 " bytes out of a total of %" PRIu64 ".", > write_buffer_manager_->memory_usage(), > write_buffer_manager_->buffer_size()); > > > frocksdb/include/rocksdb/write_buffer_manager.h: > > bool ShouldFlush() const { > if (enabled()) { > if (mutable_memtable_memory_usage() > mutable_limit_) { > return true; > } > if (memory_usage() >= buffer_size_ && > mutable_memtable_memory_usage() >= buffer_size_ / 2) { > // If the memory exceeds the buffer size, we trigger more > aggressive > // flush. But if already more than half memory is being flushed, > // triggering more flush may not help. We will hold it instead. > return true; > } > } > return false; > } > > Let's dig some params. 
There's the line in the logs "Flushing column family > with largest mem table size. Write buffer is using 8396784 bytes out of a > total of 8947848.". From that we can see: > > write_buffer_manager_->memory_usage() is 8396784 > write_buffer_manager_->buffer_size() is 8947848 > > Additionally: > > buffer_size_ is 8947848. This can be seen e.g. putting a breakpoint in > RocksDBMemoryControllerUtils.createWriteBufferManager() > mutable_limit_ is buffer_size_ * 7 / 8 = 8947848 * 7 / 8 = 7829367 > > In ShouldFlush() "memory_usage() >= buffer_size_" is false, so the latter > if-statement in ShouldFlush() is false. The 1st one has to be true. I'm not > totally sure why this happens. > > Now I'm guessing. The memory RocksDB uses for the block cache is calculated > in the memory memtable uses (in mutable_memtable_memory_usage()). > > In RocksDB conf: > > Options.arena_block_size: 8388608 > > If the block cache has allocated one of these blocks, this check: > > mutable_memtable_memory_usage() > mutable_limit_ > > Becomes: > > 8388608 + really_used_by_memtable > 7829367 > 8388608 + 6320 > 7829367 > > This is always true (even if memtable used 0 bytes of memory). ShouldFlush > always returns true. This makes RocksDB constantly flush. > > Even if I didn't correctly understand the code, somehow the flushing > happens > constantly. > > The RocksDB docs https://github.com/facebook/rocksdb/wiki/MemTable#flush > [github.com] > <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_facebook_rocksdb_wiki_MemTable-23flush&d=DwMFAg&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=9dqFsA-w9rEcr782SVR8quiS2bKsubnmM8ZshIPBlNM&s=zayCxl8PK6XCl4IQfMmjHY_RUc1_-429d8xpvdwn5rE&e=> > say > memtable is flushed when "write_buffer_manager signals a flush". It seems > that write buffer manager signaling to flush is happening here, but should > it really? It feels odd (if it really is so) that block cache size affects > the decision when the flush the memtable. > > > Here's the latest test program. I've tested against Flink 1.11.1. > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 [apache.org] > <https://urldefense.proofpoint.com/v2/url?u=http-3A__www.apache.org_licenses_LICENSE-2D2.0&d=DwMFAg&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=9dqFsA-w9rEcr782SVR8quiS2bKsubnmM8ZshIPBlNM&s=yegrE6BuvXIACM2U8ntJc4oJ7mo3t7McnNc4jsBVmoc&e=> > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. 
> */ > > package org.apache.flink.streaming.examples.wordcount; > > import com.google.common.util.concurrent.RateLimiter; > import org.apache.flink.api.common.functions.RichFlatMapFunction; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.utils.MultipleParameterTool; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.contrib.streaming.state.RocksDBOptions; > import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory; > import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; > import org.apache.flink.runtime.state.FunctionInitializationContext; > import org.apache.flink.runtime.state.FunctionSnapshotContext; > import org.apache.flink.runtime.state.StateBackend; > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.sink.DiscardingSink; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Collector; > import org.rocksdb.ColumnFamilyOptions; > import org.rocksdb.DBOptions; > import org.rocksdb.InfoLogLevel; > > import java.nio.file.Files; > import java.nio.file.Path; > import java.util.Collection; > > import static > > org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED; > > /** > * Works fast in the following cases. > * <ul> > * <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li> > * <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link > #PARALLELISM} is 1 to 4.</li> > * </ul> > * <p> > * Some results: > * </p> > * <ul> > * <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li> > * <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li> > * <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li> > * <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li> > * <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li> > * <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li> > * </ul> > * <p> > */ > public class WordCount { > /** > * The parallelism of the job. > */ > private static final int PARALLELISM = 1; > > /** > * Whether to use managed memory. True, no changes in the config. > * False, managed memory is disabled. > */ > private static final boolean USE_MANAGED_MEMORY = true; > > /** > * If {@link #USE_MANAGED_MEMORY} is {@code true} has effect. > * Sets the {@link RocksDBOptions#WRITE_BUFFER_RATIO}. > */ > private static Double WRITE_BUFFER_RATIO = 0.1; > > /** > * The source synthesizes this many events. > */ > public static final int EVENT_COUNT = 1_000_000; > > /** > * The value of each event is {@code EVENT_COUNT % MAX_VALUE}. > * Essentially controls the count of unique keys. > */ > public static final int MAX_VALUE = 1_000; > > /** > * If non-null, rate limits the events from the source. 
> */ > public static final Integer SOURCE_EVENTS_PER_SECOND = null; > > public static final boolean ENABLE_ROCKS_LOGGING = true; > > > // > ************************************************************************* > // PROGRAMF > // > ************************************************************************* > > public static void main(String[] args) throws Exception { > > // Checking input parameters > final MultipleParameterTool params = > MultipleParameterTool.fromArgs(args); > > // set up the execution environment > Configuration configuration = new Configuration(); > if (!USE_MANAGED_MEMORY) { > configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, > USE_MANAGED_MEMORY); > } else { > if (WRITE_BUFFER_RATIO != null) { > configuration.setDouble(RocksDBOptions.WRITE_BUFFER_RATIO, > WRITE_BUFFER_RATIO.doubleValue()); > } > } > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, > configuration); > > Path tempDirPath = Files.createTempDirectory("example"); > String checkpointDataUri = "file://%22 + tempDirPath.toString(); > > RocksDBStateBackend rocksDBStateBackend = new > RocksDBStateBackend(checkpointDataUri, true); > if (ENABLE_ROCKS_LOGGING) { > rocksDBStateBackend.setRocksDBOptions(new > ExampleRocksDBOptionsFactory()); > } else { > rocksDBStateBackend.setPredefinedOptions(FLASH_SSD_OPTIMIZED); > } > > env.setStateBackend((StateBackend) rocksDBStateBackend); > > // make parameters available in the web interface > env.getConfig().setGlobalJobParameters(params); > > // get input data > DataStream<Long> text = env.addSource(new ExampleCountSource()); > > text.keyBy(v -> v) > .flatMap(new ValuesCounter()) > .addSink(new DiscardingSink<>()); > > long before = System.currentTimeMillis(); > env.execute("Streaming WordCount"); > long duration = System.currentTimeMillis() - before; > > System.out.println("Done " + duration + " ms, parallelism " + > PARALLELISM); > } > > > private static class ExampleRocksDBOptionsFactory implements > RocksDBOptionsFactory { > > @Override > public DBOptions createDBOptions(DBOptions currentOptions, > Collection<AutoCloseable> handlesToClose) { > currentOptions.setIncreaseParallelism(4) > .setUseFsync(false) > .setMaxOpenFiles(-1) > .setKeepLogFileNum(10) > .setInfoLogLevel(InfoLogLevel.INFO_LEVEL) > .setStatsDumpPeriodSec(0) > .setMaxLogFileSize(100 * 1024 * 1024); // 100 MB each > > return currentOptions; > } > > @Override > public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions > currentOptions, Collection<AutoCloseable> handlesToClose) { > return currentOptions; > } > } > > // > ************************************************************************* > // USER FUNCTIONS > // > ************************************************************************* > > private static class ValuesCounter extends RichFlatMapFunction<Long, > Tuple2<Long, Long>> { > private ValueState<Long> state; > > > @Override > public void flatMap(Long value, Collector<Tuple2<Long, Long>> > out) throws Exception { > Long oldCount = state.value(); > if (oldCount == null) { > oldCount = 0L; > } > long newCount = oldCount + 1; > state.update(newCount); > > out.collect(Tuple2.of(value, newCount)); > } > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > ValueStateDescriptor<Long> descriptor = new > ValueStateDescriptor("valueState", BasicTypeInfo.LONG_TYPE_INFO); > state = getRuntimeContext().getState(descriptor); > } > } > > public static class ExampleCountSource 
implements SourceFunction<Long>, > CheckpointedFunction { > private long count = 0L; > private volatile boolean isRunning = true; > > private transient ListState<Long> checkpointedCount; > > private static final RateLimiter rateLimiter = > SOURCE_EVENTS_PER_SECOND != null ? > RateLimiter.create(SOURCE_EVENTS_PER_SECOND) : null; > > public void run(SourceContext<Long> ctx) { > while (isRunning && count < EVENT_COUNT) { > if (rateLimiter != null) { > rateLimiter.acquire(); > } > // this synchronized block ensures that state > checkpointing, > // internal state updates and emission of elements are an > atomic operation > synchronized (ctx.getCheckpointLock()) { > ctx.collect(count % MAX_VALUE); > count++; > } > } > } > > public void cancel() { > isRunning = false; > } > > public void initializeState(FunctionInitializationContext context) > throws Exception { > this.checkpointedCount = context > .getOperatorStateStore() > .getListState(new ListStateDescriptor<>("count", > Long.class)); > > if (context.isRestored()) { > for (Long count : this.checkpointedCount.get()) { > this.count = count; > } > } > } > > public void snapshotState(FunctionSnapshotContext context) throws > Exception { > this.checkpointedCount.clear(); > this.checkpointedCount.add(count); > } > } > } > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > [apache-flink-user-mailing-list-archive.2336050.n4.nabble.com] > <https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dflink-2Duser-2Dmailing-2Dlist-2Darchive.2336050.n4.nabble.com_&d=DwMFAg&c=-0jfte1J3SKEE6FyZmTngg&r=-2x4lRPm2yEX3Ylri2jKFRC6zr9S6Iqg2kAJIspWwfA&m=9dqFsA-w9rEcr782SVR8quiS2bKsubnmM8ZshIPBlNM&s=xdutsLFVzPqnjT5kR1y76hiY-68pJNMeMHT5S7DL_d8&e=> > >
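For completeness, mitigation (3) from Yun's list can be wired through the same RocksDBOptionsFactory hook that ExampleRocksDBOptionsFactory uses in the test program above. A minimal sketch, assuming the interface shown in that program; the class name is illustrative, and the 2 MB block size simply mirrors Yun's setArenaBlockSize example, so it should be sized against the slot's managed memory rather than treated as a recommendation:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class SmallArenaBlockOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Leave the DB-level options untouched.
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Pin the arena block size explicitly so that a single arena block stays well below
        // the write buffer manager's mutable limit (see the discussion earlier in the thread).
        return currentOptions.setArenaBlockSize(2 * 1024 * 1024L);
    }
}

It would be registered the same way as in the test program, e.g. rocksDBStateBackend.setRocksDBOptions(new SmallArenaBlockOptionsFactory());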