imaffe commented on code in PR #20725:
URL: https://github.com/apache/flink/pull/20725#discussion_r960176312


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -114,7 +135,12 @@ public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
 
         // If the failed subtask has already restarted, we need to assign pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
-            assignPendingPartitionSplits(singletonList(subtaskId));
+            LOG.debug(
+                    "Reader {} has been restarted after crashing, we will put splits back to it.",
+                    subtaskId);
+            // Reassign for all readers in case of adding splits after scale up/down.
+            List<Integer> readers = new ArrayList<>(context.registeredReaders().keySet());
+            assignPendingPartitionSplits(readers);

Review Comment:
   Here, we reassign every time `addSplitsBack` is called. In Shared subscription mode, does this mean we can solve the split-assignment issue the same way (and no longer need the source events)?
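
   To make the question concrete, here is a minimal sketch (not the actual Flink/Pulsar code; `PendingAssigner` and the `String` split type are hypothetical) of the "drain the pending list of every registered reader" behavior that the diff above introduces:

```java
import java.util.*;

// Hedged sketch of the reassign-all idea in the diff above. In the real
// enumerator the splits are PulsarPartitionSplit and assignment goes through
// the SplitEnumeratorContext; here plain strings and maps stand in for both.
class PendingAssigner {
    // Splits discovered but not yet handed to a reader, keyed by owner subtask.
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();
    // Splits already handed out, keyed by reader subtask.
    private final Map<Integer, List<String>> assigned = new HashMap<>();

    void addPending(int owner, String split) {
        pendingSplits.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
    }

    // Instead of reassigning only the restarted subtask, drain the pending
    // list of every currently registered reader.
    void assignPendingPartitionSplits(Collection<Integer> registeredReaders) {
        for (int reader : registeredReaders) {
            List<String> pending = pendingSplits.remove(reader);
            if (pending != null) {
                assigned.computeIfAbsent(reader, k -> new ArrayList<>()).addAll(pending);
            }
        }
    }

    List<String> assignedTo(int reader) {
        return assigned.getOrDefault(reader, Collections.emptyList());
    }
}
```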



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -64,17 +67,35 @@
     public PulsarSourceEnumerator(
             PulsarSubscriber subscriber,
             StartCursor startCursor,
+            StopCursor stopCursor,
+            RangeGenerator rangeGenerator,
+            SourceConfiguration sourceConfiguration,
+            SplitEnumeratorContext<PulsarPartitionSplit> context) {
+        this(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                sourceConfiguration,
+                context,
+                initialState());
+    }
+
+    public PulsarSourceEnumerator(

Review Comment:
   I want to walk through the scale up/down process to make sure my understanding is correct (especially in Shared subscription mode):
   
   Whenever a scale up/down happens, the entire source restarts: the enumerator starts first, then each reader is started and registered with the enumerator, and its addReader() method is called. Is this correct?
   
   In case of a reader failure, the enumerator is not restarted; instead, addSplitsBack() is called and addReader() is called later.
   
   I'm asking because I want to make sure that whenever a scale up/down happens, the pendingSplits for each reader is in a correct state (ensured by adding all fetched splits to each reader's pending list when the enumerator starts).
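
   Sketching my understanding of the two call sequences (this is a recording stub I wrote, not the Flink runtime; `RecordingEnumerator` is hypothetical and only logs which hooks fire):

```java
import java.util.*;

// Hedged sketch of the two lifecycle scenarios described above, as I
// understand the SplitEnumerator contract. The stub only records call order.
class RecordingEnumerator {
    final List<String> calls = new ArrayList<>();

    void start() { calls.add("start"); }
    void addReader(int subtaskId) { calls.add("addReader(" + subtaskId + ")"); }
    void addSplitsBack(int subtaskId) { calls.add("addSplitsBack(" + subtaskId + ")"); }

    // Scale up/down: the whole source restarts; the enumerator starts first,
    // then each reader registers and addReader() fires for it.
    static RecordingEnumerator rescale(int parallelism) {
        RecordingEnumerator e = new RecordingEnumerator();
        e.start();
        for (int i = 0; i < parallelism; i++) {
            e.addReader(i);
        }
        return e;
    }

    // Single reader failure: the enumerator keeps running; the failed
    // reader's splits come back first, then the reader re-registers.
    static void readerFailover(RecordingEnumerator e, int subtaskId) {
        e.addSplitsBack(subtaskId);
        e.addReader(subtaskId);
    }
}
```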
   



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java:
##########
@@ -164,7 +164,7 @@ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IO
         // Schema
         int byteLen = ois.readInt();
         byte[] schemaBytes = new byte[byteLen];
-        IOUtils.readFully(ois, schemaBytes, 0, byteLen);
+        ois.readFully(schemaBytes);

Review Comment:
   Is there any difference between `IOUtils.readFully` and `ois.readFully` here?
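
   For context: my understanding is that both fill the buffer completely or fail (`DataInput.readFully` throws `EOFException` on a short stream, and Flink's `IOUtils.readFully` behaves similarly), so the two calls should be equivalent for this byte count. A minimal round-trip sketch of the pattern in the hunk, using a plain `DataInputStream` instead of an `ObjectInputStream` (which also implements `DataInput`):

```java
import java.io.*;

// Hedged sketch of the length-prefixed read in the diff above:
// write an int length, then the bytes; read them back with readFully,
// which blocks until the buffer is full or throws EOFException.
class ReadFullyDemo {
    static byte[] roundTrip(byte[] schemaBytes) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(schemaBytes.length);
            out.write(schemaBytes);
        }
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
            int byteLen = in.readInt();
            byte[] buf = new byte[byteLen];
            in.readFully(buf); // fills buf completely or throws EOFException
            return buf;
        }
    }
}
```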



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -114,7 +135,12 @@ public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
 
         // If the failed subtask has already restarted, we need to assign pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
-            assignPendingPartitionSplits(singletonList(subtaskId));
+            LOG.debug(
+                    "Reader {} has been restarted after crashing, we will put splits back to it.",
+                    subtaskId);
+            // Reassign for all readers in case of adding splits after scale up/down.
+            List<Integer> readers = new ArrayList<>(context.registeredReaders().keySet());
+            assignPendingPartitionSplits(readers);

Review Comment:
   Will addSplitsBack be called on scale up/down? Can you elaborate on when it would be called during scale up/down? Thanks ~



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java:
##########
@@ -54,57 +55,37 @@ private PulsarSourceEnumStateSerializer() {
 
     @Override
     public int getVersion() {
-        // We use PulsarPartitionSplitSerializer's version because we use reuse this class.
-        return PulsarPartitionSplitSerializer.CURRENT_VERSION;
+        return CURRENT_VERSION;
     }
 
     @Override
     public byte[] serialize(PulsarSourceEnumState obj) throws IOException {
-        // VERSION 0 serialization
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos)) {
             serializeSet(
                    out, obj.getAppendedPartitions(), SPLIT_SERIALIZER::serializeTopicPartition);
-            serializeSet(
-                    out,
-                    obj.getPendingPartitionSplits(),
-                    SPLIT_SERIALIZER::serializePulsarPartitionSplit);

Review Comment:
   My understanding: because we now reassign splits to all registered readers whenever possible, we no longer need to persist the pending splits? I'm also curious how `AppendedPartition` will be used.
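
   For reference, the length-prefixed `serializeSet` pattern kept in the hunk can be sketched like this (hypothetical names and `String` elements; the real serializer delegates element encoding to `SPLIT_SERIALIZER` and versions the whole record):

```java
import java.io.*;
import java.util.*;

// Hedged sketch of the serializeSet/deserializeSet pattern above:
// an int element count followed by each element, symmetric on both sides.
class SetSerde {
    static byte[] serializeSet(Set<String> set) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(set.size()); // element count first
            for (String s : set) {
                out.writeUTF(s); // then each element
            }
        }
        return baos.toByteArray();
    }

    static Set<String> deserializeSet(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int size = in.readInt();
            Set<String> set = new HashSet<>();
            for (int i = 0; i < size; i++) {
                set.add(in.readUTF());
            }
            return set;
        }
    }
}
```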



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
