Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r114293581 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java --- @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import 
org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This verifies that we can restore a complete job from a Flink 1.2 savepoint. + * + * <p>The test pipeline contains both "Checkpointed" state and keyed user state. + * + * <p>The tests will time out if they don't see the required number of successful checks within + * a time limit. + */ +public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase { + private static final int NUM_SOURCE_ELEMENTS = 4; + + /** + * This has to be manually executed to create the savepoint on Flink 1.2. + */ + @Test + @Ignore + public void testCreateSavepointOnFlink12() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + 
"timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new TimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + executeAndSavepoint( + env, + "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint", + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + /** + * This has to be manually executed to create the savepoint on Flink 1.2. + */ + @Test + @Ignore + public void testCreateSavepointOnFlink12WithRocksDB() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + RocksDBStateBackend rocksBackend = + new RocksDBStateBackend(new MemoryStateBackend()); + env.setStateBackend(rocksBackend); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new TimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + executeAndSavepoint( + env, + "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb", + new 
Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + + @Test + public void testSavepointRestoreFromFlink12() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + restoreAndExecute( + env, + getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"), + new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), + new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new 
Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + @Test + public void testSavepointRestoreFromFlink12FromRocksDB() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new 
CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + restoreAndExecute( + env, + getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"), + new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), + new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + private static class LegacyCheckpointedSource + implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> { + + public static String CHECKPOINTED_STRING = "Here be dragons!"; + + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + + private final int numElements; + + public LegacyCheckpointedSource(int numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + ctx.emitWatermark(new Watermark(0)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + // don't emit a final watermark 
so that we don't trigger the registered event-time + // timers + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void restoreState(String state) throws Exception { + assertEquals(CHECKPOINTED_STRING, state); + } + + @Override + public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_STRING; + } + } + + private static class CheckingRestoringSource + extends RichSourceFunction<Tuple2<Long, Long>> + implements CheckpointedRestoring<String> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringSource.class + "_RESTORE_CHECK"; + + private volatile boolean isRunning = true; + + private final int numElements; + + private String restoredState; + + public CheckingRestoringSource(int numElements) { + this.numElements = numElements; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + + // immediately trigger any set timers + ctx.emitWatermark(new Watermark(1000)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void restoreState(String state) throws Exception { + restoredState = state; + } + } + + public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements 
Checkpointed<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static Tuple2<String, Long> CHECKPOINTED_TUPLE = + new Tuple2<>("hello", 42L); + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + } + + @Override + public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_TUPLE; + } + } + + public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements CheckpointedRestoring<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMap.class + "_RESTORE_CHECK"; + + private transient Tuple2<String, Long> restoredState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + restoredState = state; + } + } + + public static class LegacyCheckpointedFlatMapWithKeyedState + extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements Checkpointed<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static Tuple2<String, Long> CHECKPOINTED_TUPLE = + new Tuple2<>("hello", 42L); + + private final ValueStateDescriptor<Long> stateDescriptor 
= + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + getRuntimeContext().getState(stateDescriptor).update(value.f1); + + assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value()); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + } + + @Override + public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_TUPLE; + } + } + + public static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements CheckpointedRestoring<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK"; + + private transient Tuple2<String, Long> restoredState; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception 
{ + restoredState = state; + } + } + + public static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements CheckpointedRestoring<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK"; + + private transient Tuple2<String, Long> restoredState; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + restoredState = state; + } + } + + public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + getRuntimeContext().getState(stateDescriptor).update(value.f1); + } + } + + 
public static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK"; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + } + + public static class CheckpointedUdfOperator + extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> { + private static final long serialVersionUID = 1L; + + private static final String CHECKPOINTED_STRING = "Oh my, that's nice!"; + + public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) { + super(userFunction); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + @Override + public void snapshotState( + FSDataOutputStream out, long checkpointId, long timestamp) throws Exception 
{ + super.snapshotState(out, checkpointId, timestamp); + + DataOutputViewStreamWrapper streamWrapper = new DataOutputViewStreamWrapper(out); + + streamWrapper.writeUTF(CHECKPOINTED_STRING); + streamWrapper.flush(); + } + } + + public static class CheckingRestoringUdfOperator + extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK"; + + private String restoredState; + + public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) { + super(userFunction); + } + + @Override + public void open() throws Exception { + super.open(); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); + + assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + @Override + public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); + + DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in); + + restoredState = streamWrapper.readUTF(); + } + } + + public static class TimelyStatefulOperator + extends AbstractStreamOperator<Tuple2<Long, Long>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> { + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<Long> 
stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + private transient InternalTimerService<Long> timerService; + + @Override + public void open() throws Exception { + super.open(); + + timerService = getInternalTimerService( + "timer", + LongSerializer.INSTANCE, + this); + + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + element.getValue().f0, + LongSerializer.INSTANCE, + stateDescriptor); + + state.update(element.getValue().f1); + + timerService.registerEventTimeTimer(element.getValue().f0, timerService.currentWatermark() + 10); + timerService.registerProcessingTimeTimer(element.getValue().f0, timerService.currentProcessingTime() + 30_000); + + output.collect(element); + } + + @Override + public void onEventTime(InternalTimer<Long, Long> timer) throws Exception { + + } + + @Override + public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception { + + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + } + + public static class CheckingTimelyStatefulOperator + extends AbstractStreamOperator<Tuple2<Long, Long>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> { + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS"; + public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS"; + public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS"; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + private transient 
InternalTimerService<Long> timerService; + + @Override + public void open() throws Exception { + super.open(); + + timerService = getInternalTimerService( + "timer", + LongSerializer.INSTANCE, + this); + + getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter()); + getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter()); + getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + element.getValue().f0, + LongSerializer.INSTANCE, + stateDescriptor); + + assertEquals(state.value(), element.getValue().f1); + getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1); + + output.collect(element); + } + + @Override + public void onEventTime(InternalTimer<Long, Long> timer) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + timer.getNamespace(), + LongSerializer.INSTANCE, + stateDescriptor); + + assertEquals(state.value(), timer.getNamespace()); + getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + timer.getNamespace(), + LongSerializer.INSTANCE, + stateDescriptor); + + assertEquals(state.value(), timer.getNamespace()); + getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1); + } + } + + --- End diff -- remove 2 empty lines
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---