Hi, here's a simple test. It's essentially Flink's WordCount example, but it uses RocksDB as the state backend and includes a stateful operator. The Javadocs explain how to use it.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.wordcount;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Reproduction case: a keyed, stateful WordCount running against the RocksDB
 * state backend in a local environment.
 *
 * <p>Works fast in the following cases:
 * <ul>
 *   <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
 *   <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link #PARALLELISM} is 1 to 4.</li>
 * </ul>
 *
 * <p>Some results:
 * <ul>
 *   <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
 *   <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
 *   <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
 *   <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
 *   <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
 *   <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
 * </ul>
 */
public class WordCount {

    /** The parallelism of the job. */
    private static final int PARALLELISM = 5;

    /**
     * Whether to use managed memory. True, no changes in the config.
     * False, managed memory is disabled.
     */
    private static final boolean USE_MANAGED_MEMORY = true;

    /** The source synthesizes this many events. */
    public static final int EVENT_COUNT = 1_000_000;

    /**
     * The value of each event is {@code count % MAX_VALUE}.
     * Essentially controls the count of unique keys.
     */
    public static final int MAX_VALUE = 1_000;

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        // Checking input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment; managed memory for RocksDB is only
        // switched off explicitly, otherwise the default config is used
        Configuration configuration = new Configuration();
        if (!USE_MANAGED_MEMORY) {
            configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, USE_MANAGED_MEMORY);
        }
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, configuration);

        // checkpoints go to a fresh temp directory; Path.toUri() yields a valid
        // file URI on every platform (string-concatenating "file://" does not on Windows)
        Path tempDirPath = Files.createTempDirectory("example");
        String checkpointDataUri = tempDirPath.toUri().toString();

        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, true);
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend((StateBackend) rocksDBStateBackend);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<Long> text = env.addSource(new ExampleCountSource());

        text.keyBy(v -> v)
                .flatMap(new ValuesCounter())
                .addSink(new DiscardingSink<>());

        long before = System.currentTimeMillis();
        env.execute("Streaming WordCount");
        long duration = System.currentTimeMillis() - before;

        System.out.println("Done " + duration + " ms, parallelism " + PARALLELISM);
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Counts how often each key has been seen so far, keeping the running
     * count in keyed {@link ValueState} (i.e. in RocksDB).
     */
    private static class ValuesCounter extends RichFlatMapFunction<Long, Tuple2<Long, Long>> {
        private ValueState<Long> state;

        @Override
        public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) throws Exception {
            Long oldCount = state.value();
            if (oldCount == null) {
                // first occurrence of this key
                oldCount = 0L;
            }
            long newCount = oldCount + 1;
            state.update(newCount);

            out.collect(Tuple2.of(value, newCount));
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
            state = getRuntimeContext().getState(descriptor);
        }
    }

    /**
     * Emits {@link #EVENT_COUNT} events whose values cycle through
     * {@code 0 .. MAX_VALUE - 1}; checkpoints its position so it can resume
     * after a restore.
     */
    public static class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
        private long count = 0L;
        private volatile boolean isRunning = true;

        private transient ListState<Long> checkpointedCount;

        @Override
        public void run(SourceContext<Long> ctx) {
            while (isRunning && count < EVENT_COUNT) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an atomic operation
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count % MAX_VALUE);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.checkpointedCount = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));

            if (context.isRestored()) {
                // the list holds at most one element; avoid shadowing the field
                for (Long restoredCount : this.checkpointedCount.get()) {
                    this.count = restoredCount;
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }
    }
}

Regards, Juha -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/