SteNicholas commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r633194823
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. */ Review comment: The class comment of `HybridSourceReader` is confusing because there is no concept of a "current source reader".
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying 
sources based on configurable source chain. */ +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumState> { + + private final SourceChain<T, ? extends SourceSplit, ?> sourceChain; + + public HybridSource(SourceChain<T, ? extends SourceSplit, ?> sourceChain) { Review comment: IMO, `SourceChain` shouldn't be created on the user side; it could be constructed inside `HybridSource` instead. The constructor of `HybridSource` could take the initial source or a list of sources. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. */ +public class HybridSourceReader<T, SplitT extends SourceSplit> + implements SourceReader<T, HybridSourceSplit<SplitT>> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); + private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; Review comment: Could the value of `SOURCE_READER_FINISHED_EVENT_DELAY` be made configurable by users? And why is `SOURCE_READER_FINISHED_EVENT_DELAY` needed at all? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumState> { + + private final SourceChain<T, ? extends SourceSplit, ?> sourceChain; + + public HybridSource(SourceChain<T, ? extends SourceSplit, ?> sourceChain) { + this.sourceChain = sourceChain; + } + + @Override + public Boundedness getBoundedness() { + for (Tuple2<Source<T, ? 
extends SourceSplit, ?>, ?> t : sourceChain.sources) { Review comment: The `getBoundedness` method should return the boundedness of the last source in the chain, which is the more reasonable behavior from the user's perspective. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumState> { + + private final SourceChain<T, ? extends SourceSplit, ?> sourceChain; + + public HybridSource(SourceChain<T, ? extends SourceSplit, ?> sourceChain) { + this.sourceChain = sourceChain; + } + + @Override + public Boundedness getBoundedness() { + for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> t : sourceChain.sources) { + if (t.f0.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + } + return Boundedness.BOUNDED; + } + + @Override + public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + List<SourceReader<?, ? extends SourceSplit>> readers = new ArrayList<>(); + for (Tuple2<Source<T, ? 
extends SourceSplit, ?>, ?> source : sourceChain.sources) { + readers.add(source.f0.createReader(readerContext)); + } + return new HybridSourceReader(readerContext, readers); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumState> createEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext) { + return new HybridSourceSplitEnumerator(enumContext, sourceChain); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumState> restoreEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext, HybridSourceEnumState checkpoint) + throws Exception { + return new HybridSourceSplitEnumerator( + enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); + } + + @Override + public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() { + List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>(); + sourceChain.sources.forEach( + t -> serializers.add(castSerializer(t.f0.getSplitSerializer()))); + return new SplitSerializerWrapper<>(serializers); + } + + @Override + public SimpleVersionedSerializer<HybridSourceEnumState> getEnumeratorCheckpointSerializer() { + List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>(); + sourceChain.sources.forEach( + t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer()))); + return new HybridSourceEnumStateSerializer(serializers); + } + + private static <T> SimpleVersionedSerializer<T> castSerializer( + SimpleVersionedSerializer<? extends T> s) { + @SuppressWarnings("rawtypes") + SimpleVersionedSerializer s1 = s; + return s1; + } + + /** Serializes splits by delegating to the source-indexed split serializer. 
*/ + public static class SplitSerializerWrapper<SplitT extends SourceSplit> + implements SimpleVersionedSerializer<HybridSourceSplit> { + + final List<SimpleVersionedSerializer<SourceSplit>> serializers; + + public SplitSerializerWrapper(List<SimpleVersionedSerializer<SourceSplit>> serializers) { + this.serializers = serializers; + } + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(HybridSourceSplit split) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + out.writeInt(split.sourceIndex()); + out.writeInt(serializerOf(split.sourceIndex()).getVersion()); + byte[] serializedSplit = + serializerOf(split.sourceIndex()).serialize(split.getWrappedSplit()); + out.writeInt(serializedSplit.length); + out.write(serializedSplit); + return baos.toByteArray(); + } + } + + @Override + public HybridSourceSplit deserialize(int version, byte[] serialized) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + int sourceIndex = in.readInt(); + int nestedVersion = in.readInt(); + int length = in.readInt(); + byte[] splitBytes = new byte[length]; + in.readFully(splitBytes); + SourceSplit split = + serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes); + return new HybridSourceSplit(sourceIndex, split); + } + } + + private SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) { + Preconditions.checkArgument(sourceIndex < serializers.size()); + return serializers.get(sourceIndex); + } + } + + /** + * Converts checkpoint between sources to transfer end position to next source's start position. + * Only required for dynamic position transfer at time of switching, otherwise source can be + * preconfigured with a start position during job submission. 
+ */ + public interface CheckpointConverter<InCheckpointT, OutCheckpointT> Review comment: `CheckpointConverter` interface could be replaced with `Function`, because this interface is essentially a `Function`. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. 
*/ +public class HybridSourceReader<T, SplitT extends SourceSplit> + implements SourceReader<T, HybridSourceSplit<SplitT>> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); + private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; + private SourceReaderContext readerContext; + private List<SourceReader<?, SplitT>> realReaders; + private int currentSourceIndex = -1; + private long lastCheckpointId = -1; + private SourceReader<?, SplitT> currentReader; + private long lastReaderFinishedMs; + + public HybridSourceReader( + SourceReaderContext readerContext, List<SourceReader<?, SplitT>> readers) { + this.readerContext = readerContext; + this.realReaders = readers; + } + + @Override + public void start() { + setCurrentReader(0); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + InputStatus status = currentReader.pollNext(output); + if (status == InputStatus.END_OF_INPUT) { + // trap END_OF_INPUT if this wasn't the final reader + LOG.debug( + "End of input subtask={} sourceIndex={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + if (currentSourceIndex + 1 < realReaders.size()) { + // signal coordinator to advance readers + long currentMillis = System.currentTimeMillis(); + if (lastReaderFinishedMs + SOURCE_READER_FINISHED_EVENT_DELAY < currentMillis) { + lastReaderFinishedMs = currentMillis; + readerContext.sendSourceEventToCoordinator( + new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); + } + // more data will be available from the next reader + return InputStatus.MORE_AVAILABLE; + } + } + return status; + } + + @Override + public List<HybridSourceSplit<SplitT>> snapshotState(long checkpointId) { + this.lastCheckpointId = checkpointId; + List<SplitT> state = currentReader.snapshotState(checkpointId); + return wrappedSplits(currentSourceIndex, state); + } + + public static <SplitT extends SourceSplit> 
List<HybridSourceSplit<SplitT>> wrappedSplits( + int readerIndex, List<SplitT> state) { + List<HybridSourceSplit<SplitT>> wrappedSplits = new ArrayList<>(state.size()); + for (SplitT split : state) { + wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); + } + return wrappedSplits; + } + + public static <SplitT extends SourceSplit> List<SplitT> unwrappedSplits( + List<HybridSourceSplit<SplitT>> splits) { + List<SplitT> unwrappedSplits = new ArrayList<>(splits.size()); + for (HybridSourceSplit<SplitT> split : splits) { + unwrappedSplits.add(split.getWrappedSplit()); + } + return unwrappedSplits; + } + + @Override + public CompletableFuture<Void> isAvailable() { + return currentReader.isAvailable(); + } + + @Override + public void addSplits(List<HybridSourceSplit<SplitT>> splits) { + LOG.info( + "### Adding splits subtask={} sourceIndex={} {} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader, + splits); + List<SplitT> realSplits = new ArrayList<>(splits.size()); + for (HybridSourceSplit split : splits) { + Preconditions.checkState( + split.sourceIndex() == currentSourceIndex, + "Split %s while current source is %s", + split, + currentSourceIndex); + realSplits.add((SplitT) split.getWrappedSplit()); + } + currentReader.addSplits(realSplits); + } + + @Override + public void notifyNoMoreSplits() { + currentReader.notifyNoMoreSplits(); + LOG.debug( + "No more splits for reader subtask={} sourceIndex={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof SwitchSourceEvent) { + SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent; + LOG.debug( + "Switch source event: subtask={} sourceIndex={}", + readerContext.getIndexOfSubtask(), + sse.sourceIndex()); + setCurrentReader(sse.sourceIndex()); Review comment: IMO, the source reader switching should be executed before sending 
the `SourceReaderFinishedEvent`, not upon receiving the `SwitchSourceEvent`. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ########## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. 
+ */ +public class HybridSourceSplitEnumerator<SplitT extends SourceSplit, CheckpointT> + implements SplitEnumerator<HybridSourceSplit<SplitT>, HybridSourceEnumState> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + + private final SplitEnumeratorContext<HybridSourceSplit> context; + private final HybridSource.SourceChain<?, SplitT, Object> sourceChain; + // TODO: SourceCoordinatorContext does not provide access to current assignments + private final Map<Integer, List<HybridSourceSplit<SplitT>>> assignments; + private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit<SplitT>>>> pendingSplits; + private final HashSet<Integer> pendingReaders; + private int currentSourceIndex; + private SplitEnumerator<SplitT, Object> currentEnumerator; + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, SplitT, Object> sourceChain) { + this(context, sourceChain, 0); + } + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, SplitT, Object> sourceChain, + int initialSourceIndex) { + Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); + this.context = context; + this.sourceChain = sourceChain; + this.currentSourceIndex = initialSourceIndex; + this.assignments = new HashMap<>(); + this.pendingSplits = new HashMap<>(); + this.pendingReaders = new HashSet<>(); + } + + @Override + public void start() { + switchEnumerator(); + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + LOG.debug( + "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", + subtaskId, + currentSourceIndex, + pendingSplits); + // TODO: test coverage for on demand split assignment + assignPendingSplits(subtaskId); + currentEnumerator.handleSplitRequest(subtaskId, requesterHostname); + } + + @Override + public void 
addSplitsBack(List<HybridSourceSplit<SplitT>> splits, int subtaskId) { + LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits); + // Splits returned can belong to multiple sources, after switching since last checkpoint + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySourceIndex = new TreeMap<>(); + + for (HybridSourceSplit<SplitT> split : splits) { + splitsBySourceIndex + .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>()) + .add(split); + } + + splitsBySourceIndex.forEach( + (k, splitsPerSource) -> { + if (k == currentSourceIndex) { + currentEnumerator.addSplitsBack( + HybridSourceReader.unwrappedSplits(splitsPerSource), subtaskId); + } else { + pendingSplits + .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>()) + .put(k, splitsPerSource); + if (context.registeredReaders().containsKey(subtaskId)) { + assignPendingSplits(subtaskId); + } + } + }); + } + + @Override + public void addReader(int subtaskId) { + LOG.debug("addReader subtaskId={}", subtaskId); + if (pendingSplits.isEmpty()) { + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex)); + LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex); + currentEnumerator.addReader(subtaskId); + } else { + // Defer adding reader to the current enumerator until splits belonging to earlier + // enumerators that were added back have been processed + pendingReaders.add(subtaskId); + assignPendingSplits(subtaskId); + } + } + + private void assignPendingSplits(int subtaskId) { + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource = + pendingSplits.get(subtaskId); + if (splitsBySource != null) { + int sourceIndex = splitsBySource.firstKey(); + List<HybridSourceSplit<SplitT>> splits = + Preconditions.checkNotNull(splitsBySource.get(sourceIndex)); + LOG.debug("Assigning pending splits subtask={} {}", subtaskId, splits); + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(sourceIndex)); + 
context.assignSplits( + new SplitsAssignment<HybridSourceSplit>( + Collections.singletonMap(subtaskId, (List) splits))); + context.signalNoMoreSplits(subtaskId); + // Empty collection indicates that splits have been assigned + splits.clear(); + } + } + + @Override + public HybridSourceEnumState snapshotState(long checkpointId) throws Exception { + Object enumState = currentEnumerator.snapshotState(checkpointId); + return new HybridSourceEnumState(currentSourceIndex, enumState); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SourceReaderFinishedEvent) { + SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent; + if (srfe.sourceIndex() != currentSourceIndex) { + if (srfe.sourceIndex() < currentSourceIndex) { + // Assign pending splits if any + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource = + pendingSplits.get(subtaskId); + if (splitsBySource != null) { + List<HybridSourceSplit<SplitT>> splits = + splitsBySource.get(srfe.sourceIndex()); + if (splits != null && splits.isEmpty()) { + // Splits have been processed by the reader + splitsBySource.remove(srfe.sourceIndex()); + } + if (splitsBySource.isEmpty()) { + pendingSplits.remove(subtaskId); + } else { + Integer nextSubtaskSourceIndex = splitsBySource.firstKey(); + LOG.debug( + "Restore subtask={}, sourceIndex={}", + subtaskId, + nextSubtaskSourceIndex); + context.sendEventToSourceReader( + subtaskId, new SwitchSourceEvent(nextSubtaskSourceIndex)); + assignPendingSplits(subtaskId); + } + } + // Once all pending splits have been processed, add the readers to the current + // enumerator, which may in turn trigger new split assignments + if (!pendingReaders.isEmpty() && pendingSplits.isEmpty()) { + // Advance pending readers to current enumerator + LOG.debug( + "Adding pending readers {} to enumerator currentSourceIndex={}", + pendingReaders, + currentSourceIndex); + for (int pendingReaderSubtaskId : 
pendingReaders) { + context.sendEventToSourceReader( + pendingReaderSubtaskId, + new SwitchSourceEvent(currentSourceIndex)); + } + for (int pendingReaderSubtaskId : pendingReaders) { + currentEnumerator.addReader(pendingReaderSubtaskId); + } + pendingReaders.clear(); + } + } else { + // source already switched + LOG.debug("Ignoring out of order source event {}", srfe); + } + return; + } + this.assignments.remove(subtaskId); + LOG.info( + "Reader finished for subtask {} remaining assignments {}", + subtaskId, + assignments); + if (this.assignments.isEmpty()) { + LOG.debug("No assignments remaining, ready to switch source!"); + if (currentSourceIndex + 1 < sourceChain.sources.size()) { + switchEnumerator(); + // switch all readers prior to sending split assignments + for (int i = 0; i < context.currentParallelism(); i++) { + context.sendEventToSourceReader( + i, new SwitchSourceEvent(currentSourceIndex)); + } + // trigger split assignment, + // (initially happens as part of subtask/reader registration) + for (int i = 0; i < context.currentParallelism(); i++) { + LOG.debug("adding reader subtask={} sourceIndex={}", i, currentSourceIndex); + currentEnumerator.addReader(i); + } + } + } + } else { + currentEnumerator.handleSourceEvent(subtaskId, sourceEvent); + } + } + + @Override + public void close() throws IOException { + currentEnumerator.close(); + } + + private void switchEnumerator() { + + Object enumeratorState = null; + if (currentEnumerator != null) { + try { + enumeratorState = currentEnumerator.snapshotState(-1); + currentEnumerator.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + currentEnumerator = null; + currentSourceIndex++; + } + + SplitEnumeratorContextProxy delegatingContext = + new SplitEnumeratorContextProxy(currentSourceIndex, context, assignments); + Source<?, ? 
extends SourceSplit, Object> source = + (Source) sourceChain.sources.get(currentSourceIndex).f0; + HybridSource.CheckpointConverter<Object, Object> converter = Review comment: `HybridSource.CheckpointConverter<Object, Object>` could be replaced with `HybridSource.CheckpointConverter<?, ?> converter`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org