AHeise commented on a change in pull request #16019: URL: https://github.com/apache/flink/pull/16019#discussion_r642248206
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java ########## @@ -166,32 +169,47 @@ public OperatorSubtaskState getSubtaskState(OperatorInstanceID instanceID) { instanceID, inputOperatorID, getUpstreamAssignments(), - assignment -> assignment.outputSubtaskMappings, + assignment -> + assignment.outputSubtaskMappings.get( + getAssignmentIndex( + assignment.getDownstreamAssignments(), + this)), assignment -> assignment.getOutputMapping( - Arrays.asList(assignment.getDownstreamAssignments()) - .indexOf(this)), + getAssignmentIndex( + assignment.getDownstreamAssignments(), + this)), Review comment: Didn't you had a nice idea on how to reduce this monster? One option could be to pass a parameter through the lambda to `getOutputMapping` indicating if it should lazy initialize or not. ########## File path: flink-tests/src/test/resources/log4j2-test.properties ########## @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO Review comment: Revert. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ########## @@ -352,25 +353,72 @@ public void processBroadcastElement( assertEquals(7, streamGraph.getStreamNodes().size()); // forward - assertFalse(supportsUnalignedCheckpoints(streamGraph, source1, map1)); + assertThat(edge(streamGraph, source1, map1), supportsUnalignedCheckpoints(false)); // shuffle - assertTrue(supportsUnalignedCheckpoints(streamGraph, source2, map2)); + assertThat(edge(streamGraph, source2, map2), supportsUnalignedCheckpoints(true)); // broadcast, but other channel is forwarded - assertFalse(supportsUnalignedCheckpoints(streamGraph, map1, joined)); + assertThat(edge(streamGraph, map1, joined), supportsUnalignedCheckpoints(false)); // forward - assertFalse(supportsUnalignedCheckpoints(streamGraph, map2, joined)); + assertThat(edge(streamGraph, map2, joined), supportsUnalignedCheckpoints(false)); // shuffle - assertTrue(supportsUnalignedCheckpoints(streamGraph, joined, map3)); + assertThat(edge(streamGraph, joined, map3), supportsUnalignedCheckpoints(true)); // rescale - assertFalse(supportsUnalignedCheckpoints(streamGraph, map3, map4)); + assertThat(edge(streamGraph, map3, map4), supportsUnalignedCheckpoints(false)); } - private boolean supportsUnalignedCheckpoints( + @Test + public void testUnalignedCheckpointDisabledOnBroadcast() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(42); + + DataStream<Long> source1 = env.fromSequence(1L, 10L); + DataStream<Long> map1 = source1.broadcast().map(l -> l); + DataStream<Long> source2 = env.fromSequence(2L, 11L); + DataStream<Long> keyed = source2.keyBy(r -> 0L); + + final MapStateDescriptor<Long, Long> descriptor = + new MapStateDescriptor<>( + "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + final BroadcastStream<Long> broadcast = map1.broadcast(descriptor); + final SingleOutputStreamOperator<Long> joined = + keyed.connect(broadcast) + .process( + new KeyedBroadcastProcessFunction<Long, Long, Long, Long>() { + @Override + public void processElement( + Long value, ReadOnlyContext ctx, Collector<Long> out) {} + + @Override + public void processBroadcastElement( + Long value, Context ctx, Collector<Long> out) {} + }); + + StreamGraph streamGraph = env.getStreamGraph(); + assertEquals(4, streamGraph.getStreamNodes().size()); + + // single broadcast + assertThat(edge(streamGraph, source1, map1), supportsUnalignedCheckpoints(false)); + // keyed, connected with broadcast + assertThat(edge(streamGraph, source2, joined), supportsUnalignedCheckpoints(false)); + // broadcast, connected with keyed + assertThat(edge(streamGraph, map1, joined), supportsUnalignedCheckpoints(false)); + } + + private static StreamEdge edge( StreamGraph streamGraph, DataStream<Long> op1, DataStream<Long> op2) { - return streamGraph - .getStreamEdges(op1.getId(), op2.getId()) - .get(0) - .supportsUnalignedCheckpoints(); + return streamGraph.getStreamEdges(op1.getId(), op2.getId()).get(0); Review comment: Add assert that there is only 1 edge? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org