XComp commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1257913768
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -189,8 +206,28 @@ public static SequenceGenerator<String> stringGenerator(long start, long end) { return new SequenceGenerator<String>(start, end) { @Override public String next() { - return valuesToEmit.poll().toString(); + return nextValue().toString(); } }; } + + /** + * The internal state of the sequence generator, which is used to record the latest state of the + * sequence value sent by the current sequence generator. When recovering from the state, it is + * guaranteed to continue sending the sequence value from the latest state. + */ + private static class InternalState implements Comparable<InternalState> { Review Comment: What's your opinion on renaming this internal class to `SubTaskState`? `InternalState` sounds too generic. :thinking: ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -65,33 +80,45 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); - this.checkpointedState = - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>( - name + "-sequence-state", LongSerializer.INSTANCE)); - this.valuesToEmit = new ArrayDeque<>(); - if (context.isRestored()) { - // upon restoring + ListStateDescriptor<InternalState> stateDescriptor = Review Comment: ```suggestion final ListStateDescriptor<InternalState> stateDescriptor = ``` There are a few other code locations where we could make use of the `final` keyword for local variables. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -22,29 +22,44 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.guava30.com.google.common.collect.Queues; + import java.math.BigDecimal; import java.math.MathContext; import java.math.RoundingMode; -import java.util.ArrayDeque; -import java.util.Deque; +import java.util.ArrayList; +import java.util.NoSuchElementException; +import java.util.Queue; /** * A stateful, re-scalable {@link DataGenerator} that emits each number from a given interval * exactly once, possibly in parallel. + * + * <p>It maintains a state internally to record the position of the current subtask sending + * sequence. When the task resumes, it will continue to send the sequence value according to the + * position sent by the state, until all the sequences have been sent. + * + * <p><b>IMPORTANT NOTE: </b> When the degree of parallelism increases, there may be cases where + * subtasks are running empty. When the degree of parallelism decreases, there may be cases where + * one subtask handles multiple states. Review Comment: Shall we also add that decreasing the parallelism might result in the sequence being emitted out of order? 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -65,33 +80,45 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); - this.checkpointedState = - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>( - name + "-sequence-state", LongSerializer.INSTANCE)); - this.valuesToEmit = new ArrayDeque<>(); - if (context.isRestored()) { - // upon restoring + ListStateDescriptor<InternalState> stateDescriptor = + new ListStateDescriptor<>( + name + "-sequence-state", TypeInformation.of(InternalState.class)); + this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor); + this.internalStates = Queues.newPriorityQueue(); - for (Long v : this.checkpointedState.get()) { - this.valuesToEmit.add(v); - } + if (context.isRestored()) { + checkpointedState.get().forEach(state -> internalStates.offer(state)); } else { - // the first time the job is executed - final int stepSize = runtimeContext.getNumberOfParallelSubtasks(); - final int taskIdx = runtimeContext.getIndexOfThisSubtask(); - final long congruence = start + taskIdx; - - long totalNoOfElements = Math.abs(end - start + 1); - final int baseSize = safeDivide(totalNoOfElements, stepSize); - final int toCollect = - (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize; - - for (long collected = 0; collected < toCollect; collected++) { - this.valuesToEmit.add(collected * stepSize + congruence); + // The first time the job is executed. 
+ final int startOffset = runtimeContext.getIndexOfThisSubtask(); + final long stepSize = runtimeContext.getNumberOfParallelSubtasks(); + InternalState state = new InternalState(stepSize, start + startOffset); + internalStates.offer(state); + } + } + + public Long nextValue() { + if (internalStates.isEmpty()) { + // Before calling nextValue method, you should call hasNext to check. Review Comment: ```suggestion ``` This comment doesn't add any value, don't you think? :thinking: If you want to give this hint to the developer, you might want to add it to the error message. Otherwise, you're expecting the developer to dig through the code to find this hint. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -22,29 +22,44 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.guava30.com.google.common.collect.Queues; Review Comment: ```suggestion import org.apache.flink.shaded.guava31.com.google.common.collect.Queues; ``` This won't work anymore because the `flink-shaded` dependencies were upgraded (FLINK-32032). Rebasing the branch might reveal other affected code locations. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -65,33 +80,45 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); - this.checkpointedState = - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>( - name + "-sequence-state", LongSerializer.INSTANCE)); - this.valuesToEmit = new ArrayDeque<>(); - if (context.isRestored()) { - // upon restoring + ListStateDescriptor<InternalState> stateDescriptor = + new ListStateDescriptor<>( + name + "-sequence-state", TypeInformation.of(InternalState.class)); + this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor); + this.internalStates = Queues.newPriorityQueue(); Review Comment: ```suggestion this.internalStates = Queues.newPriorityQueue(); ``` Can't we make `internalStates` a `final` field? We don't need to re-instantiate it. I'm also wondering whether we could add a Precondition that checks here that the queue is empty when calling the `open` method or whether there are situations where the queue could still have entries (in which case we would have to call `clear()`)? 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -22,29 +22,44 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.guava30.com.google.common.collect.Queues; + import java.math.BigDecimal; import java.math.MathContext; import java.math.RoundingMode; -import java.util.ArrayDeque; -import java.util.Deque; +import java.util.ArrayList; +import java.util.NoSuchElementException; +import java.util.Queue; /** * A stateful, re-scalable {@link DataGenerator} that emits each number from a given interval * exactly once, possibly in parallel. + * + * <p>It maintains a state internally to record the position of the current subtask sending + * sequence. When the task resumes, it will continue to send the sequence value according to the + * position sent by the state, until all the sequences have been sent. + * + * <p><b>IMPORTANT NOTE: </b> When the degree of parallelism increases, there may be cases where + * subtasks are running empty. When the degree of parallelism decreases, there may be cases where + * one subtask handles multiple states. 
*/ @Experimental public abstract class SequenceGenerator<T> implements DataGenerator<T> { private final long start; private final long end; + /** + * Save the intermediate state of the data to be sent by the current subtask,when the state Review Comment: ```suggestion * Save the intermediate state of the data to be sent by the current subtask, when the state ``` nit ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ########## @@ -189,8 +206,28 @@ public static SequenceGenerator<String> stringGenerator(long start, long end) { return new SequenceGenerator<String>(start, end) { @Override public String next() { - return valuesToEmit.poll().toString(); + return nextValue().toString(); } }; } + + /** + * The internal state of the sequence generator, which is used to record the latest state of the Review Comment: ```suggestion * The internal state of the sequence generator's subtask(s), which is used to record the latest state of the ``` Would this be more specific? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org