imaffe commented on code in PR #20725: URL: https://github.com/apache/flink/pull/20725#discussion_r960176312
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -114,7 +135,12 @@ public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
         // If the failed subtask has already restarted, we need to assign pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
-            assignPendingPartitionSplits(singletonList(subtaskId));
+            LOG.debug(
+                    "Reader {} has been restarted after crashing, we will put splits back to it.",
+                    subtaskId);
+            // Reassign for all readers in case of adding splits after scale up/down.
+            List<Integer> readers = new ArrayList<>(context.registeredReaders().keySet());
+            assignPendingPartitionSplits(readers);

Review Comment:
   Here we reassign to all readers every time addSplitsBack() is called. In Shared mode, does this mean we can solve the split assignment issue the same way (and no longer need the source events)?

##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -64,17 +67,35 @@ public PulsarSourceEnumerator(
             PulsarSubscriber subscriber,
             StartCursor startCursor,
+            StopCursor stopCursor,
+            RangeGenerator rangeGenerator,
+            SourceConfiguration sourceConfiguration,
+            SplitEnumeratorContext<PulsarPartitionSplit> context) {
+        this(
+                subscriber,
+                startCursor,
+                stopCursor,
+                rangeGenerator,
+                sourceConfiguration,
+                context,
+                initialState());
+    }
+
+    public PulsarSourceEnumerator(

Review Comment:
   I want to walk through the scale up/down process to make sure my understanding is correct (especially in Shared subscription mode):

   Whenever a scale up/down happens, the entire source restarts: first the enumerator starts, then each reader is started and registered with the enumerator, and addReader() is called. Is this correct?
   In case of a reader failure, the enumerator is not restarted; instead, addSplitsBack() is called first and addReader() is called later.

   I am asking because I want to make sure that whenever a scale up/down happens, the pendingSplits for each reader are in the correct state (ensured by adding all fetched splits to each reader's pending list when the enumerator starts).

##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java:
##########
@@ -164,7 +164,7 @@ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IO
         // Schema
         int byteLen = ois.readInt();
         byte[] schemaBytes = new byte[byteLen];
-        IOUtils.readFully(ois, schemaBytes, 0, byteLen);
+        ois.readFully(schemaBytes);

Review Comment:
   Is there any difference between IOUtils.readFully() and ois.readFully() here?

##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -114,7 +135,12 @@ public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
         // If the failed subtask has already restarted, we need to assign pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
-            assignPendingPartitionSplits(singletonList(subtaskId));
+            LOG.debug(
+                    "Reader {} has been restarted after crashing, we will put splits back to it.",
+                    subtaskId);
+            // Reassign for all readers in case of adding splits after scale up/down.
+            List<Integer> readers = new ArrayList<>(context.registeredReaders().keySet());
+            assignPendingPartitionSplits(readers);

Review Comment:
   Will addSplitsBack() be called on scale up/down? Can you elaborate on when it is called during scale up/down?
   Thanks ~

##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java:
##########
@@ -54,57 +55,37 @@ private PulsarSourceEnumStateSerializer() {
     @Override
     public int getVersion() {
-        // We use PulsarPartitionSplitSerializer's version because we use reuse this class.
-        return PulsarPartitionSplitSerializer.CURRENT_VERSION;
+        return CURRENT_VERSION;
     }
 
     @Override
     public byte[] serialize(PulsarSourceEnumState obj) throws IOException {
-        // VERSION 0 serialization
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos)) {
             serializeSet(
                     out, obj.getAppendedPartitions(), SPLIT_SERIALIZER::serializeTopicPartition);
-            serializeSet(
-                    out,
-                    obj.getPendingPartitionSplits(),
-                    SPLIT_SERIALIZER::serializePulsarPartitionSplit);

Review Comment:
   My understanding: because we reassign to all readers whenever possible, we no longer need to keep all the pending splits? I am also curious how we will use the `AppendedPartition`.
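
   A note on the IOUtils vs. ois question in the PulsarSchema.java comment: `ObjectInputStream` implements `java.io.DataInput`, whose `readFully(byte[])` contract is to keep reading until the buffer is full or to throw `EOFException` if the stream ends first. The sketch below only illustrates that contract with a plain `DataInputStream`. It does not exercise Flink's `IOUtils`, and `ReadFullyDemo` and `TrickleInputStream` are hypothetical names made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {

    /** Hypothetical helper: returns at most one byte per bulk read, like a slow socket. */
    static final class TrickleInputStream extends FilterInputStream {
        TrickleInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 1));
        }
    }

    /** A single read() call may return fewer bytes than the buffer can hold. */
    public static int singleRead(byte[] data, int len) throws IOException {
        try (InputStream in = new TrickleInputStream(new ByteArrayInputStream(data))) {
            return in.read(new byte[len]);
        }
    }

    /** DataInput.readFully() loops internally until the buffer is completely filled. */
    public static byte[] readFully(byte[] data, int len) throws IOException {
        try (DataInputStream in =
                new DataInputStream(new TrickleInputStream(new ByteArrayInputStream(data)))) {
            byte[] buf = new byte[len];
            in.readFully(buf);
            return buf;
        }
    }

    /** Returns true if readFully() hits end of stream before the buffer is full. */
    public static boolean failsOnShortInput(byte[] data, int len) throws IOException {
        try {
            readFully(data, len);
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4};
        System.out.println(singleRead(data, 4));        // bytes returned by one read() call
        System.out.println(readFully(data, 4).length);  // bytes returned by readFully()
        System.out.println(failsOnShortInput(data, 8)); // asking for more than is available
    }
}
```

   If this reading of the contract is right, `ois.readFully(schemaBytes)` should fill all `byteLen` bytes, since `schemaBytes` is allocated with exactly that length. Any difference from the old call would then come down to how each variant reports a premature end of stream.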