XComp commented on code in PR #21971: URL: https://github.com/apache/flink/pull/21971#discussion_r1118856054
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -83,10 +85,22 @@ public void open( final int taskIdx = runtimeContext.getIndexOfThisSubtask(); final long congruence = start + taskIdx; - long totalNoOfElements = Math.abs(end - start + 1); - final int baseSize = safeDivide(totalNoOfElements, stepSize); - final int toCollect = - (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize; + + Preconditions.checkArgument( + start < end, "The start value cannot be greater than the end value."); + + // After preventing setting to Long.MAX_VALUE, the length of + // Long type will be exceeded after +1 + final BigInteger totalNoOfElements = + BigInteger.valueOf(end) + .subtract(BigInteger.valueOf(start)) + .add(BigInteger.valueOf(1)); + + final BigInteger baseSize = totalNoOfElements.divide(BigInteger.valueOf(stepSize)); + final long toCollect = + totalNoOfElements.remainder(BigInteger.valueOf(stepSize)).longValue() > taskIdx + ? baseSize.add(BigInteger.valueOf(1)).longValue() Review Comment: yes, that's what I was referring to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org