Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3778#discussion_r114342231
  
    --- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
 ---
    @@ -0,0 +1,771 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import org.apache.flink.api.common.accumulators.IntCounter;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
    +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.TimestampedCollector;
    +import org.apache.flink.streaming.api.operators.Triggerable;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.Collector;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This verifies that we can restore a complete job from a Flink 1.2 
savepoint.
    + *
    + * <p>The test pipeline contains both "Checkpointed" state and keyed user 
state.
    + *
    + * <p>The tests will time out if they don't see the required number of 
successful checks within
    + * a time limit.
    + */
    +public class StatefulJobSavepointFrom12MigrationITCase extends 
SavepointMigrationTestBase {
    +   private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +   /**
    +    * This has to be manually executed to create the savepoint on Flink 
1.2.
    +    */
    +   @Test
    +   @Ignore
    +   public void testCreateSavepointOnFlink12() throws Exception {
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +           // we only test memory state backend yet
    +           env.setStateBackend(new MemoryStateBackend());
    +           env.enableCheckpointing(500);
    +           env.setParallelism(4);
    +           env.setMaxParallelism(4);
    +
    +           env
    +                           .addSource(new 
LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +                           .flatMap(new 
LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +                           .keyBy(0)
    +                           .flatMap(new 
LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +                           .keyBy(0)
    +                           .flatMap(new 
KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "custom_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new CheckpointedUdfOperator(new 
LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "timely_stateful_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new 
TimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +                           .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +           executeAndSavepoint(
    +                           env,
    +                           
"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
    +                           new 
Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS));
    +   }
    +
    +   /**
    +    * This has to be manually executed to create the savepoint on Flink 
1.2.
    +    */
    +   @Test
    +   @Ignore
    +   public void testCreateSavepointOnFlink12WithRocksDB() throws Exception {
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +           RocksDBStateBackend rocksBackend =
    +                           new RocksDBStateBackend(new 
MemoryStateBackend());
    +           env.setStateBackend(rocksBackend);
    +           env.enableCheckpointing(500);
    +           env.setParallelism(4);
    +           env.setMaxParallelism(4);
    +
    +           env
    +                           .addSource(new 
LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +                           .flatMap(new 
LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +                           .keyBy(0)
    +                           .flatMap(new 
LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +                           .keyBy(0)
    +                           .flatMap(new 
KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "custom_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new CheckpointedUdfOperator(new 
LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "timely_stateful_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new 
TimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +                           .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +           executeAndSavepoint(
    +                           env,
    +                           
"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb",
    +                           new 
Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS));
    +   }
    +
    +
    +   @Test
    +   public void testSavepointRestoreFromFlink12() throws Exception {
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setRestartStrategy(RestartStrategies.noRestart());
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +           // we only test memory state backend yet
    +           env.setStateBackend(new MemoryStateBackend());
    +           env.enableCheckpointing(500);
    +           env.setParallelism(4);
    +           env.setMaxParallelism(4);
    +
    +           env
    +                           .addSource(new 
CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +                           .flatMap(new 
CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +                           .keyBy(0)
    +                           .flatMap(new 
CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +                           .keyBy(0)
    +                           .flatMap(new 
CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "custom_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new 
CheckingRestoringUdfOperator(new 
CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "timely_stateful_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new 
CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +                           .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +           restoreAndExecute(
    +                           env,
    +                           
getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
    +                           new 
Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
    +                           new 
Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS));
    +   }
    +
    +   @Test
    +   public void testSavepointRestoreFromFlink12FromRocksDB() throws 
Exception {
    +
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setRestartStrategy(RestartStrategies.noRestart());
    +           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +           // we only test memory state backend yet
    +           env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBackend()));
    +           env.enableCheckpointing(500);
    +           env.setParallelism(4);
    +           env.setMaxParallelism(4);
    +
    +           env
    +                           .addSource(new 
CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
    +                           .flatMap(new 
CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
    +                           .keyBy(0)
    +                           .flatMap(new 
CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
    +                           .keyBy(0)
    +                           .flatMap(new 
CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "custom_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new 
CheckingRestoringUdfOperator(new 
CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
    +                           .keyBy(0)
    +                           .transform(
    +                                           "timely_stateful_operator",
    +                                           new TypeHint<Tuple2<Long, 
Long>>() {}.getTypeInfo(),
    +                                           new 
CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
    +                           .addSink(new 
AccumulatorCountingSink<Tuple2<Long, Long>>());
    +
    +           restoreAndExecute(
    +                           env,
    +                           
getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
    +                           new 
Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
    +                           new 
Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR,
 NUM_SOURCE_ELEMENTS),
    +                           new 
Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 
NUM_SOURCE_ELEMENTS));
    +   }
    +
    +   private static class LegacyCheckpointedSource
    +                   implements SourceFunction<Tuple2<Long, Long>>, 
Checkpointed<String> {
    +
    +           public static String CHECKPOINTED_STRING = "Here be dragons!";
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           private volatile boolean isRunning = true;
    +
    +           private final int numElements;
    +
    +           public LegacyCheckpointedSource(int numElements) {
    +                   this.numElements = numElements;
    +           }
    +
    +           @Override
    +           public void run(SourceContext<Tuple2<Long, Long>> ctx) throws 
Exception {
    +
    +                   ctx.emitWatermark(new Watermark(0));
    +
    +                   synchronized (ctx.getCheckpointLock()) {
    +                           for (long i = 0; i < numElements; i++) {
    +                                   ctx.collect(new Tuple2<>(i, i));
    +                           }
    +                   }
    +
    +                   // don't emit a final watermark so that we don't 
trigger the registered event-time
    +                   // timers
    +                   while (isRunning) {
    +                           Thread.sleep(20);
    +                   }
    +           }
    +
    +           @Override
    +           public void cancel() {
    +                   isRunning = false;
    +           }
    +
    +           @Override
    +           public void restoreState(String state) throws Exception {
    +                   assertEquals(CHECKPOINTED_STRING, state);
    +           }
    +
    +           @Override
    +           public String snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
    +                   return CHECKPOINTED_STRING;
    +           }
    +   }
    +
    +   private static class CheckingRestoringSource
    +                   extends RichSourceFunction<Tuple2<Long, Long>>
    +                   implements CheckpointedRestoring<String> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR 
= CheckingRestoringSource.class + "_RESTORE_CHECK";
    +
    +           private volatile boolean isRunning = true;
    +
    +           private final int numElements;
    +
    +           private String restoredState;
    +
    +           public CheckingRestoringSource(int numElements) {
    +                   this.numElements = numElements;
    +           }
    +
    +           @Override
    +           public void open(Configuration parameters) throws Exception {
    +                   super.open(parameters);
    +
    +                   
getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new 
IntCounter());
    +           }
    +
    +           @Override
    +           public void run(SourceContext<Tuple2<Long, Long>> ctx) throws 
Exception {
    +                   
assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
    +                   
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
    +
    +                   // immediately trigger any set timers
    +                   ctx.emitWatermark(new Watermark(1000));
    +
    +                   synchronized (ctx.getCheckpointLock()) {
    +                           for (long i = 0; i < numElements; i++) {
    +                                   ctx.collect(new Tuple2<>(i, i));
    +                           }
    +                   }
    +
    +                   while (isRunning) {
    +                           Thread.sleep(20);
    +                   }
    +           }
    +
    +           @Override
    +           public void cancel() {
    +                   isRunning = false;
    +           }
    +
    +           @Override
    +           public void restoreState(String state) throws Exception {
    +                   restoredState = state;
    +           }
    +   }
    +
    +   public static class LegacyCheckpointedFlatMap extends 
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
    +                   implements Checkpointed<Tuple2<String, Long>> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
    +                           new Tuple2<>("hello", 42L);
    +
    +           @Override
    +           public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
    +                   out.collect(value);
    +           }
    +
    +           @Override
    +           public void restoreState(Tuple2<String, Long> state) throws 
Exception {
    +           }
    +
    +           @Override
    +           public Tuple2<String, Long> snapshotState(long checkpointId, 
long checkpointTimestamp) throws Exception {
    +                   return CHECKPOINTED_TUPLE;
    +           }
    +   }
    +
    +   public static class CheckingRestoringFlatMap extends 
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
    +                   implements CheckpointedRestoring<Tuple2<String, Long>> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR 
= CheckingRestoringFlatMap.class + "_RESTORE_CHECK";
    +
    +           private transient Tuple2<String, Long> restoredState;
    +
    +           @Override
    +           public void open(Configuration parameters) throws Exception {
    +                   super.open(parameters);
    +
    +                   
getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new 
IntCounter());
    +           }
    +
    +           @Override
    +           public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
    +                   out.collect(value);
    +
    +                   
assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
    +                   
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
    +
    +           }
    +
    +           @Override
    +           public void restoreState(Tuple2<String, Long> state) throws 
Exception {
    +                   restoredState = state;
    +           }
    +   }
    +
    +   public static class LegacyCheckpointedFlatMapWithKeyedState
    +                   extends RichFlatMapFunction<Tuple2<Long, Long>, 
Tuple2<Long, Long>>
    +                   implements Checkpointed<Tuple2<String, Long>> {
    +
    +           private static final long serialVersionUID = 1L;
    +
    +           public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
    +                           new Tuple2<>("hello", 42L);
    +
    +           private final ValueStateDescriptor<Long> stateDescriptor =
    +                           new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
    +
    +           @Override
    +           public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
    +                   out.collect(value);
    +
    +                   
getRuntimeContext().getState(stateDescriptor).update(value.f1);
    +
    +                   assertEquals(value.f1, 
getRuntimeContext().getState(stateDescriptor).value());
    --- End diff --
    
    There was a point in debugging this whole thing where I wasn't sure whether 
state was set correctly, so I introduced this sanity check. Could remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to