zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r970445400


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes a defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   strange that this decrements by 1 instead of incrementing like the 
constructor.



##########
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+public class DataGeneratorSourceITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @RegisterExtension
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Combined results of parallel source readers produce the 
expected sequence.")
+    public void testParallelSourceExecution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final DataStream<Long> stream = getGeneratorSourceStream(index -> 
index, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Generator function can be instantiated as an anonymous 
class.")
+    public void testParallelSourceExecutionWithAnonymousClass() throws 
Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                new GeneratorFunction<Long, Long>() {
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function are not 'swallowed'.")
+    public void testFailingGeneratorFunction() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                value -> {
+                    throw new Exception("boom");
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        assertThatThrownBy(
+                        () -> {
+                            stream.executeAndCollect(10000);
+                        })
+                .satisfies(anyCauseMatches("exception on this input:"))
+                .satisfies(anyCauseMatches("boom"));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function initialization are 
not 'swallowed'.")
+    public void testFailingGeneratorFunctionInitialization() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunctionFailingInit =
+                new GeneratorFunction<Long, Long>() {
+                    @Override
+                    public void open(SourceReaderContext readerContext) throws 
Exception {
+                        throw new Exception("boom");
+                    }
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream =
+                getGeneratorSourceStream(generatorFunctionFailingInit, env, 
1_000L);
+
+        // FIX_ME: failure details are swallowed by Flink
+        // Full details are still available at this line:
+        // 
https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758
+        // But the execution falls through to the line below and discards the 
root cause of
+        // cancelling the source invokable without recording it:
+        // 
https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780

Review Comment:
   Please file a ticket; this shouldn't be the case.



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * <p>Users can supply a {@code GeneratorFunction} for mapping the 
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce 
the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + 
index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }</pre>
+ *
+ * <p>The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * <p>Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * <p>This source has built-in support for rate limiting. The following code 
will produce an
+ * effectively unbounded (Long.MAX_VALUE from a practical perspective will never 
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 
events per second.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *         new DataGeneratorSource<>(generatorFunction, 
Long.MAX_VALUE, 100, Types.LONG);
+ * }</pre>
+ *
+ * <p>For more sophisticated use cases, users can take full control of the 
low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated 
{@code SourceReader}s
+ * are expected to produce data based on processing {@code 
NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with 
checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such 
functionality could be
+ * helpful, for instance, for testing sinks that are expected to create 
specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * <p>This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, the 
end bound is pretty
+ * far away.
+ */
+@Experimental
+public class DataGeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit, 
Collection<NumberSequenceSplit>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SourceReaderFactory<OUT, NumberSequenceSplit> 
sourceReaderFactory;
+    private final TypeInformation<OUT> typeInfo;
+
+    private final NumberSequenceSource numberSource;
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            TypeInformation<OUT> typeInfo) {
+        this(generatorFunction, count, -1, typeInfo);
+    }
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param sourceRatePerSecond The overall source rate per second (across 
all source subtasks).
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            double sourceRatePerSecond,
+            TypeInformation<OUT> typeInfo) {
+        this(
+                new GeneratorSourceReaderFactory<>(generatorFunction, 
sourceRatePerSecond),
+                count,
+                typeInfo);
+        ClosureCleaner.clean(
+                generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+    }
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}. This constructor allows 
users to take
+     * control of the low-level data generation details by supplying a custom 
{@code
+     * SourceReaderFactory}. The instantiated {@code SourceReader}s are 
expected to produce data
+     * based on processing {@code NumberSequenceSplit}s.
+     *
+     * @param sourceReaderFactory The {@link SourceReader} factory.
+     * @param count The number of generated data points.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,

Review Comment:
   although this would again make things tricky with "global" vs "per-subtask" 
rates.



##########
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+public class DataGeneratorSourceITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @RegisterExtension
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Combined results of parallel source readers produce the 
expected sequence.")
+    public void testParallelSourceExecution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final DataStream<Long> stream = getGeneratorSourceStream(index -> 
index, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Generator function can be instantiated as an anonymous 
class.")
+    public void testParallelSourceExecutionWithAnonymousClass() throws 
Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                new GeneratorFunction<Long, Long>() {
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function are not 'swallowed'.")
+    public void testFailingGeneratorFunction() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                value -> {
+                    throw new Exception("boom");
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        assertThatThrownBy(
+                        () -> {
+                            stream.executeAndCollect(10000);
+                        })
+                .satisfies(anyCauseMatches("exception on this input:"))
+                .satisfies(anyCauseMatches("boom"));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function initialization are 
not 'swallowed'.")
+    public void testFailingGeneratorFunctionInitialization() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunctionFailingInit =
+                new GeneratorFunction<Long, Long>() {
+                    @Override
+                    public void open(SourceReaderContext readerContext) throws 
Exception {
+                        throw new Exception("boom");
+                    }
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream =
+                getGeneratorSourceStream(generatorFunctionFailingInit, env, 
1_000L);
+
+        // FIX_ME: failure details are swallowed by Flink
+        // Full details are still available at this line:
+        // 
https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758
+        // But the execution falls through to the line below and discards the 
root cause of
+        // cancelling the source invokable without recording it:
+        // 
https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780
+        assertThatThrownBy(
+                        () -> {
+                            stream.executeAndCollect(10000);
+                        })
+                .satisfies(anyCauseMatches("Failed to open"))
+                .satisfies(anyCauseMatches("boom"));
+    }
+
+    @Test
+    @DisplayName(
+            "Result is correct when less elements are expected than the number 
of parallel source readers")
+    public void testLessSplitsThanParallelism() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        int n = PARALLELISM - 2;
+        DataStream<Long> stream = getGeneratorSourceStream(index -> index, 
env, n).map(l -> l);
+
+        List<Long> result = stream.executeAndCollect(100);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, n - 
1));
+    }
+
+    static class GeneratorFunctionFailingInit implements 
GeneratorFunction<Long, Long> {

Review Comment:
   unused?



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> splits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(splits).hasSize(parallelism);
+
+        enumerator = dataGeneratorSource.restoreEnumerator(context, 
enumeratorState);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> restoredSplits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(restoredSplits).hasSize(enumeratorState.size());

Review Comment:
   same as above. Instead of checking for some internal field, let's test the 
behavior (== splits are assigned to subtasks).



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratingIteratorSourceReader.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code SourceReader} that takes the values of an iterator, supplied via 
an {@link
+ * IteratorSourceSplit}, and applies a {@link GeneratorFunction} to them to 
perform arbitrary
+ * transformations.
+ */
+@Experimental
+public class GeneratingIteratorSourceReader<
+                E, O, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {
+
+    private final GeneratorFunction<E, O> generatorFunction;
+
+    public GeneratingIteratorSourceReader(
+            SourceReaderContext context, GeneratorFunction<E, O> 
generatorFunction) {
+        super(context);
+        this.generatorFunction = checkNotNull(generatorFunction);
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    protected O convert(E value) {
+        try {
+            return generatorFunction.map(value);
+        } catch (Exception e) {
+            String message =
+                    String.format(
+                            "A user-provided generator function threw an 
exception on this input: %s",
+                            value.toString());
+            throw new FlinkRuntimeException(message, e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        generatorFunction.close();
+        super.close();
+    }
+
+    @Override
+    public void start(SourceReaderContext context) {
+        try {
+            generatorFunction.open(context);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to open the 
GeneratorFunction", e);
+        }
+    }

Review Comment:
   ```suggestion
       @Override
       public void start(SourceReaderContext context) {
           try {
               generatorFunction.open(context);
           } catch (Exception e) {
               throw new FlinkRuntimeException("Failed to open the 
GeneratorFunction", e);
           }
       }
   
       @Override
       public void close() throws Exception {
           generatorFunction.close();
           super.close();
       }
   ```



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();

Review Comment:
   very good  :+1: 



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;

Review Comment:
   I'd rather change the comparison in acquire to use `>=` than increment the 
count here.



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> splits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(splits).hasSize(parallelism);

Review Comment:
   I'd rather add a `@VisibleForTesting` method to the 
`IteratorSourceEnumerator` and cast to it. The reflective `Whitebox` access here 
won't even show up when you look for usages of the enumerator class.
   
   Actually, what is the intention behind this assertion? Why can't we use 
`enumeratorState`?
   Even without that, you could call `handleSplitRequest` and get the number of 
assigned splits via the `MockSplitEnumeratorContext`.



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> splits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(splits).hasSize(parallelism);
+
+        enumerator = dataGeneratorSource.restoreEnumerator(context, 
enumeratorState);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> restoredSplits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(restoredSplits).hasSize(enumeratorState.size());
+    }
+
+    @Test
+    @DisplayName("Uses the underlying NumberSequenceSource correctly for 
checkpointing.")
+    public void testReaderCheckpoints() throws Exception {
+        final long from = 177;
+        final long mid = 333;
+        final long to = 563;
+        final long elementsPerCycle = (to - from) / 3;
+
+        final TestingReaderOutput<Long> out = new TestingReaderOutput<>();
+
+        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = 
createReader();
+        reader.addSplits(
+                Arrays.asList(
+                        new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
+                        new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+        long remainingInCycle = elementsPerCycle;
+        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
+            if (--remainingInCycle <= 0) {
+                remainingInCycle = elementsPerCycle;
+                // checkpoint
+                List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
+
+                // re-create and restore
+                reader = createReader();
+                if (splits.isEmpty()) {
+                    reader.notifyNoMoreSplits();
+                } else {
+                    reader.addSplits(splits);
+                }
+            }
+        }
+
+        final List<Long> result = out.getEmittedRecords();
+        validateSequence(result, from, to);
+    }
+
+    private static void validateSequence(
+            final List<Long> sequence, final long from, final long to) {
+        if (sequence.size() != to - from + 1) {
+            failSequence(sequence, from, to);
+        }
+
+        long nextExpected = from;
+        for (Long next : sequence) {
+            if (next != nextExpected++) {
+                failSequence(sequence, from, to);
+            }
+        }
+    }
+
+    private static void failSequence(final List<Long> sequence, final long 
from, final long to) {
+        Assertions.fail(
+                String.format(
+                        "Expected: A sequence [%d, %d], but found: sequence 
(size %d) : %s",
+                        from, to, sequence.size(), sequence));
+    }
+
+    private static SourceReader<Long, 
NumberSequenceSource.NumberSequenceSplit> createReader()
+            throws Exception {
+        // the arguments passed in the source constructor matter only to the 
enumerator
+        GeneratorFunction<Long, Long> generatorFunctionStateless = index -> 
index;
+        DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 
Long.MAX_VALUE, Types.LONG);
+
+        return dataGeneratorSource.createReader(new DummyReaderContext());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utils / mocks
+    //
+    //  the "flink-connector-test-utils module has proper mocks and utils,
+    //  but cannot be used here, because it would create a cyclic dependency.
+    // ------------------------------------------------------------------------
+
+    private static final class DummyReaderContext implements 
SourceReaderContext {
+
+        @Override
+        public SourceReaderMetricGroup metricGroup() {
+            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            return new Configuration();
+        }
+
+        @Override
+        public String getLocalHostName() {
+            return "localhost";
+        }
+
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
+        @Override
+        public void sendSplitRequest() {}
+
+        @Override
+        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return 
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+        }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
+    }
+
+    private static final class TestingReaderOutput<E> implements 
ReaderOutput<E> {

Review Comment:
   The fact that we can't use the copy in the connector test utils poses an 
interesting question: should this even be in flink-core? Why shouldn't it have 
its own connector module?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to