SteNicholas commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r633194823



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Hybrid source reader that delegates to the actual current source reader. */

Review comment:
       The Javadoc of `HybridSourceReader` is confusing, because there is no concept of a "current source reader" anywhere else in the API.

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable source chain. */
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumState> {
+
+    private final SourceChain<T, ? extends SourceSplit, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ? extends SourceSplit, ?> sourceChain) {

Review comment:
       IMO, the `SourceChain` shouldn't be constructed on the user side; it could be built inside `HybridSource` instead. The constructor of `HybridSource` could then take the initial source or a list of sources.
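
For illustration, the user-facing API could then look roughly like this (a hedged sketch; `builder`, `addSource`, and the example source variables are hypothetical names, not part of this PR):

```java
// Hypothetical fluent API: HybridSource assembles the SourceChain internally,
// so the user never constructs it directly.
HybridSource<String> hybridSource =
        HybridSource.<String>builder(boundedFileSource) // initial source
                .addSource(unboundedKafkaSource)        // subsequent source(s)
                .build();
```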

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Hybrid source reader that delegates to the actual current source reader. */
+public class HybridSourceReader<T, SplitT extends SourceSplit>
+        implements SourceReader<T, HybridSourceSplit<SplitT>> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class);
+    private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;

Review comment:
       Could the value of `SOURCE_READER_FINISHED_EVENT_DELAY` be made configurable by users? And why is `SOURCE_READER_FINISHED_EVENT_DELAY` needed at all?
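
From the `pollNext` loop in this class, the delay appears to throttle re-sending the finished event while the reader keeps returning `MORE_AVAILABLE` until the coordinator triggers the switch. If it stays, it could be exposed as a user-facing option; a sketch under assumed names (the option key, default, and description are illustrative only):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

// Hypothetical ConfigOption replacing the hard-coded 250 ms constant.
public static final ConfigOption<Duration> SOURCE_READER_FINISHED_EVENT_DELAY =
        ConfigOptions.key("hybrid-source.reader-finished-event-delay")
                .durationType()
                .defaultValue(Duration.ofMillis(250))
                .withDescription(
                        "Minimum interval between SourceReaderFinishedEvents sent to the"
                                + " coordinator while the reader awaits the source switch.");
```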

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable source chain. */
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumState> {
+
+    private final SourceChain<T, ? extends SourceSplit, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ? extends SourceSplit, ?> sourceChain) {
+        this.sourceChain = sourceChain;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> t : 
sourceChain.sources) {

Review comment:
       The `getBoundedness` method should return the boundedness of the last source in the chain, which is what users would reasonably expect.
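
A minimal sketch of the proposed behavior, reusing the fields of the class above:

```java
@Override
public Boundedness getBoundedness() {
    // The hybrid source terminates only when its final source does, so the last
    // element of the chain determines the overall boundedness.
    int lastIndex = sourceChain.sources.size() - 1;
    return sourceChain.sources.get(lastIndex).f0.getBoundedness();
}
```

For well-formed chains (an unbounded source only in the last position) this is equivalent to the existing loop, but it states the intent directly.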

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable source chain. */
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumState> {
+
+    private final SourceChain<T, ? extends SourceSplit, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ? extends SourceSplit, ?> sourceChain) {
+        this.sourceChain = sourceChain;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> t : sourceChain.sources) {
+            if (t.f0.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) {
+                return Boundedness.CONTINUOUS_UNBOUNDED;
+            }
+        }
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        List<SourceReader<?, ? extends SourceSplit>> readers = new ArrayList<>();
+        for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> source : sourceChain.sources) {
+            readers.add(source.f0.createReader(readerContext));
+        }
+        return new HybridSourceReader(readerContext, readers);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumState> createEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext) {
+        return new HybridSourceSplitEnumerator(enumContext, sourceChain);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumState> restoreEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext, HybridSourceEnumState checkpoint)
+            throws Exception {
+        return new HybridSourceSplitEnumerator(
+                enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
+        List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>();
+        sourceChain.sources.forEach(
+                t -> serializers.add(castSerializer(t.f0.getSplitSerializer())));
+        return new SplitSerializerWrapper<>(serializers);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceEnumState> getEnumeratorCheckpointSerializer() {
+        List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>();
+        sourceChain.sources.forEach(
+                t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer())));
+        return new HybridSourceEnumStateSerializer(serializers);
+    }
+
+    private static <T> SimpleVersionedSerializer<T> castSerializer(
+            SimpleVersionedSerializer<? extends T> s) {
+        @SuppressWarnings("rawtypes")
+        SimpleVersionedSerializer s1 = s;
+        return s1;
+    }
+
+    /** Serializes splits by delegating to the source-indexed split serializer. */
+    public static class SplitSerializerWrapper<SplitT extends SourceSplit>
+            implements SimpleVersionedSerializer<HybridSourceSplit> {
+
+        final List<SimpleVersionedSerializer<SourceSplit>> serializers;
+
+        public SplitSerializerWrapper(List<SimpleVersionedSerializer<SourceSplit>> serializers) {
+            this.serializers = serializers;
+        }
+
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(HybridSourceSplit split) throws IOException {
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    DataOutputStream out = new DataOutputStream(baos)) {
+                out.writeInt(split.sourceIndex());
+                out.writeInt(serializerOf(split.sourceIndex()).getVersion());
+                byte[] serializedSplit =
+                        serializerOf(split.sourceIndex()).serialize(split.getWrappedSplit());
+                out.writeInt(serializedSplit.length);
+                out.write(serializedSplit);
+                return baos.toByteArray();
+            }
+        }
+
+        @Override
+        public HybridSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                    DataInputStream in = new DataInputStream(bais)) {
+                int sourceIndex = in.readInt();
+                int nestedVersion = in.readInt();
+                int length = in.readInt();
+                byte[] splitBytes = new byte[length];
+                in.readFully(splitBytes);
+                SourceSplit split =
+                        serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes);
+                return new HybridSourceSplit(sourceIndex, split);
+            }
+        }
+
+        private SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) {
+            Preconditions.checkArgument(sourceIndex < serializers.size());
+            return serializers.get(sourceIndex);
+        }
+    }
+
+    /**
+     * Converts checkpoint between sources to transfer end position to next source's start position.
+     * Only required for dynamic position transfer at time of switching, otherwise source can be
+     * preconfigured with a start position during job submission.
+     */
+    public interface CheckpointConverter<InCheckpointT, OutCheckpointT>

Review comment:
       The `CheckpointConverter` interface could be replaced with `Function`, because this interface is essentially a `Function`.
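
A minimal sketch of that simplification, assuming the interface declares a single conversion method as the comment suggests (`Serializable` is kept because the converter ships with the source definition):

```java
import java.io.Serializable;
import java.util.function.Function;

// Sketch: extend Function instead of redeclaring the single abstract
// InCheckpointT -> OutCheckpointT conversion method.
public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
        extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
```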

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Hybrid source reader that delegates to the actual current source reader. */
+public class HybridSourceReader<T, SplitT extends SourceSplit>
+        implements SourceReader<T, HybridSourceSplit<SplitT>> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class);
+    private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;
+    private SourceReaderContext readerContext;
+    private List<SourceReader<?, SplitT>> realReaders;
+    private int currentSourceIndex = -1;
+    private long lastCheckpointId = -1;
+    private SourceReader<?, SplitT> currentReader;
+    private long lastReaderFinishedMs;
+
+    public HybridSourceReader(
+            SourceReaderContext readerContext, List<SourceReader<?, SplitT>> readers) {
+        this.readerContext = readerContext;
+        this.realReaders = readers;
+    }
+
+    @Override
+    public void start() {
+        setCurrentReader(0);
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput output) throws Exception {
+        InputStatus status = currentReader.pollNext(output);
+        if (status == InputStatus.END_OF_INPUT) {
+            // trap END_OF_INPUT if this wasn't the final reader
+            LOG.debug(
+                    "End of input subtask={} sourceIndex={} {}",
+                    readerContext.getIndexOfSubtask(),
+                    currentSourceIndex,
+                    currentReader);
+            if (currentSourceIndex + 1 < realReaders.size()) {
+                // signal coordinator to advance readers
+                long currentMillis = System.currentTimeMillis();
+                if (lastReaderFinishedMs + SOURCE_READER_FINISHED_EVENT_DELAY < currentMillis) {
+                    lastReaderFinishedMs = currentMillis;
+                    readerContext.sendSourceEventToCoordinator(
+                            new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId));
+                }
+                // more data will be available from the next reader
+                return InputStatus.MORE_AVAILABLE;
+            }
+        }
+        return status;
+    }
+
+    @Override
+    public List<HybridSourceSplit<SplitT>> snapshotState(long checkpointId) {
+        this.lastCheckpointId = checkpointId;
+        List<SplitT> state = currentReader.snapshotState(checkpointId);
+        return wrappedSplits(currentSourceIndex, state);
+    }
+
+    public static <SplitT extends SourceSplit> List<HybridSourceSplit<SplitT>> wrappedSplits(
+            int readerIndex, List<SplitT> state) {
+        List<HybridSourceSplit<SplitT>> wrappedSplits = new ArrayList<>(state.size());
+        for (SplitT split : state) {
+            wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split));
+        }
+        return wrappedSplits;
+    }
+
+    public static <SplitT extends SourceSplit> List<SplitT> unwrappedSplits(
+            List<HybridSourceSplit<SplitT>> splits) {
+        List<SplitT> unwrappedSplits = new ArrayList<>(splits.size());
+        for (HybridSourceSplit<SplitT> split : splits) {
+            unwrappedSplits.add(split.getWrappedSplit());
+        }
+        return unwrappedSplits;
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return currentReader.isAvailable();
+    }
+
+    @Override
+    public void addSplits(List<HybridSourceSplit<SplitT>> splits) {
+        LOG.info(
+                "### Adding splits subtask={} sourceIndex={} {} {}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                currentReader,
+                splits);
+        List<SplitT> realSplits = new ArrayList<>(splits.size());
+        for (HybridSourceSplit split : splits) {
+            Preconditions.checkState(
+                    split.sourceIndex() == currentSourceIndex,
+                    "Split %s while current source is %s",
+                    split,
+                    currentSourceIndex);
+            realSplits.add((SplitT) split.getWrappedSplit());
+        }
+        currentReader.addSplits(realSplits);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {
+        currentReader.notifyNoMoreSplits();
+        LOG.debug(
+                "No more splits for reader subtask={} sourceIndex={} {}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                currentReader);
+    }
+
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SwitchSourceEvent) {
+            SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
+            LOG.debug(
+                    "Switch source event: subtask={} sourceIndex={}",
+                    readerContext.getIndexOfSubtask(),
+                    sse.sourceIndex());
+            setCurrentReader(sse.sourceIndex());

Review comment:
       IMO, the source reader switch should be executed before sending the `SourceReaderFinishedEvent`, not upon receiving the `SwitchSourceEvent`.
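
A rough sketch of that ordering inside `pollNext` (illustrative only, using the names from this class; the PR instead defers the switch until the `SwitchSourceEvent` arrives):

```java
if (status == InputStatus.END_OF_INPUT && currentSourceIndex + 1 < realReaders.size()) {
    int finishedSourceIndex = currentSourceIndex;
    // Switch eagerly on the reader side before notifying the coordinator ...
    setCurrentReader(currentSourceIndex + 1);
    // ... then report which source finished.
    readerContext.sendSourceEventToCoordinator(
            new SourceReaderFinishedEvent(finishedSourceIndex, lastCheckpointId));
    return InputStatus.MORE_AVAILABLE;
}
```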

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ */
+public class HybridSourceSplitEnumerator<SplitT extends SourceSplit, CheckpointT>
+        implements SplitEnumerator<HybridSourceSplit<SplitT>, HybridSourceEnumState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, SplitT, Object> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit<SplitT>>> assignments;
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit<SplitT>>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;
+    private int currentSourceIndex;
+    private SplitEnumerator<SplitT, Object> currentEnumerator;
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, SplitT, Object> sourceChain) {
+        this(context, sourceChain, 0);
+    }
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, SplitT, Object> sourceChain,
+            int initialSourceIndex) {
+        Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size());
+        this.context = context;
+        this.sourceChain = sourceChain;
+        this.currentSourceIndex = initialSourceIndex;
+        this.assignments = new HashMap<>();
+        this.pendingSplits = new HashMap<>();
+        this.pendingReaders = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        switchEnumerator();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {
+        LOG.debug(
+                "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}",
+                subtaskId,
+                currentSourceIndex,
+                pendingSplits);
+        // TODO: test coverage for on demand split assignment
+        assignPendingSplits(subtaskId);
+        currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
+    }
+
+    @Override
+    public void addSplitsBack(List<HybridSourceSplit<SplitT>> splits, int subtaskId) {
+        LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits);
+        // Splits returned can belong to multiple sources, after switching since last checkpoint
+        TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySourceIndex = new TreeMap<>();
+
+        for (HybridSourceSplit<SplitT> split : splits) {
+            splitsBySourceIndex
+                    .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>())
+                    .add(split);
+        }
+
+        splitsBySourceIndex.forEach(
+                (k, splitsPerSource) -> {
+                    if (k == currentSourceIndex) {
+                        currentEnumerator.addSplitsBack(
+                                HybridSourceReader.unwrappedSplits(splitsPerSource), subtaskId);
+                    } else {
+                        pendingSplits
+                                .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>())
+                                .put(k, splitsPerSource);
+                        if (context.registeredReaders().containsKey(subtaskId)) {
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("addReader subtaskId={}", subtaskId);
+        if (pendingSplits.isEmpty()) {
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex));
+            LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex);
+            currentEnumerator.addReader(subtaskId);
+        } else {
+            // Defer adding reader to the current enumerator until splits belonging to earlier
+            // enumerators that were added back have been processed
+            pendingReaders.add(subtaskId);
+            assignPendingSplits(subtaskId);
+        }
+    }
+
+    private void assignPendingSplits(int subtaskId) {
+        TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource =
+                pendingSplits.get(subtaskId);
+        if (splitsBySource != null) {
+            int sourceIndex = splitsBySource.firstKey();
+            List<HybridSourceSplit<SplitT>> splits =
+                    Preconditions.checkNotNull(splitsBySource.get(sourceIndex));
+            LOG.debug("Assigning pending splits subtask={} {}", subtaskId, splits);
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(sourceIndex));
+            context.assignSplits(
+                    new SplitsAssignment<HybridSourceSplit>(
+                            Collections.singletonMap(subtaskId, (List) splits)));
+            context.signalNoMoreSplits(subtaskId);
+            // Empty collection indicates that splits have been assigned
+            splits.clear();
+        }
+    }
+
+    @Override
+    public HybridSourceEnumState snapshotState(long checkpointId) throws Exception {
+        Object enumState = currentEnumerator.snapshotState(checkpointId);
+        return new HybridSourceEnumState(currentSourceIndex, enumState);
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceReaderFinishedEvent) {
+            SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent;
+            if (srfe.sourceIndex() != currentSourceIndex) {
+                if (srfe.sourceIndex() < currentSourceIndex) {
+                    // Assign pending splits if any
+                    TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource =
+                            pendingSplits.get(subtaskId);
+                    if (splitsBySource != null) {
+                        List<HybridSourceSplit<SplitT>> splits =
+                                splitsBySource.get(srfe.sourceIndex());
+                        if (splits != null && splits.isEmpty()) {
+                            // Splits have been processed by the reader
+                            splitsBySource.remove(srfe.sourceIndex());
+                        }
+                        if (splitsBySource.isEmpty()) {
+                            pendingSplits.remove(subtaskId);
+                        } else {
+                            Integer nextSubtaskSourceIndex = splitsBySource.firstKey();
+                            LOG.debug(
+                                    "Restore subtask={}, sourceIndex={}",
+                                    subtaskId,
+                                    nextSubtaskSourceIndex);
+                            context.sendEventToSourceReader(
+                                    subtaskId, new SwitchSourceEvent(nextSubtaskSourceIndex));
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                    // Once all pending splits have been processed, add the readers to the current
+                    // enumerator, which may in turn trigger new split assignments
+                    if (!pendingReaders.isEmpty() && pendingSplits.isEmpty()) {
+                        // Advance pending readers to current enumerator
+                        LOG.debug(
+                                "Adding pending readers {} to enumerator currentSourceIndex={}",
+                                pendingReaders,
+                                currentSourceIndex);
+                        for (int pendingReaderSubtaskId : pendingReaders) {
+                            context.sendEventToSourceReader(
+                                    pendingReaderSubtaskId,
+                                    new SwitchSourceEvent(currentSourceIndex));
+                        }
+                        for (int pendingReaderSubtaskId : pendingReaders) {
+                            currentEnumerator.addReader(pendingReaderSubtaskId);
+                        }
+                        pendingReaders.clear();
+                    }
+                } else {
+                    // source already switched
+                    LOG.debug("Ignoring out of order source event {}", srfe);
+                }
+                return;
+            }
+            this.assignments.remove(subtaskId);
+            LOG.info(
+                    "Reader finished for subtask {} remaining assignments {}",
+                    subtaskId,
+                    assignments);
+            if (this.assignments.isEmpty()) {
+                LOG.debug("No assignments remaining, ready to switch source!");
+                if (currentSourceIndex + 1 < sourceChain.sources.size()) {
+                    switchEnumerator();
+                    // switch all readers prior to sending split assignments
+                    for (int i = 0; i < context.currentParallelism(); i++) {
+                        context.sendEventToSourceReader(
+                                i, new SwitchSourceEvent(currentSourceIndex));
+                    }
+                    // trigger split assignment,
+                    // (initially happens as part of subtask/reader registration)
+                    for (int i = 0; i < context.currentParallelism(); i++) {
+                        LOG.debug("adding reader subtask={} sourceIndex={}", i, currentSourceIndex);
+                        currentEnumerator.addReader(i);
+                    }
+                }
+            }
+        } else {
+            currentEnumerator.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        currentEnumerator.close();
+    }
+
+    private void switchEnumerator() {
+
+        Object enumeratorState = null;
+        if (currentEnumerator != null) {
+            try {
+                enumeratorState = currentEnumerator.snapshotState(-1);
+                currentEnumerator.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            currentEnumerator = null;
+            currentSourceIndex++;
+        }
+
+        SplitEnumeratorContextProxy delegatingContext =
+                new SplitEnumeratorContextProxy(currentSourceIndex, context, assignments);
+        Source<?, ? extends SourceSplit, Object> source =
+                (Source) sourceChain.sources.get(currentSourceIndex).f0;
+        HybridSource.CheckpointConverter<Object, Object> converter =

Review comment:
       `HybridSource.CheckpointConverter<Object, Object>` could be replaced with `HybridSource.CheckpointConverter<?, ?>`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

