XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1257913768


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -189,8 +206,28 @@ public static SequenceGenerator<String> 
stringGenerator(long start, long end) {
         return new SequenceGenerator<String>(start, end) {
             @Override
             public String next() {
-                return valuesToEmit.poll().toString();
+                return nextValue().toString();
             }
         };
     }
+
+    /**
+     * The internal state of the sequence generator, which is used to record 
the latest state of the
+     * sequence value sent by the current sequence generator. When recovering 
from the state, it is
+     * guaranteed to continue sending the sequence value from the latest state.
+     */
+    private static class InternalState implements Comparable<InternalState> {

Review Comment:
   What's your opinion on renaming this internal class to `SubTaskState`? 
`InternalState` sounds too generic. :thinking: 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +80,45 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", 
LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =

Review Comment:
   ```suggestion
           final ListStateDescriptor<InternalState> stateDescriptor =
   ```
   There are a few other code locations where we could make use of the `final` 
keyword for local variables.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -22,29 +22,44 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
+
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 
 /**
  * A stateful, re-scalable {@link DataGenerator} that emits each number from a 
given interval
  * exactly once, possibly in parallel.
+ *
+ * <p>It maintains a state internally to record the position of the current 
subtask sending
+ * sequence. When the task resumes, it will continue to send the sequence 
value according to the
+ * position sent by the state, until all the sequences have been sent.
+ *
+ * <p><b>IMPORTANT NOTE: </b> When the degree of parallelism increases, there 
may be cases where
+ * subtasks are running empty. When the degree of parallelism decreases, there 
may be cases where
+ * one subtask handles multiple states.

Review Comment:
   Shall we also add that decreasing the parallelism might cause the sequence 
to be emitted out of order?
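
One way this could show up is sketched below as a small standalone simulation. It is not the code from this PR: the `SubTaskState` fields, the `drain` helper, and the "always emit the smallest pending value" policy are assumptions made for illustration. Two states from a former parallelism of 2 are restored into a single subtask, and that subtask resumes at 3 even though 4 and 6 were already emitted before the checkpoint.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical simulation of a downscale; names and polling policy are assumptions. */
class RescaleOrderDemo {

    /** Progress of one former subtask: its stride and the next value it would emit. */
    static final class SubTaskState {
        final long stepSize;
        long nextValue;

        SubTaskState(long stepSize, long nextValue) {
            this.stepSize = stepSize;
            this.nextValue = nextValue;
        }
    }

    /** Emits all remaining values up to {@code end}, always picking the smallest pending one. */
    static List<Long> drain(List<SubTaskState> restored, long end) {
        PriorityQueue<SubTaskState> queue =
                new PriorityQueue<SubTaskState>(
                        Comparator.comparingLong((SubTaskState s) -> s.nextValue));
        for (SubTaskState s : restored) {
            if (s.nextValue <= end) {
                queue.add(s);
            }
        }
        List<Long> emitted = new ArrayList<>();
        while (!queue.isEmpty()) {
            SubTaskState s = queue.poll();
            emitted.add(s.nextValue);
            // The state is outside the queue while it is advanced, so the heap stays valid.
            s.nextValue += s.stepSize;
            if (s.nextValue <= end) {
                queue.add(s);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<SubTaskState> restored = new ArrayList<>();
        restored.add(new SubTaskState(2, 8)); // former subtask 0 had emitted 0, 2, 4, 6
        restored.add(new SubTaskState(2, 3)); // former subtask 1 had emitted only 1
        System.out.println(drain(restored, 9)); // prints [3, 5, 7, 8, 9]
    }
}
```

Under these assumptions, the values emitted after the restore are ascending on their own, but the stream as a whole (6 before the checkpoint, 3 after it) is not.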



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +80,45 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", 
LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+        this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Queues.newPriorityQueue();
 
-            for (Long v : this.checkpointedState.get()) {
-                this.valuesToEmit.add(v);
-            }
+        if (context.isRestored()) {
+            checkpointedState.get().forEach(state -> 
internalStates.offer(state));
         } else {
-            // the first time the job is executed
-            final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
-            final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-            final long congruence = start + taskIdx;
-
-            long totalNoOfElements = Math.abs(end - start + 1);
-            final int baseSize = safeDivide(totalNoOfElements, stepSize);
-            final int toCollect =
-                    (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
-
-            for (long collected = 0; collected < toCollect; collected++) {
-                this.valuesToEmit.add(collected * stepSize + congruence);
+            // The first time the job is executed.
+            final int startOffset = runtimeContext.getIndexOfThisSubtask();
+            final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+            InternalState state = new InternalState(stepSize, start + 
startOffset);
+            internalStates.offer(state);
+        }
+    }
+
+    public Long nextValue() {
+        if (internalStates.isEmpty()) {
+            // Before calling nextValue method, you should call hasNext to 
check.

Review Comment:
   ```suggestion
   ```
   This comment doesn't add any value, don't you think? :thinking: If you want 
to give this hint to the developer, you might want to add it to the error 
message. Otherwise, the developer has to dig through the code to find the hint.
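
A minimal sketch of what moving the hint into the error message could look like. `HintedSequence` is a hypothetical stand-in, not the class from this PR, and the message wording is only an example.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

/** Hypothetical stand-in for the generator; not the Flink class. */
class HintedSequence {
    private final Queue<Long> pending = new ArrayDeque<>();

    HintedSequence(long start, long end) {
        for (long v = start; v <= end; v++) {
            pending.add(v);
        }
    }

    boolean hasNext() {
        return !pending.isEmpty();
    }

    long nextValue() {
        if (pending.isEmpty()) {
            // The hint travels with the exception instead of hiding in a source comment.
            throw new NoSuchElementException(
                    "No more values to emit. Call hasNext() before calling nextValue().");
        }
        return pending.poll();
    }
}
```

A caller who hits the exception then sees the hint directly in the stack trace.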



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -22,29 +22,44 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;

Review Comment:
   ```suggestion
   import org.apache.flink.shaded.guava31.com.google.common.collect.Queues;
   ```
   The `guava30` import won't work anymore because the `flink-shaded` 
dependencies were upgraded (FLINK-32032). Rebasing the branch might reveal 
other affected code locations.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -65,33 +80,45 @@ public void open(
                 this.checkpointedState == null,
                 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-        this.checkpointedState =
-                context.getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        name + "-sequence-state", 
LongSerializer.INSTANCE));
-        this.valuesToEmit = new ArrayDeque<>();
-        if (context.isRestored()) {
-            // upon restoring
+        ListStateDescriptor<InternalState> stateDescriptor =
+                new ListStateDescriptor<>(
+                        name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+        this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+        this.internalStates = Queues.newPriorityQueue();

Review Comment:
   ```suggestion
           this.internalStates = Queues.newPriorityQueue();
   ```
   Can't we make `internalStates` a `final` field? We don't need to 
re-instantiate it. I'm then wondering whether we could add a precondition here 
that checks that the queue is empty when `open` is called, or whether there are 
situations where the queue could still have entries (in which case we would 
have to call `clear()`).
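
A rough sketch of the idea, using plain Java so that it stands alone. `QueueHolder` and its `open(List<Long>)` signature are hypothetical. In the actual class the check would more likely go through Flink's `Preconditions.checkState`, which the file already imports.

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

/** Hypothetical holder illustrating the suggestion; not the Flink class. */
class QueueHolder {
    // Created once and never reassigned, so the field can be final.
    private final Queue<Long> internalStates = new PriorityQueue<>();

    void open(List<Long> restoredStates) {
        // Fail fast if a previous open() left entries behind.
        if (!internalStates.isEmpty()) {
            throw new IllegalStateException(
                    "Expected an empty state queue in open(), but found "
                            + internalStates.size()
                            + " entries.");
        }
        internalStates.addAll(restoredStates);
    }

    int size() {
        return internalStates.size();
    }
}
```

If leftover entries turn out to be legitimate, the check would be replaced by `internalStates.clear()` at the same spot.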



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -22,29 +22,44 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Queues;
+
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 
 /**
  * A stateful, re-scalable {@link DataGenerator} that emits each number from a 
given interval
  * exactly once, possibly in parallel.
+ *
+ * <p>It maintains a state internally to record the position of the current 
subtask sending
+ * sequence. When the task resumes, it will continue to send the sequence 
value according to the
+ * position sent by the state, until all the sequences have been sent.
+ *
+ * <p><b>IMPORTANT NOTE: </b> When the degree of parallelism increases, there 
may be cases where
+ * subtasks are running empty. When the degree of parallelism decreases, there 
may be cases where
+ * one subtask handles multiple states.
  */
 @Experimental
 public abstract class SequenceGenerator<T> implements DataGenerator<T> {
 
     private final long start;
     private final long end;
+    /**
+     * Save the intermediate state of the data to be sent by the current 
subtask,when the state

Review Comment:
   ```suggestion
        * Save the intermediate state of the data to be sent by the current 
subtask, when the state
   ```
   nit



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##########
@@ -189,8 +206,28 @@ public static SequenceGenerator<String> 
stringGenerator(long start, long end) {
         return new SequenceGenerator<String>(start, end) {
             @Override
             public String next() {
-                return valuesToEmit.poll().toString();
+                return nextValue().toString();
             }
         };
     }
+
+    /**
+     * The internal state of the sequence generator, which is used to record 
the latest state of the

Review Comment:
   ```suggestion
        * The internal state of the sequence generator's subtask(s), which is 
used to record the latest state of the
   ```
   Is this more specific?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
