zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r970445400
########## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * An implementation of {@link RateLimiter} that completes defined number of futures in-between the + * external notification events. The first cycle completes immediately, without waiting for the + * external notifications. + */ +public class GatedRateLimiter implements RateLimiter { + + private final int capacityPerCycle; + private int capacityLeft; + + /** + * Instantiates a new GatedRateLimiter. + * + * @param capacityPerCycle The number of completed futures per cycle. 
+ */ + public GatedRateLimiter(int capacityPerCycle) { + this.capacityPerCycle = capacityPerCycle; + this.capacityLeft = capacityPerCycle + 1; + } + + CompletableFuture<Void> gatingFuture; + + @Override + public CompletionStage<Void> acquire() { + if (capacityLeft-- > 0) { + return CompletableFuture.completedFuture(null); + } else { + if (gatingFuture == null) { + gatingFuture = new CompletableFuture<>(); + } + return gatingFuture; + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + capacityLeft = capacityPerCycle - 1; Review Comment: strange that this decrements by 1 instead of incrementing like the constructor. ########## flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.datagen.DataGeneratorSource; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** An integration test for {@code DataGeneratorSource}. 
*/ +public class DataGeneratorSourceITCase extends TestLogger { + + private static final int PARALLELISM = 4; + + @RegisterExtension + private static final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + // ------------------------------------------------------------------------ + + @Test + @DisplayName("Combined results of parallel source readers produce the expected sequence.") + public void testParallelSourceExecution() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + final DataStream<Long> stream = getGeneratorSourceStream(index -> index, env, 1_000L); + + final List<Long> result = stream.executeAndCollect(10000); + + assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999)); + } + + @Test + @DisplayName("Generator function can be instantiated as an anonymous class.") + public void testParallelSourceExecutionWithAnonymousClass() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + GeneratorFunction<Long, Long> generatorFunction = + new GeneratorFunction<Long, Long>() { + + @Override + public Long map(Long value) { + return value; + } + }; + + final DataStream<Long> stream = getGeneratorSourceStream(generatorFunction, env, 1_000L); + + final List<Long> result = stream.executeAndCollect(10000); + + assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999)); + } + + @Test + @DisplayName("Exceptions from the generator function are not 'swallowed'.") + public void testFailingGeneratorFunction() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + GeneratorFunction<Long, Long> 
generatorFunction = + value -> { + throw new Exception("boom"); + }; + + final DataStream<Long> stream = getGeneratorSourceStream(generatorFunction, env, 1_000L); + + assertThatThrownBy( + () -> { + stream.executeAndCollect(10000); + }) + .satisfies(anyCauseMatches("exception on this input:")) + .satisfies(anyCauseMatches("boom")); + } + + @Test + @DisplayName("Exceptions from the generator function initialization are not 'swallowed'.") + public void testFailingGeneratorFunctionInitialization() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + GeneratorFunction<Long, Long> generatorFunctionFailingInit = + new GeneratorFunction<Long, Long>() { + @Override + public void open(SourceReaderContext readerContext) throws Exception { + throw new Exception("boom"); + } + + @Override + public Long map(Long value) { + return value; + } + }; + + final DataStream<Long> stream = + getGeneratorSourceStream(generatorFunctionFailingInit, env, 1_000L); + + // FIX_ME: failure details are swallowed by Flink + // Full details are still available at this line: + // https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758 + // But the execution falls through to the line below and discards the root cause of + // cancelling the source invokable without recording it: + // https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780 Review Comment: Please file a ticket; this shouldn't be the case. ########## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java: ########## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit; +import org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A data source that produces N data points in parallel. 
This source is useful for testing and for + * cases that just need a stream of N events of any kind. + * + * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel + * source readers. + * + * <p>Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values + * into the generated events. For instance, the following code will produce the sequence of + * ["Number: 0", "Number: 1", ... , "Number: 999"] elements. + * + * <pre>{@code + * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index; + * + * DataGeneratorSource<String> source = + * new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + * + * DataStreamSource<String> stream = + * env.fromSource(source, + * WatermarkStrategy.noWatermarks(), + * "Generator Source"); + * }</pre> + * + * <p>The order of elements depends on the parallelism. Each sub-sequence will be produced in order. + * Consequently, if the parallelism is limited to one, this will produce one sequence in order from + * "Number: 0" to "Number: 999". + * + * <p>Note that this approach also makes it possible to produce deterministic watermarks at the + * source based on the generated events and a custom {@code WatermarkStrategy}. + * + * <p>This source has built-in support for rate limiting. The following code will produce an + * effectively unbounded (Long.MAX_VALUE from practical perspective will never be reached) stream of + * Long values at the overall source rate (across all source subtasks) of 100 events per second. + * + * <pre>{@code + * GeneratorFunction<Long, Long> generatorFunction = index -> index; + * + * DataGeneratorSource<Long> source = + * new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, 100, Types.LONG); + * }</pre> + * + * <p>For more sophisticated use cases, users can take full control of the low-level data generation + * details by supplying a custom {@code SourceReaderFactory}.
The instantiated {@code SourceReader}s + * are expected to produce data based on processing {@code NumberSequenceSplit}s. A customized + * generator could, for instance, synchronize the data release process with checkpointing by making + * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such functionality could be + * helpful, for instance, for testing sinks that are expected to create specific metadata upon the + * arrival of a checkpoint barrier and other similar use cases. + * + * <p>This source is always bounded. For very long sequences (for example when the {@code count} is + * set to Long.MAX_VALUE), users may want to consider executing the application in a streaming + * manner, because, despite the fact that the produced stream is bounded, the end bound is pretty + * far away. + */ +@Experimental +public class DataGeneratorSource<OUT> + implements Source<OUT, NumberSequenceSplit, Collection<NumberSequenceSplit>>, + ResultTypeQueryable<OUT> { + + private static final long serialVersionUID = 1L; + + private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory; + private final TypeInformation<OUT> typeInfo; + + private final NumberSequenceSource numberSource; + + /** + * Instantiates a new {@code DataGeneratorSource}. + * + * @param generatorFunction The {@code GeneratorFunction} function. + * @param count The number of generated data points. + * @param typeInfo The type of the produced data points. + */ + public DataGeneratorSource( + GeneratorFunction<Long, OUT> generatorFunction, + long count, + TypeInformation<OUT> typeInfo) { + this(generatorFunction, count, -1, typeInfo); + } + + /** + * Instantiates a new {@code DataGeneratorSource}. + * + * @param generatorFunction The {@code GeneratorFunction} function. + * @param count The number of generated data points. + * @param sourceRatePerSecond The overall source rate per second (across all source subtasks). + * @param typeInfo The type of the produced data points. 
+ */ + public DataGeneratorSource( + GeneratorFunction<Long, OUT> generatorFunction, + long count, + double sourceRatePerSecond, + TypeInformation<OUT> typeInfo) { + this( + new GeneratorSourceReaderFactory<>(generatorFunction, sourceRatePerSecond), + count, + typeInfo); + ClosureCleaner.clean( + generatorFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + } + + /** + * Instantiates a new {@code DataGeneratorSource}. This constructor allows users to take + * control of the low-level data generation details by supplying a custom {@code + * SourceReaderFactory}. The instantiated {@code SourceReader}s are expected to produce data + * based on processing {@code NumberSequenceSplit}s. + * + * @param sourceReaderFactory The {@link SourceReader} factory. + * @param count The number of generated data points. + * @param typeInfo The type of the produced data points. + */ + public DataGeneratorSource( + SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory, Review Comment: although this would again make things tricky with "global" vs "per-subtask" rates. ########## flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.datagen.DataGeneratorSource; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** An integration test for {@code DataGeneratorSource}. 
*/ +public class DataGeneratorSourceITCase extends TestLogger { + + private static final int PARALLELISM = 4; + + @RegisterExtension + private static final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + // ------------------------------------------------------------------------ + + @Test + @DisplayName("Combined results of parallel source readers produce the expected sequence.") + public void testParallelSourceExecution() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + final DataStream<Long> stream = getGeneratorSourceStream(index -> index, env, 1_000L); + + final List<Long> result = stream.executeAndCollect(10000); + + assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999)); + } + + @Test + @DisplayName("Generator function can be instantiated as an anonymous class.") + public void testParallelSourceExecutionWithAnonymousClass() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + GeneratorFunction<Long, Long> generatorFunction = + new GeneratorFunction<Long, Long>() { + + @Override + public Long map(Long value) { + return value; + } + }; + + final DataStream<Long> stream = getGeneratorSourceStream(generatorFunction, env, 1_000L); + + final List<Long> result = stream.executeAndCollect(10000); + + assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999)); + } + + @Test + @DisplayName("Exceptions from the generator function are not 'swallowed'.") + public void testFailingGeneratorFunction() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + GeneratorFunction<Long, Long> 
generatorFunction = + value -> { + throw new Exception("boom"); + }; + + final DataStream<Long> stream = getGeneratorSourceStream(generatorFunction, env, 1_000L); + + assertThatThrownBy( + () -> { + stream.executeAndCollect(10000); + }) + .satisfies(anyCauseMatches("exception on this input:")) + .satisfies(anyCauseMatches("boom")); + } + + @Test + @DisplayName("Exceptions from the generator function initialization are not 'swallowed'.") + public void testFailingGeneratorFunctionInitialization() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + GeneratorFunction<Long, Long> generatorFunctionFailingInit = + new GeneratorFunction<Long, Long>() { + @Override + public void open(SourceReaderContext readerContext) throws Exception { + throw new Exception("boom"); + } + + @Override + public Long map(Long value) { + return value; + } + }; + + final DataStream<Long> stream = + getGeneratorSourceStream(generatorFunctionFailingInit, env, 1_000L); + + // FIX_ME: failure details are swallowed by Flink + // Full details are still available at this line: + // https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758 + // But the execution falls through to the line below and discards the root cause of + // cancelling the source invokable without recording it: + // https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780 + assertThatThrownBy( + () -> { + stream.executeAndCollect(10000); + }) + .satisfies(anyCauseMatches("Failed to open")) + .satisfies(anyCauseMatches("boom")); + } + + @Test + @DisplayName( + "Result is correct when less elements are expected than the number of parallel source readers") + public void testLessSplitsThanParallelism() throws Exception { + 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + int n = PARALLELISM - 2; + DataStream<Long> stream = getGeneratorSourceStream(index -> index, env, n).map(l -> l); + + List<Long> result = stream.executeAndCollect(100); + + assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, n - 1)); + } + + static class GeneratorFunctionFailingInit implements GeneratorFunction<Long, Long> { Review Comment: unused? ########## flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.datagen.DataGeneratorSource; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DataGeneratorSource}. 
*/ +public class DataGeneratorSourceTest { + + @Test + @DisplayName("Correctly restores SplitEnumerator from a snapshot.") + public void testRestoreEnumerator() throws Exception { + final GeneratorFunction<Long, Long> generatorFunctionStateless = index -> index; + final DataGeneratorSource<Long> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunctionStateless, 100, Types.LONG); + + final int parallelism = 2; + final MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context = + new MockSplitEnumeratorContext<>(parallelism); + + SplitEnumerator< + NumberSequenceSource.NumberSequenceSplit, + Collection<NumberSequenceSource.NumberSequenceSplit>> + enumerator = dataGeneratorSource.createEnumerator(context); + + // start() is not strictly necessary in the current implementation, but should logically be + // executed in this order (protect against any breaking changes in the start() method). + enumerator.start(); + + Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState = + enumerator.snapshotState(0); + + @SuppressWarnings("unchecked") + final Queue<NumberSequenceSource.NumberSequenceSplit> splits = + (Queue<NumberSequenceSource.NumberSequenceSplit>) + Whitebox.getInternalState(enumerator, "remainingSplits"); + + assertThat(splits).hasSize(parallelism); + + enumerator = dataGeneratorSource.restoreEnumerator(context, enumeratorState); + + @SuppressWarnings("unchecked") + final Queue<NumberSequenceSource.NumberSequenceSplit> restoredSplits = + (Queue<NumberSequenceSource.NumberSequenceSplit>) + Whitebox.getInternalState(enumerator, "remainingSplits"); + + assertThat(restoredSplits).hasSize(enumeratorState.size()); Review Comment: same as above. Instead of checking for some internal field, let's test the behavior (== splits are assigned to subtasks). 
########## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratingIteratorSourceReader.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Iterator; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@code SourceReader} that takes the values of an iterator, supplied via an {@link + * IteratorSourceSplit}, and applies a {@link GeneratorFunction} to them to perform arbitrary + * transformations. 
+ */ +@Experimental +public class GeneratingIteratorSourceReader< + E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>> + extends IteratorSourceReaderBase<E, O, IterT, SplitT> { + + private final GeneratorFunction<E, O> generatorFunction; + + public GeneratingIteratorSourceReader( + SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) { + super(context); + this.generatorFunction = checkNotNull(generatorFunction); + } + + // ------------------------------------------------------------------------ + + @Override + protected O convert(E value) { + try { + return generatorFunction.map(value); + } catch (Exception e) { + String message = + String.format( + "A user-provided generator function threw an exception on this input: %s", + value.toString()); + throw new FlinkRuntimeException(message, e); + } + } + + @Override + public void close() throws Exception { + generatorFunction.close(); + super.close(); + } + + @Override + public void start(SourceReaderContext context) { + try { + generatorFunction.open(context); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to open the GeneratorFunction", e); + } + } Review Comment: ```suggestion @Override public void start(SourceReaderContext context) { try { generatorFunction.open(context); } catch (Exception e) { throw new FlinkRuntimeException("Failed to open the GeneratorFunction", e); } } @Override public void close() throws Exception { generatorFunction.close(); super.close(); } ``` ########## flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.datagen.DataGeneratorSource; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Queue; + 
+import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DataGeneratorSource}. */ +public class DataGeneratorSourceTest { + + @Test + @DisplayName("Correctly restores SplitEnumerator from a snapshot.") + public void testRestoreEnumerator() throws Exception { + final GeneratorFunction<Long, Long> generatorFunctionStateless = index -> index; + final DataGeneratorSource<Long> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunctionStateless, 100, Types.LONG); + + final int parallelism = 2; + final MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context = + new MockSplitEnumeratorContext<>(parallelism); + + SplitEnumerator< + NumberSequenceSource.NumberSequenceSplit, + Collection<NumberSequenceSource.NumberSequenceSplit>> + enumerator = dataGeneratorSource.createEnumerator(context); + + // start() is not strictly necessary in the current implementation, but should logically be + // executed in this order (protect against any breaking changes in the start() method). + enumerator.start(); Review Comment: very good :+1: ########## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * An implementation of {@link RateLimiter} that completes defined number of futures in-between the + * external notification events. The first cycle completes immediately, without waiting for the + * external notifications. + */ +public class GatedRateLimiter implements RateLimiter { + + private final int capacityPerCycle; + private int capacityLeft; + + /** + * Instantiates a new GatedRateLimiter. + * + * @param capacityPerCycle The number of completed futures per cycle. + */ + public GatedRateLimiter(int capacityPerCycle) { + this.capacityPerCycle = capacityPerCycle; + this.capacityLeft = capacityPerCycle + 1; Review Comment: I'd rather change the comparison in acquire to use `>=` than increment the count here. ########## flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.datagen.DataGeneratorSource; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DataGeneratorSource}. 
*/ +public class DataGeneratorSourceTest { + + @Test + @DisplayName("Correctly restores SplitEnumerator from a snapshot.") + public void testRestoreEnumerator() throws Exception { + final GeneratorFunction<Long, Long> generatorFunctionStateless = index -> index; + final DataGeneratorSource<Long> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunctionStateless, 100, Types.LONG); + + final int parallelism = 2; + final MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context = + new MockSplitEnumeratorContext<>(parallelism); + + SplitEnumerator< + NumberSequenceSource.NumberSequenceSplit, + Collection<NumberSequenceSource.NumberSequenceSplit>> + enumerator = dataGeneratorSource.createEnumerator(context); + + // start() is not strictly necessary in the current implementation, but should logically be + // executed in this order (protect against any breaking changes in the start() method). + enumerator.start(); + + Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState = + enumerator.snapshotState(0); + + @SuppressWarnings("unchecked") + final Queue<NumberSequenceSource.NumberSequenceSplit> splits = + (Queue<NumberSequenceSource.NumberSequenceSplit>) + Whitebox.getInternalState(enumerator, "remainingSplits"); + + assertThat(splits).hasSize(parallelism); Review Comment: I'd rather add a `@VisibleForTesting` method to the `IteratorSourceEnumerator` + cast. This here won't even show up when you look for usages of the enumerator class. Actually, what is the intention behind this assertion? Why can't we use `enumeratorState`? Even without that, you could call `handleSplitRequest` and get the number of assigned splits via the `MockSplitEnumeratorContext`. ########## flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.datagen.DataGeneratorSource; +import org.apache.flink.api.connector.source.datagen.GeneratorFunction; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; 
+import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DataGeneratorSource}. */ +public class DataGeneratorSourceTest { + + @Test + @DisplayName("Correctly restores SplitEnumerator from a snapshot.") + public void testRestoreEnumerator() throws Exception { + final GeneratorFunction<Long, Long> generatorFunctionStateless = index -> index; + final DataGeneratorSource<Long> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunctionStateless, 100, Types.LONG); + + final int parallelism = 2; + final MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context = + new MockSplitEnumeratorContext<>(parallelism); + + SplitEnumerator< + NumberSequenceSource.NumberSequenceSplit, + Collection<NumberSequenceSource.NumberSequenceSplit>> + enumerator = dataGeneratorSource.createEnumerator(context); + + // start() is not strictly necessary in the current implementation, but should logically be + // executed in this order (protect against any breaking changes in the start() method). 
+ enumerator.start(); + + Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState = + enumerator.snapshotState(0); + + @SuppressWarnings("unchecked") + final Queue<NumberSequenceSource.NumberSequenceSplit> splits = + (Queue<NumberSequenceSource.NumberSequenceSplit>) + Whitebox.getInternalState(enumerator, "remainingSplits"); + + assertThat(splits).hasSize(parallelism); + + enumerator = dataGeneratorSource.restoreEnumerator(context, enumeratorState); + + @SuppressWarnings("unchecked") + final Queue<NumberSequenceSource.NumberSequenceSplit> restoredSplits = + (Queue<NumberSequenceSource.NumberSequenceSplit>) + Whitebox.getInternalState(enumerator, "remainingSplits"); + + assertThat(restoredSplits).hasSize(enumeratorState.size()); + } + + @Test + @DisplayName("Uses the underlying NumberSequenceSource correctly for checkpointing.") + public void testReaderCheckpoints() throws Exception { + final long from = 177; + final long mid = 333; + final long to = 563; + final long elementsPerCycle = (to - from) / 3; + + final TestingReaderOutput<Long> out = new TestingReaderOutput<>(); + + SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = createReader(); + reader.addSplits( + Arrays.asList( + new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid), + new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to))); + + long remainingInCycle = elementsPerCycle; + while (reader.pollNext(out) != InputStatus.END_OF_INPUT) { + if (--remainingInCycle <= 0) { + remainingInCycle = elementsPerCycle; + // checkpoint + List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L); + + // re-create and restore + reader = createReader(); + if (splits.isEmpty()) { + reader.notifyNoMoreSplits(); + } else { + reader.addSplits(splits); + } + } + } + + final List<Long> result = out.getEmittedRecords(); + validateSequence(result, from, to); + } + + private static void validateSequence( + final List<Long> sequence, final long 
from, final long to) { + if (sequence.size() != to - from + 1) { + failSequence(sequence, from, to); + } + + long nextExpected = from; + for (Long next : sequence) { + if (next != nextExpected++) { + failSequence(sequence, from, to); + } + } + } + + private static void failSequence(final List<Long> sequence, final long from, final long to) { + Assertions.fail( + String.format( + "Expected: A sequence [%d, %d], but found: sequence (size %d) : %s", + from, to, sequence.size(), sequence)); + } + + private static SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader() + throws Exception { + // the arguments passed in the source constructor matter only to the enumerator + GeneratorFunction<Long, Long> generatorFunctionStateless = index -> index; + DataGeneratorSource<Long> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunctionStateless, Long.MAX_VALUE, Types.LONG); + + return dataGeneratorSource.createReader(new DummyReaderContext()); + } + + // ------------------------------------------------------------------------ + // test utils / mocks + // + // the "flink-connector-test-utils module has proper mocks and utils, + // but cannot be used here, because it would create a cyclic dependency. 
+ // ------------------------------------------------------------------------ + + private static final class DummyReaderContext implements SourceReaderContext { + + @Override + public SourceReaderMetricGroup metricGroup() { + return UnregisteredMetricsGroup.createSourceReaderMetricGroup(); + } + + @Override + public Configuration getConfiguration() { + return new Configuration(); + } + + @Override + public String getLocalHostName() { + return "localhost"; + } + + @Override + public int getIndexOfSubtask() { + return 0; + } + + @Override + public void sendSplitRequest() {} + + @Override + public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {} + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create(getClass().getClassLoader()); + } + + @Override + public int currentParallelism() { + return 1; + } + } + + private static final class TestingReaderOutput<E> implements ReaderOutput<E> { Review Comment: The fact that we can't use the copy in the connector test utils poses an interesting question: should this even be in flink-core? Why shouldn't it have its own connector module? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org