The issue can be reproduced by using a certain combinations of the value of RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job parallelism.
Examples that break: * Parallelism 1 and WRITE_BUFFER_RATIO 0.1 * Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5 Examples that work: * Parallelism 1 and WRITE_BUFFER_RATIO 0.5, duration 34164 ms In a working case (parallelism 1, WRITE_BUFFER_RATIO 0.2) RocksDB log looks like this (right after the uninteresting bootup messages): 2020/08/21-12:55:41.693771 7f56e6643700 [db/db_impl.cc:1546] Created column family [valueState] (ID 1) 2020/08/21-12:55:42.213743 7f56e6643700 [db/db_impl_write.cc:1103] Flushing column family with largest mem table size. Write buffer is using 16789472 bytes out of a total of 17895697. 2020/08/21-12:55:42.213799 7f56e6643700 [db/db_impl_write.cc:1423] [valueState] New memtable created with log file: #3. Immutable memtables: 0. 2020/08/21-12:55:42.213924 7f56deffd700 (Original Log Time 2020/08/21-12:55:42.213882) [db/db_impl_compaction_flush.cc:1560] Calling FlushMemTableToOutputFile with column family [valueState], flush slots available 1, compaction slots available 1, flush slots scheduled 1, compaction slots scheduled 0 2020/08/21-12:55:42.213927 7f56deffd700 [db/flush_job.cc:304] [valueState] [JOB 2] Flushing memtable with next log file: 3 2020/08/21-12:55:42.213969 7f56deffd700 EVENT_LOG_v1 {"time_micros": 1598003742213958, "job": 2, "event": "flush_started", "num_memtables": 1, "num_entries": 170995, "num_deletes": 0, "memory_usage": 8399008, "flush_reason": "Write Buffer Full"} 2020/08/21-12:55:42.213973 7f56deffd700 [db/flush_job.cc:334] [valueState] [JOB 2] Level-0 flush table #9: started 2020/08/21-12:55:42.228444 7f56deffd700 EVENT_LOG_v1 {"time_micros": 1598003742228435, "cf_name": "valueState", "job": 2, "event": "table_file_creation", "file_number": 9, "file_size": 10971, "table_properties": {"data_size": 10200, "index_size": 168, "filter_size": 0, "raw_key_size": 18000, "raw_average_key_size": 18, "raw_value_size": 8000, "raw_average_value_size": 8, "num_data_blocks": 6, "num_entries": 1000, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands": "0"}} 2020/08/21-12:55:42.228460 7f56deffd700 [db/flush_job.cc:374] [valueState] [JOB 2] Level-0 flush table #9: 10971 bytes OK The main thing to look at is "num_entries": 170995, meaning RocksDB flushes a memtable with quite large number of entries. It flushes 53 times during the test, which sounds sensible. In a breaking case (parallelism 1, WRITE_BUFFER_RATIO 0.1) RocksDB log looks like this: 2020/08/21-12:53:02.917606 7f2cabfff700 [db/db_impl_write.cc:1103] Flushing column family with largest mem table size. Write buffer is using 8396784 bytes out of a total of 8947848. 2020/08/21-12:53:02.917702 7f2cabfff700 [db/db_impl_write.cc:1423] [valueState] New memtable created with log file: #3. Immutable memtables: 0. 2020/08/21-12:53:02.917988 7f2ca8bf1700 (Original Log Time 2020/08/21-12:53:02.917868) [db/db_impl_compaction_flush.cc:1560] Calling FlushMemTableToOutputFile with column family [valueState], flush slots available 1, compaction slots available 1, flush slots scheduled 1, compaction slots scheduled 0 2020/08/21-12:53:02.918004 7f2ca8bf1700 [db/flush_job.cc:304] [valueState] [JOB 2] Flushing memtable with next log file: 3 2020/08/21-12:53:02.918099 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros": 1598003582918053, "job": 2, "event": "flush_started", "num_memtables": 1, "num_entries": 29, "num_deletes": 0, "memory_usage": 6320, "flush_reason": "Write Buffer Full"} 2020/08/21-12:53:02.918118 7f2ca8bf1700 [db/flush_job.cc:334] [valueState] [JOB 2] Level-0 flush table #9: started ... 2020/08/21-12:55:20.261887 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros": 1598003720261879, "job": 20079, "event": "flush_started", "num_memtables": 1, "num_entries": 29, "num_deletes": 0, "memory_usage": 2240, "flush_reason": "Write Buffer Full"} 2020/08/21-12:55:20.261892 7f2ca8bf1700 [db/flush_job.cc:334] [valueState] [JOB 20079] Level-0 flush table #20085: started This time "num_entries": 29, meaning RocksDB flushes the memtable when there are only 29 entries consuming 6320 bytes memory. All memtable flushes look alike. There are total flushes 20079 times during the test, which is more than 300 times more than with the working config. Memtable flush and the compactions those will cause kill the performance. It looks like RocksDB flushes way too early, before the memtable should be considered full. But why? The answer lies in the RocksDB code. kingspace/frocksdb/db/db_impl_write.cc if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { // Before a new memtable is added in SwitchMemtable(), // write_buffer_manager_->ShouldFlush() will keep returning true. If another // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. status = HandleWriteBufferFull(write_context); } ... Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); Status status; // Before a new memtable is added in SwitchMemtable(), // write_buffer_manager_->ShouldFlush() will keep returning true. If another // thread is writing to another DB with the same write buffer, they may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. ROCKS_LOG_INFO( immutable_db_options_.info_log, "Flushing column family with largest mem table size. Write buffer is " "using %" PRIu64 " bytes out of a total of %" PRIu64 ".", write_buffer_manager_->memory_usage(), write_buffer_manager_->buffer_size()); frocksdb/include/rocksdb/write_buffer_manager.h: bool ShouldFlush() const { if (enabled()) { if (mutable_memtable_memory_usage() > mutable_limit_) { return true; } if (memory_usage() >= buffer_size_ && mutable_memtable_memory_usage() >= buffer_size_ / 2) { // If the memory exceeds the buffer size, we trigger more aggressive // flush. But if already more than half memory is being flushed, // triggering more flush may not help. We will hold it instead. return true; } } return false; } Let's dig some params. There's the line in the logs "Flushing column family with largest mem table size. Write buffer is using 8396784 bytes out of a total of 8947848.". From that we can see: write_buffer_manager_->memory_usage() is 8396784 write_buffer_manager_->buffer_size() is 8947848 Additionally: buffer_size_ is 8947848. This can be seen e.g. putting a breakpoint in RocksDBMemoryControllerUtils.createWriteBufferManager() mutable_limit_ is buffer_size_ * 7 / 8 = 8947848 * 7 / 8 = 7829367 In ShouldFlush() "memory_usage() >= buffer_size_" is false, so the latter if-statement in ShouldFlush() is false. The 1st one has to be true. I'm not totally sure why this happens. Now I'm guessing. The memory RocksDB uses for the block cache is calculated in the memory memtable uses (in mutable_memtable_memory_usage()). In RocksDB conf: Options.arena_block_size: 8388608 If the block cache has allocated one of these blocks, this check: mutable_memtable_memory_usage() > mutable_limit_ Becomes: 8388608 + really_used_by_memtable > 7829367 8388608 + 6320 > 7829367 This is always true (even if memtable used 0 bytes of memory). ShouldFlush always returns true. This makes RocksDB constantly flush. Even if I didn't correctly understand the code, somehow the flushing happens constantly. The RocksDB docs https://github.com/facebook/rocksdb/wiki/MemTable#flush say memtable is flushed when "write_buffer_manager signals a flush". It seems that write buffer manager signaling to flush is happening here, but should it really? It feels odd (if it really is so) that block cache size affects the decision when the flush the memtable. Here's the latest test program. I've tested against Flink 1.11.1. /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.streaming.examples.wordcount; import com.google.common.util.concurrent.RateLimiter; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.RocksDBOptions; import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.InfoLogLevel; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; import static org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED; /** * Works fast in the following cases. * <ul> * <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li> * <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li> * </ul> * <p> * Some results: * </p> * <ul> * <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li> * <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li> * <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li> * <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li> * <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li> * <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li> * </ul> * <p> */ public class WordCount { /** * The parallelism of the job. */ private static final int PARALLELISM = 1; /** * Whether to use managed memory. True, no changes in the config. * False, managed memory is disabled. */ private static final boolean USE_MANAGED_MEMORY = true; /** * If {@link #USE_MANAGED_MEMORY} is {@code true} has effect. * Sets the {@link RocksDBOptions#WRITE_BUFFER_RATIO}. */ private static Double WRITE_BUFFER_RATIO = 0.1; /** * The source synthesizes this many events. */ public static final int EVENT_COUNT = 1_000_000; /** * The value of each event is {@code EVENT_COUNT % MAX_VALUE}. * Essentially controls the count of unique keys. */ public static final int MAX_VALUE = 1_000; /** * If non-null, rate limits the events from the source. */ public static final Integer SOURCE_EVENTS_PER_SECOND = null; public static final boolean ENABLE_ROCKS_LOGGING = true; // ************************************************************************* // PROGRAMF // ************************************************************************* public static void main(String[] args) throws Exception { // Checking input parameters final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); // set up the execution environment Configuration configuration = new Configuration(); if (!USE_MANAGED_MEMORY) { configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, USE_MANAGED_MEMORY); } else { if (WRITE_BUFFER_RATIO != null) { configuration.setDouble(RocksDBOptions.WRITE_BUFFER_RATIO, WRITE_BUFFER_RATIO.doubleValue()); } } final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration); Path tempDirPath = Files.createTempDirectory("example"); String checkpointDataUri = "file://" + tempDirPath.toString(); RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, true); if (ENABLE_ROCKS_LOGGING) { rocksDBStateBackend.setRocksDBOptions(new ExampleRocksDBOptionsFactory()); } else { rocksDBStateBackend.setPredefinedOptions(FLASH_SSD_OPTIMIZED); } env.setStateBackend((StateBackend) rocksDBStateBackend); // make parameters available in the web interface env.getConfig().setGlobalJobParameters(params); // get input data DataStream<Long> text = env.addSource(new ExampleCountSource()); text.keyBy(v -> v) .flatMap(new ValuesCounter()) .addSink(new DiscardingSink<>()); long before = System.currentTimeMillis(); env.execute("Streaming WordCount"); long duration = System.currentTimeMillis() - before; System.out.println("Done " + duration + " ms, parallelism " + PARALLELISM); } private static class ExampleRocksDBOptionsFactory implements RocksDBOptionsFactory { @Override public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) { currentOptions.setIncreaseParallelism(4) .setUseFsync(false) .setMaxOpenFiles(-1) .setKeepLogFileNum(10) .setInfoLogLevel(InfoLogLevel.INFO_LEVEL) .setStatsDumpPeriodSec(0) .setMaxLogFileSize(100 * 1024 * 1024); // 100 MB each return currentOptions; } @Override public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) { return currentOptions; } } // ************************************************************************* // USER FUNCTIONS // ************************************************************************* private static class ValuesCounter extends RichFlatMapFunction<Long, Tuple2<Long, Long>> { private ValueState<Long> state; @Override public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) throws Exception { Long oldCount = state.value(); if (oldCount == null) { oldCount = 0L; } long newCount = oldCount + 1; state.update(newCount); out.collect(Tuple2.of(value, newCount)); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor("valueState", BasicTypeInfo.LONG_TYPE_INFO); state = getRuntimeContext().getState(descriptor); } } public static class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction { private long count = 0L; private volatile boolean isRunning = true; private transient ListState<Long> checkpointedCount; private static final RateLimiter rateLimiter = SOURCE_EVENTS_PER_SECOND != null ? RateLimiter.create(SOURCE_EVENTS_PER_SECOND) : null; public void run(SourceContext<Long> ctx) { while (isRunning && count < EVENT_COUNT) { if (rateLimiter != null) { rateLimiter.acquire(); } // this synchronized block ensures that state checkpointing, // internal state updates and emission of elements are an atomic operation synchronized (ctx.getCheckpointLock()) { ctx.collect(count % MAX_VALUE); count++; } } } public void cancel() { isRunning = false; } public void initializeState(FunctionInitializationContext context) throws Exception { this.checkpointedCount = context .getOperatorStateStore() .getListState(new ListStateDescriptor<>("count", Long.class)); if (context.isRestored()) { for (Long count : this.checkpointedCount.get()) { this.count = count; } } } public void snapshotState(FunctionSnapshotContext context) throws Exception { this.checkpointedCount.clear(); this.checkpointedCount.add(count); } } } -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/