xuzhiwen1255 commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1113724624


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -83,10 +85,22 @@ public void open(
             final int taskIdx = runtimeContext.getIndexOfThisSubtask();
             final long congruence = start + taskIdx;
 
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+
+            Preconditions.checkArgument(
+                    start < end, "The start value cannot be greater than the 
end value.");
+
+            // After preventing setting to Long.MAX_VALUE, the length of
+            // Long type will be exceeded after +1
+            final BigInteger totalNoOfElements =
+                    BigInteger.valueOf(end)
+                            .subtract(BigInteger.valueOf(start))
+                            .add(BigInteger.valueOf(1));
+
+            final BigInteger baseSize = 
totalNoOfElements.divide(BigInteger.valueOf(stepSize));
+            final long toCollect =
+                    
totalNoOfElements.remainder(BigInteger.valueOf(stepSize)).longValue() > taskIdx
+                            ? baseSize.add(BigInteger.valueOf(1)).longValue()

Review Comment:
   Do you mean to limit the maximum value of the start of the sequence to 
Integer.MAX_VALUE-1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to