[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993060#comment-15993060 ]
ASF GitHub Bot commented on FLINK-5969: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r114342461 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java --- @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This verifies that we can restore a complete job from a Flink 1.2 savepoint. + * + * <p>The test pipeline contains both "Checkpointed" state and keyed user state. + * + * <p>The tests will time out if they don't see the required number of successful checks within + * a time limit. + */ +public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase { + private static final int NUM_SOURCE_ELEMENTS = 4; + + /** + * This has to be manually executed to create the savepoint on Flink 1.2. + */ + @Test + @Ignore + public void testCreateSavepointOnFlink12() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new TimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + executeAndSavepoint( + env, + "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint", + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + /** + * This has to be manually executed to create the savepoint on Flink 1.2. + */ + @Test + @Ignore + public void testCreateSavepointOnFlink12WithRocksDB() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + RocksDBStateBackend rocksBackend = + new RocksDBStateBackend(new MemoryStateBackend()); + env.setStateBackend(rocksBackend); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new TimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + executeAndSavepoint( + env, + "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb", + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + + @Test + public void testSavepointRestoreFromFlink12() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new MemoryStateBackend()); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + restoreAndExecute( + env, + getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"), + new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), + new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + @Test + public void testSavepointRestoreFromFlink12FromRocksDB() throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we only test memory state backend yet + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); + env.enableCheckpointing(500); + env.setParallelism(4); + env.setMaxParallelism(4); + + env + .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource") + .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap") + .keyBy(0) + .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState") + .keyBy(0) + .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap") + .keyBy(0) + .transform( + "custom_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator") + .keyBy(0) + .transform( + "timely_stateful_operator", + new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(), + new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator") + .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>()); + + restoreAndExecute( + env, + getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"), + new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), + new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), + new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); + } + + private static class LegacyCheckpointedSource + implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> { + + public static String CHECKPOINTED_STRING = "Here be dragons!"; + + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + + private final int numElements; + + public LegacyCheckpointedSource(int numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + ctx.emitWatermark(new Watermark(0)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + // don't emit a final watermark so that we don't trigger the registered event-time + // timers + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void restoreState(String state) throws Exception { + assertEquals(CHECKPOINTED_STRING, state); + } + + @Override + public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_STRING; + } + } + + private static class CheckingRestoringSource + extends RichSourceFunction<Tuple2<Long, Long>> + implements CheckpointedRestoring<String> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringSource.class + "_RESTORE_CHECK"; + + private volatile boolean isRunning = true; + + private final int numElements; + + private String restoredState; + + public CheckingRestoringSource(int numElements) { + this.numElements = numElements; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + + // immediately trigger any set timers + ctx.emitWatermark(new Watermark(1000)); + + synchronized (ctx.getCheckpointLock()) { + for (long i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, i)); + } + } + + while (isRunning) { + Thread.sleep(20); + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void restoreState(String state) throws Exception { + restoredState = state; + } + } + + public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements Checkpointed<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static Tuple2<String, Long> CHECKPOINTED_TUPLE = + new Tuple2<>("hello", 42L); + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + } + + @Override + public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_TUPLE; + } + } + + public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements CheckpointedRestoring<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMap.class + "_RESTORE_CHECK"; + + private transient Tuple2<String, Long> restoredState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + restoredState = state; + } + } + + public static class LegacyCheckpointedFlatMapWithKeyedState + extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements Checkpointed<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static Tuple2<String, Long> CHECKPOINTED_TUPLE = + new Tuple2<>("hello", 42L); + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + getRuntimeContext().getState(stateDescriptor).update(value.f1); + + assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value()); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + } + + @Override + public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return CHECKPOINTED_TUPLE; + } + } + + public static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements CheckpointedRestoring<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK"; + + private transient Tuple2<String, Long> restoredState; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + restoredState = state; + } + } + + public static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> + implements CheckpointedRestoring<Tuple2<String, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK"; + + private transient Tuple2<String, Long> restoredState; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void restoreState(Tuple2<String, Long> state) throws Exception { + restoredState = state; + } + } + + public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + getRuntimeContext().getState(stateDescriptor).update(value.f1); + } + } + + public static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK"; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(value); + + ValueState<Long> state = getRuntimeContext().getState(stateDescriptor); + if (state == null) { + throw new RuntimeException("Missing key value state for " + value); + } + + assertEquals(value.f1, state.value()); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + } + + public static class CheckpointedUdfOperator + extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> { + private static final long serialVersionUID = 1L; + + private static final String CHECKPOINTED_STRING = "Oh my, that's nice!"; + + public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) { + super(userFunction); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + @Override + public void snapshotState( + FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { + super.snapshotState(out, checkpointId, timestamp); + + DataOutputViewStreamWrapper streamWrapper = new DataOutputViewStreamWrapper(out); + + streamWrapper.writeUTF(CHECKPOINTED_STRING); + streamWrapper.flush(); + } + } + + public static class CheckingRestoringUdfOperator + extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK"; + + private String restoredState; + + public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) { + super(userFunction); + } + + @Override + public void open() throws Exception { + super.open(); + + getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output)); + + assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState); + getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + + @Override + public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); + + DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in); + + restoredState = streamWrapper.readUTF(); + } + } + + public static class TimelyStatefulOperator + extends AbstractStreamOperator<Tuple2<Long, Long>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> { + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + private transient InternalTimerService<Long> timerService; + + @Override + public void open() throws Exception { + super.open(); + + timerService = getInternalTimerService( + "timer", + LongSerializer.INSTANCE, + this); + + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + element.getValue().f0, + LongSerializer.INSTANCE, + stateDescriptor); + + state.update(element.getValue().f1); + + timerService.registerEventTimeTimer(element.getValue().f0, timerService.currentWatermark() + 10); + timerService.registerProcessingTimeTimer(element.getValue().f0, timerService.currentProcessingTime() + 30_000); + + output.collect(element); + } + + @Override + public void onEventTime(InternalTimer<Long, Long> timer) throws Exception { + + } + + @Override + public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception { + + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + output.emitWatermark(mark); + } + } + + public static class CheckingTimelyStatefulOperator + extends AbstractStreamOperator<Tuple2<Long, Long>> + implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> { + private static final long serialVersionUID = 1L; + + public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS"; + public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS"; + public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS"; + + private final ValueStateDescriptor<Long> stateDescriptor = + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); + + private transient InternalTimerService<Long> timerService; + + @Override + public void open() throws Exception { + super.open(); + + timerService = getInternalTimerService( + "timer", + LongSerializer.INSTANCE, + this); + + getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter()); + getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter()); + getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter()); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + element.getValue().f0, + LongSerializer.INSTANCE, + stateDescriptor); + + assertEquals(state.value(), element.getValue().f1); + getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1); + + output.collect(element); + } + + @Override + public void onEventTime(InternalTimer<Long, Long> timer) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + timer.getNamespace(), + LongSerializer.INSTANCE, + stateDescriptor); + + assertEquals(state.value(), timer.getNamespace()); + getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1); + } + + @Override + public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception { + ValueState<Long> state = getKeyedStateBackend().getPartitionedState( + timer.getNamespace(), + LongSerializer.INSTANCE, + stateDescriptor); + + assertEquals(state.value(), timer.getNamespace()); + getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1); + } + } + + --- End diff -- Fixing > Add savepoint backwards compatibility tests from 1.2 to 1.3 > ----------------------------------------------------------- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests > Affects Versions: 1.3.0 > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)