zhuzhurk commented on code in PR #24887:
URL: https://github.com/apache/flink/pull/24887#discussion_r1626862364


##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java:
##########
@@ -105,6 +109,8 @@ class SpeculativeExecutionITCase {
 
     private static final AtomicInteger slowTaskCounter = new AtomicInteger(1);
 
+    private static final AtomicInteger forceFailureCounter = new 
AtomicInteger();

Review Comment:
   Maybe make it a field of the source?



##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java:
##########
@@ -524,30 +542,66 @@ public void open(GenericInputSplit split) throws 
IOException {
     }
 
     private static class TestingNumberSequenceSource extends 
NumberSequenceSource {
-        private TestingNumberSequenceSource() {
+
+        private final boolean forceFailureFlag;
+
+        private TestingNumberSequenceSource(boolean forceFailureFlag) {
             super(0, NUMBERS_TO_PRODUCE - 1);
+            this.forceFailureFlag = forceFailureFlag;
         }
 
         @Override
         public SourceReader<Long, NumberSequenceSplit> createReader(
                 SourceReaderContext readerContext) {
-            return new TestingIteratorSourceReader(readerContext);
+            return new TestingIteratorSourceReader(readerContext, 
forceFailureFlag);
+        }
+
+        @Override
+        public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>>
+                createEnumerator(final 
SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+
+            int splitSize = enumContext.currentParallelism();
+            if (forceFailureFlag) {
+                splitSize = 1;

Review Comment:
   Not sure why it should be 1 instead of 0(no split)?
   
   And comments are needed to explain such kind of magic.



##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java:
##########
@@ -524,30 +542,66 @@ public void open(GenericInputSplit split) throws 
IOException {
     }
 
     private static class TestingNumberSequenceSource extends 
NumberSequenceSource {
-        private TestingNumberSequenceSource() {
+
+        private final boolean forceFailureFlag;

Review Comment:
   Maybe add some comments to explain what will happen if `forceFailureCounter 
> 0`?



##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java:
##########
@@ -524,30 +542,66 @@ public void open(GenericInputSplit split) throws 
IOException {
     }
 
     private static class TestingNumberSequenceSource extends 
NumberSequenceSource {
-        private TestingNumberSequenceSource() {
+
+        private final boolean forceFailureFlag;

Review Comment:
   Looks to me it can be replaced by 
`forceFailureCounter`(forceFailureCounter=0 means forceFailureFlag=false).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to