AHeise commented on a change in pull request #15294: URL: https://github.com/apache/flink/pull/15294#discussion_r604157199
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java ########## @@ -99,87 +104,156 @@ @RunWith(Parameterized.class) @Category(FailsWithAdaptiveScheduler.class) // FLINK-21689 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { + enum Topology implements DagCreator { + PIPELINE { + @Override + public void create( + StreamExecutionEnvironment env, + int minCheckpoints, + boolean slotSharing, + int expectedRestarts) { + final int parallelism = env.getParallelism(); + final SingleOutputStreamOperator<Long> stream = + env.fromSource( + new LongSource( + minCheckpoints, + parallelism, + expectedRestarts, + env.getCheckpointInterval()), + noWatermarks(), + "source") + .slotSharingGroup(slotSharing ? "default" : "source") + .disableChaining() + .map(i -> checkHeader(i)) + .name("forward") + .uid("forward") + .slotSharingGroup(slotSharing ? "default" : "forward") + .keyBy(i -> withoutHeader(i) % parallelism * parallelism) + .process(new KeyedIdentityFunction()) + .name("keyed") + .uid("keyed"); + addFailingPipeline(minCheckpoints, slotSharing, stream); + } + }, + + MULTI_INPUT { + @Override + public void create( + StreamExecutionEnvironment env, + int minCheckpoints, + boolean slotSharing, + int expectedRestarts) { + final int parallelism = env.getParallelism(); + DataStream<Long> combinedSource = null; + for (int inputIndex = 0; inputIndex < NUM_SOURCES; inputIndex++) { + final SingleOutputStreamOperator<Long> source = + env.fromSource( + new LongSource( + minCheckpoints, + parallelism, + expectedRestarts, + env.getCheckpointInterval()), + noWatermarks(), + "source" + inputIndex) + .slotSharingGroup( + slotSharing ? "default" : ("source" + inputIndex)) + .disableChaining(); + combinedSource = + combinedSource == null + ? 
source + : combinedSource + .connect(source) + .flatMap(new MinEmittingFunction()) + .name("min" + inputIndex) + .uid("min" + inputIndex) + .slotSharingGroup( + slotSharing ? "default" : ("min" + inputIndex)); + } - @Parameterized.Parameters(name = "{0}") - public static Object[][] parameters() { - return new Object[][] { - new Object[] { - "non-parallel pipeline with local channels", createPipelineSettings(1, 1, true) - }, - new Object[] { - "non-parallel pipeline with remote channels", createPipelineSettings(1, 1, false) - }, - new Object[] { - "parallel pipeline with local channels, p = 5", createPipelineSettings(5, 5, true) - }, - new Object[] { - "parallel pipeline with remote channels, p = 5", createPipelineSettings(5, 1, false) - }, - new Object[] { - "parallel pipeline with mixed channels, p = 5", createPipelineSettings(5, 3, true) - }, - new Object[] { - "parallel pipeline with mixed channels, p = 20", - createPipelineSettings(20, 10, true) - }, - new Object[] { - "parallel pipeline with mixed channels, p = 20, timeout=1", - createPipelineSettings(20, 10, true, 1) - }, - new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)}, - new Object[] {"Parallel cogroup, p = 10", createCogroupSettings(10)}, - new Object[] {"Parallel union, p = 5", createUnionSettings(5)}, - new Object[] {"Parallel union, p = 10", createUnionSettings(10)}, - }; - } + addFailingPipeline(minCheckpoints, slotSharing, combinedSource); + } + }, + + UNION { + @Override + public void create( + StreamExecutionEnvironment env, + int minCheckpoints, + boolean slotSharing, + int expectedRestarts) { + final int parallelism = env.getParallelism(); + DataStream<Long> combinedSource = null; + for (int inputIndex = 0; inputIndex < NUM_SOURCES; inputIndex++) { + final SingleOutputStreamOperator<Long> source = + env.fromSource( + new LongSource( + minCheckpoints, + parallelism, + expectedRestarts, + env.getCheckpointInterval()), + noWatermarks(), + "source" + inputIndex) + 
.slotSharingGroup( + slotSharing ? "default" : ("source" + inputIndex)) + .disableChaining(); + combinedSource = combinedSource == null ? source : combinedSource.union(source); + } - private static UnalignedSettings createPipelineSettings( - int parallelism, int slotsPerTaskManager, boolean slotSharing) { - return createPipelineSettings(parallelism, slotsPerTaskManager, slotSharing, 0); - } + final SingleOutputStreamOperator<Long> deduplicated = + combinedSource + .partitionCustom( + (key, numPartitions) -> + (int) (withoutHeader(key) % numPartitions), + l -> l) + .flatMap(new CountingMapFunction(NUM_SOURCES)); + addFailingPipeline(minCheckpoints, slotSharing, deduplicated); + } + }; - private static UnalignedSettings createPipelineSettings( - int parallelism, int slotsPerTaskManager, boolean slotSharing, int timeout) { - int numShuffles = 4; - return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline) - .setParallelism(parallelism) - .setSlotSharing(slotSharing) - .setNumSlots(slotSharing ? 
parallelism : parallelism * numShuffles) - .setSlotsPerTaskManager(slotsPerTaskManager) - .setExpectedFailures(5) - .setFailuresAfterSourceFinishes(1) - .setAlignmentTimeout(timeout); + @Override + public String toString() { + return name().toLowerCase(); + } } - private static UnalignedSettings createCogroupSettings(int parallelism) { - int numShuffles = 10; - return new UnalignedSettings(UnalignedCheckpointITCase::createMultipleInputTopology) - .setParallelism(parallelism) - .setSlotSharing(true) - .setNumSlots(parallelism * numShuffles) - .setSlotsPerTaskManager(parallelism) - .setExpectedFailures(5) - .setFailuresAfterSourceFinishes(1); + @Parameterized.Parameters(name = "{0} with {2} channels, p = {1}, timeout = {3}") + public static Object[][] parameters() { + Object[] defaults = {Topology.PIPELINE, 1, MIXED, 0}; + + return Stream.of( + new Object[] {Topology.PIPELINE, 1, LOCAL}, + new Object[] {Topology.PIPELINE, 1, REMOTE}, + new Object[] {Topology.PIPELINE, 5, LOCAL}, + new Object[] {Topology.PIPELINE, 5, REMOTE}, + new Object[] {Topology.PIPELINE, 20}, + new Object[] {Topology.PIPELINE, 20, MIXED, 1}, + new Object[] {Topology.MULTI_INPUT, 5}, + new Object[] {Topology.MULTI_INPUT, 10}, + new Object[] {Topology.UNION, 5}, + new Object[] {Topology.UNION, 10}) + .map(params -> addDefaults(params, defaults)) + .toArray(Object[][]::new); } - private static UnalignedSettings createUnionSettings(int parallelism) { - int numShuffles = 6; - return new UnalignedSettings(UnalignedCheckpointITCase::createUnionTopology) - .setParallelism(parallelism) - .setSlotSharing(true) - .setNumSlots(parallelism * numShuffles) - .setSlotsPerTaskManager(parallelism) - .setExpectedFailures(5) - .setFailuresAfterSourceFinishes(1); + private static Object[] addDefaults(Object[] params, Object[] defaults) { + return ArrayUtils.addAll( + params, ArrayUtils.subarray(defaults, params.length, defaults.length)); } private final UnalignedSettings settings; - public 
UnalignedCheckpointITCase(String desc, UnalignedSettings settings) { - this.settings = settings; + public UnalignedCheckpointITCase( + Topology topology, int parallelism, ChannelType channelType, int timeout) { + settings = + new UnalignedSettings(topology) + .setParallelism(parallelism) + .setChannelTypes(channelType) + .setExpectedFailures(5) + .setFailuresAfterSourceFinishes(1) + .setAlignmentTimeout(timeout); } - @Test(timeout = 60_000) + @Test Review comment: The timeout is rather arbitrary, both in its value and in where we place it (shouldn't a timeout apply to all IT cases instead?). I'd rather get https://github.com/apache/flink/pull/14834 merged soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org