Hey Yu,

1. Memory and other configuration

There's not much configuration going on; it's all in the Java class WordCount. Specifically, the memory-related part is this one:

rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

I quickly tried commenting out that line, and it doesn't seem to change anything.

2. I'm not sure what I would / should look for.

For 'taskmanager.memory.managed.fraction' I tried

configuration.setDouble("taskmanager.memory.managed.fraction", 0.8);

But using a debugger, I don't see that variable being used. Maybe it's not used in StreamExecutionEnvironment.createLocalEnvironment?

3. There are no timers, so I don't think setting this parameter matters. Anyway, I tried this:

configuration.setString(RocksDBOptions.TIMER_SERVICE_FACTORY, "HEAP");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);

No change in performance (tried with parallelism 5 and without managed memory).

Regards,
Juha

________________________________
From: Yu Li <car...@gmail.com>
Sent: Thursday, June 25, 2020 12:20 PM
To: Andrey Zagrebin <azagre...@apache.org>
Cc: Juha Mynttinen <juha.myntti...@king.com>; Yun Tang <myas...@live.com>; user <user@flink.apache.org>
Subject: Re: Performance issue associated with managed RocksDB memory

Thanks for the ping Andrey.

Hi Juha,

Thanks for reporting the issue. I'd like to check the below things before further digging into it:

1. Could you let us know your configurations (especially memory related ones) when running the tests?

2. Did you watch the memory consumption before / after turning `state.backend.rocksdb.memory.managed` off? If not, could you check it out and let us know the result?
2.1 Furthermore, if the memory consumption is much higher when turning managed memory off, could you try tuning up the managed memory fraction accordingly through `taskmanager.memory.managed.fraction` [1] and check the result?

3. With `state.backend.rocksdb.memory.managed` on and nothing else changed, could you try to set `state.backend.rocksdb.timer-service.factory` to `HEAP` and check out the result? (side note: starting from 1.10.0 release timers are stored in RocksDB by default when using RocksDBStateBackend [2])

What's more, you may find these documents [3] [4] useful for memory tunings of RocksDB backend.

Thanks.

Best Regards,
Yu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-managed-fraction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#state
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#tuning-rocksdb-memory
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#memory-management

On Thu, 25 Jun 2020 at 15:37, Andrey Zagrebin <azagre...@apache.org> wrote:

Hi Juha,

Thanks for sharing the testing program to expose the problem.
This indeed looks suboptimal if X does not leave space for the window operator.
I am adding Yu and Yun who might have a better idea about what could be improved about sharing the RocksDB memory among operators.

Best,
Andrey

On Thu, Jun 25, 2020 at 9:10 AM Juha Mynttinen <juha.myntti...@king.com> wrote:

Hey,

Here's a simple test. It's basically the WordCount example from Flink, but using RocksDB as the state backend and having a stateful operator. The javadocs explain how to use it.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.wordcount;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Works fast in the following cases.
 * <ul>
 *     <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
 *     <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li>
 * </ul>
 * <p>
 * Some results:
 * </p>
 * <ul>
 *     <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
 *     <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
 *     <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
 *     <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
 * </ul>
 */
public class WordCount {

    /**
     * The parallelism of the job.
     */
    private static final int PARALLELISM = 5;

    /**
     * Whether to use managed memory. If true, the configuration is left unchanged.
     * If false, managed memory is disabled.
     */
    private static final boolean USE_MANAGED_MEMORY = true;

    /**
     * The source synthesizes this many events.
     */
    public static final int EVENT_COUNT = 1_000_000;

    /**
     * The value of each event is the event's index modulo {@code MAX_VALUE}.
     * Essentially controls the count of unique keys.
     */
    public static final int MAX_VALUE = 1_000;

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        Configuration configuration = new Configuration();
        if (!USE_MANAGED_MEMORY) {
            configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, USE_MANAGED_MEMORY);
        }
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);

        // keep RocksDB checkpoints in a fresh temporary directory
        Path tempDirPath = Files.createTempDirectory("example");
        String checkpointDataUri = "file://" + tempDirPath.toString();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, true);
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend((StateBackend) rocksDBStateBackend);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<Long> text = env.addSource(new ExampleCountSource());

        text.keyBy(v -> v)
            .flatMap(new ValuesCounter())
            .addSink(new DiscardingSink<>());

        long before = System.currentTimeMillis();
        env.execute("Streaming WordCount");
        long duration = System.currentTimeMillis() - before;

        System.out.println("Done " + duration + " ms, parallelism " + PARALLELISM);
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    private static class ValuesCounter extends RichFlatMapFunction<Long, Tuple2<Long, Long>> {
        private ValueState<Long> state;

        @Override
        public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) throws Exception {
            // keyed state: one counter per distinct value
            Long oldCount = state.value();
            if (oldCount == null) {
                oldCount = 0L;
            }
            long newCount = oldCount + 1;
            state.update(newCount);

            out.collect(Tuple2.of(value, newCount));
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
            state = getRuntimeContext().getState(descriptor);
        }
    }

    public static class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
        private long count = 0L;
        private volatile boolean isRunning = true;

        private transient ListState<Long> checkpointedCount;

        @Override
        public void run(SourceContext<Long> ctx) {
            while (isRunning && count < EVENT_COUNT) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an atomic operation
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count % MAX_VALUE);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.checkpointedCount = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));

            if (context.isRestored()) {
                for (Long count : this.checkpointedCount.get()) {
                    this.count = count;
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }
    }
}

Regards,
Juha

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
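
As a side note on the timings listed in the WordCount javadoc: a minimal standalone sketch that puts the managed and unmanaged runs side by side per parallelism. The class name SlowdownRatios and the method slowdown are made up for illustration and are not part of the test program or of Flink; the numbers are copied from the javadoc and are single measurements, so the ratios are only a rough indication.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the WordCount test. */
public class SlowdownRatios {

    /** Runtime with managed memory divided by runtime without it. */
    public static double slowdown(long managedMs, long unmanagedMs) {
        return (double) managedMs / unmanagedMs;
    }

    public static void main(String[] args) {
        // parallelism -> {unmanaged ms, managed ms}, copied from the javadoc results
        Map<Integer, long[]> timings = new LinkedHashMap<>();
        timings.put(3, new long[] {3088, 4337});
        timings.put(4, new long[] {2971, 2808});
        timings.put(5, new long[] {2994, 126050});

        for (Map.Entry<Integer, long[]> entry : timings.entrySet()) {
            long unmanagedMs = entry.getValue()[0];
            long managedMs = entry.getValue()[1];
            System.out.println(String.format(Locale.ROOT,
                    "parallelism %d: managed/unmanaged = %.2f",
                    entry.getKey(), slowdown(managedMs, unmanagedMs)));
        }
    }
}
```

With these numbers, parallelism 3 and 4 stay within a factor of about 1.4 of the unmanaged run, while parallelism 5 is roughly 42 times slower with managed memory enabled.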