AHeise commented on a change in pull request #15294:
URL: https://github.com/apache/flink/pull/15294#discussion_r604157199



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
##########
@@ -99,87 +104,156 @@
 @RunWith(Parameterized.class)
 @Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
+    enum Topology implements DagCreator {
+        PIPELINE {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                final SingleOutputStreamOperator<Long> stream =
+                        env.fromSource(
+                                        new LongSource(
+                                                minCheckpoints,
+                                                parallelism,
+                                                expectedRestarts,
+                                                env.getCheckpointInterval()),
+                                        noWatermarks(),
+                                        "source")
+                                .slotSharingGroup(slotSharing ? "default" : 
"source")
+                                .disableChaining()
+                                .map(i -> checkHeader(i))
+                                .name("forward")
+                                .uid("forward")
+                                .slotSharingGroup(slotSharing ? "default" : 
"forward")
+                                .keyBy(i -> withoutHeader(i) % parallelism * 
parallelism)
+                                .process(new KeyedIdentityFunction())
+                                .name("keyed")
+                                .uid("keyed");
+                addFailingPipeline(minCheckpoints, slotSharing, stream);
+            }
+        },
+
+        MULTI_INPUT {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Long> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    final SingleOutputStreamOperator<Long> source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource =
+                            combinedSource == null
+                                    ? source
+                                    : combinedSource
+                                            .connect(source)
+                                            .flatMap(new MinEmittingFunction())
+                                            .name("min" + inputIndex)
+                                            .uid("min" + inputIndex)
+                                            .slotSharingGroup(
+                                                    slotSharing ? "default" : 
("min" + inputIndex));
+                }
 
-    @Parameterized.Parameters(name = "{0}")
-    public static Object[][] parameters() {
-        return new Object[][] {
-            new Object[] {
-                "non-parallel pipeline with local channels", 
createPipelineSettings(1, 1, true)
-            },
-            new Object[] {
-                "non-parallel pipeline with remote channels", 
createPipelineSettings(1, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with local channels, p = 5", 
createPipelineSettings(5, 5, true)
-            },
-            new Object[] {
-                "parallel pipeline with remote channels, p = 5", 
createPipelineSettings(5, 1, false)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 5", 
createPipelineSettings(5, 3, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20",
-                createPipelineSettings(20, 10, true)
-            },
-            new Object[] {
-                "parallel pipeline with mixed channels, p = 20, timeout=1",
-                createPipelineSettings(20, 10, true, 1)
-            },
-            new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
-            new Object[] {"Parallel cogroup, p = 10", 
createCogroupSettings(10)},
-            new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
-            new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
-        };
-    }
+                addFailingPipeline(minCheckpoints, slotSharing, 
combinedSource);
+            }
+        },
+
+        UNION {
+            @Override
+            public void create(
+                    StreamExecutionEnvironment env,
+                    int minCheckpoints,
+                    boolean slotSharing,
+                    int expectedRestarts) {
+                final int parallelism = env.getParallelism();
+                DataStream<Long> combinedSource = null;
+                for (int inputIndex = 0; inputIndex < NUM_SOURCES; 
inputIndex++) {
+                    final SingleOutputStreamOperator<Long> source =
+                            env.fromSource(
+                                            new LongSource(
+                                                    minCheckpoints,
+                                                    parallelism,
+                                                    expectedRestarts,
+                                                    
env.getCheckpointInterval()),
+                                            noWatermarks(),
+                                            "source" + inputIndex)
+                                    .slotSharingGroup(
+                                            slotSharing ? "default" : 
("source" + inputIndex))
+                                    .disableChaining();
+                    combinedSource = combinedSource == null ? source : 
combinedSource.union(source);
+                }
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing) {
-        return createPipelineSettings(parallelism, slotsPerTaskManager, 
slotSharing, 0);
-    }
+                final SingleOutputStreamOperator<Long> deduplicated =
+                        combinedSource
+                                .partitionCustom(
+                                        (key, numPartitions) ->
+                                                (int) (withoutHeader(key) % 
numPartitions),
+                                        l -> l)
+                                .flatMap(new CountingMapFunction(NUM_SOURCES));
+                addFailingPipeline(minCheckpoints, slotSharing, deduplicated);
+            }
+        };
 
-    private static UnalignedSettings createPipelineSettings(
-            int parallelism, int slotsPerTaskManager, boolean slotSharing, int 
timeout) {
-        int numShuffles = 4;
-        return new UnalignedSettings(UnalignedCheckpointITCase::createPipeline)
-                .setParallelism(parallelism)
-                .setSlotSharing(slotSharing)
-                .setNumSlots(slotSharing ? parallelism : parallelism * 
numShuffles)
-                .setSlotsPerTaskManager(slotsPerTaskManager)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1)
-                .setAlignmentTimeout(timeout);
+        @Override
+        public String toString() {
+            return name().toLowerCase();
+        }
     }
 
-    private static UnalignedSettings createCogroupSettings(int parallelism) {
-        int numShuffles = 10;
-        return new 
UnalignedSettings(UnalignedCheckpointITCase::createMultipleInputTopology)
-                .setParallelism(parallelism)
-                .setSlotSharing(true)
-                .setNumSlots(parallelism * numShuffles)
-                .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1);
+    @Parameterized.Parameters(name = "{0} with {2} channels, p = {1}, timeout 
= {3}")
+    public static Object[][] parameters() {
+        Object[] defaults = {Topology.PIPELINE, 1, MIXED, 0};
+
+        return Stream.of(
+                        new Object[] {Topology.PIPELINE, 1, LOCAL},
+                        new Object[] {Topology.PIPELINE, 1, REMOTE},
+                        new Object[] {Topology.PIPELINE, 5, LOCAL},
+                        new Object[] {Topology.PIPELINE, 5, REMOTE},
+                        new Object[] {Topology.PIPELINE, 20},
+                        new Object[] {Topology.PIPELINE, 20, MIXED, 1},
+                        new Object[] {Topology.MULTI_INPUT, 5},
+                        new Object[] {Topology.MULTI_INPUT, 10},
+                        new Object[] {Topology.UNION, 5},
+                        new Object[] {Topology.UNION, 10})
+                .map(params -> addDefaults(params, defaults))
+                .toArray(Object[][]::new);
     }
 
-    private static UnalignedSettings createUnionSettings(int parallelism) {
-        int numShuffles = 6;
-        return new 
UnalignedSettings(UnalignedCheckpointITCase::createUnionTopology)
-                .setParallelism(parallelism)
-                .setSlotSharing(true)
-                .setNumSlots(parallelism * numShuffles)
-                .setSlotsPerTaskManager(parallelism)
-                .setExpectedFailures(5)
-                .setFailuresAfterSourceFinishes(1);
+    private static Object[] addDefaults(Object[] params, Object[] defaults) {
+        return ArrayUtils.addAll(
+                params, ArrayUtils.subarray(defaults, params.length, 
defaults.length));
     }
 
     private final UnalignedSettings settings;
 
-    public UnalignedCheckpointITCase(String desc, UnalignedSettings settings) {
-        this.settings = settings;
+    public UnalignedCheckpointITCase(
+            Topology topology, int parallelism, ChannelType channelType, int 
timeout) {
+        settings =
+                new UnalignedSettings(topology)
+                        .setParallelism(parallelism)
+                        .setChannelTypes(channelType)
+                        .setExpectedFailures(5)
+                        .setFailuresAfterSourceFinishes(1)
+                        .setAlignmentTimeout(timeout);
     }
 
-    @Test(timeout = 60_000)
+    @Test

Review comment:
       The timeout is rather arbitrary, both in its value and in where we place it 
(shouldn't it be applied to all IT cases?).
   I'd rather get https://github.com/apache/flink/pull/14834 merged 
soonish.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to