zhuzhurk commented on code in PR #24887: URL: https://github.com/apache/flink/pull/24887#discussion_r1626862364
########## flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java: ########## @@ -105,6 +109,8 @@ class SpeculativeExecutionITCase { private static final AtomicInteger slowTaskCounter = new AtomicInteger(1); + private static final AtomicInteger forceFailureCounter = new AtomicInteger(); Review Comment: Maybe make it a field of the source? ########## flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java: ########## @@ -524,30 +542,66 @@ public void open(GenericInputSplit split) throws IOException { } private static class TestingNumberSequenceSource extends NumberSequenceSource { - private TestingNumberSequenceSource() { + + private final boolean forceFailureFlag; + + private TestingNumberSequenceSource(boolean forceFailureFlag) { super(0, NUMBERS_TO_PRODUCE - 1); + this.forceFailureFlag = forceFailureFlag; } @Override public SourceReader<Long, NumberSequenceSplit> createReader( SourceReaderContext readerContext) { - return new TestingIteratorSourceReader(readerContext); + return new TestingIteratorSourceReader(readerContext, forceFailureFlag); + } + + @Override + public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> + createEnumerator(final SplitEnumeratorContext<NumberSequenceSplit> enumContext) { + + int splitSize = enumContext.currentParallelism(); + if (forceFailureFlag) { + splitSize = 1; Review Comment: Not sure why it should be 1 instead of 0(no split)? And comments are needed to explain such kind of magic. ########## flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java: ########## @@ -524,30 +542,66 @@ public void open(GenericInputSplit split) throws IOException { } private static class TestingNumberSequenceSource extends NumberSequenceSource { - private TestingNumberSequenceSource() { + + private final boolean forceFailureFlag; Review Comment: Maybe add some comments to explain what will happen if `forceFailureCounter > 0`? ########## flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java: ########## @@ -524,30 +542,66 @@ public void open(GenericInputSplit split) throws IOException { } private static class TestingNumberSequenceSource extends NumberSequenceSource { - private TestingNumberSequenceSource() { + + private final boolean forceFailureFlag; Review Comment: Looks to me it can be replaced by `forceFailureCounter`(forceFailureCounter=0 means forceFailureFlag=false). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org